GVKun编程网logo

使用 JMS 和 JMSXGroupID 的 Oracle AQ 不会导致“粘性”消费者

1

在这篇文章中,我们将为您详细介绍使用JMS和JMSXGroupID的OracleAQ不会导致“粘性”消费者的内容。此外,我们还会涉及一些关于ActiveMQ消息队列从入门到实践(1)—JMS的概念和J

在这篇文章中,我们将为您详细介绍使用 JMS 和 JMSXGroupID 的 Oracle AQ 不会导致“粘性”消费者的内容。此外,我们还会涉及一些关于ActiveMQ 消息队列从入门到实践(1)—JMS 的概念和 JMS 消息模型、ActiveMQ点对点通讯,生产者-消费者、Apache Qpid JMS 0.42.0 发布,JMS 2 客户端、Dubbo Client(消费者)端 启动 报Failed to check the status...错误的知识,以帮助您更全面地了解这个主题。

本文目录一览:

使用 JMS 和 JMSXGroupID 的 Oracle AQ 不会导致“粘性”消费者

使用 JMS 和 JMSXGroupID 的 Oracle AQ 不会导致“粘性”消费者

如何解决使用 JMS 和 JMSXGroupID 的 Oracle AQ 不会导致“粘性”消费者

Apache ActiveMQ Artemis 使用 JMSXGroupId 来实现“粘性”消费者会话。使用相同 JMSXGroupId 排队的消息被发送到同一个消费者,在 FIFO 中,单线程。然而,这确实允许多个线程同时处理唯一的 JMSXGroupId 组 - 这是完美的 - 见下文:

  1. 16:46:42.451 [Thread-4] INFO Log - This is Message 30 In JMSXGroup: Group C | To Thread Thread-4
  2. 16:46:42.451 [Thread-3] INFO Log - This is Message 283 In JMSXGroup: Group B | To Thread Thread-3
  3. 16:46:42.451 [Thread-3] INFO Log - This is Message 284 In JMSXGroup: Group B | To Thread Thread-3
  4. 16:46:42.451 [Thread-4] INFO Log - This is Message 31 In JMSXGroup: Group C | To Thread Thread-4
  5. 16:46:42.452 [Thread-4] INFO Log - This is Message 32 In JMSXGroup: Group C | To Thread Thread-4
  6. 16:46:42.452 [Thread-3] INFO Log - This is Message 285 In JMSXGroup: Group B | To Thread Thread-3

Oracle AQ 和 Amazon SQS 没有表现出相同的“粘性”消费者行为。除了 JMSXGroupId 用于将相关消息组合在一起之外,我在 JMS 规范中找不到任何特定的内容。

我的预期是,当设置 JMSXGroupId 时,所有 JMS 使用者都会表现出这种“粘性”行为,但情况似乎并非如此。

是否有人仅通过设置 JMSXGroupId 就能够通过 Oracle AQ/SQS 实现这种行为?还是 JMSXGroupId 的意图是允许 consumer 在出队时使用 selector?这似乎不像需要在运行时识别那样进行扩展,而 ActiveMQ 实现显然可以。

解决方法

JMS 规范仅规定应按顺序使用同一组中的消息。它没有说明应该如何实现这个功能。

ActiveMQ Artemis 通过将同一组中的所有消息分派给单个消费者(即您所谓的“粘性”消费者)来实现消息分组。但是,其他 JMS 提供程序可以通过其他方式自由实现此功能。

如前所述,唯一的要求是同一组中的消息按顺序消费。如果您已经在 Oracle AQ 和 Amazon SQS 上测试了此功能,并且有证据表明同一组中的消息被按顺序使用,那么您应该联系这些提供商寻求支持。如果最终结果相同,简单地说它们的实现与 ActiveMQ Artemis 不同是无效的。

ActiveMQ 消息队列从入门到实践(1)—JMS 的概念和 JMS 消息模型

ActiveMQ 消息队列从入门到实践(1)—JMS 的概念和 JMS 消息模型


1. 面向消息的中间件

1.1 什么是 MOM

面向消息的中间件,Message Oriented Middleware,简称 MOM,中文简称消息中间件,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。


一个 MOM 系统,通常会包括客户端(Clients)、消息(Message)和 MOM,客户端是发送或者接受消息的应用程序,消息封装了要传递的内容,MOM 可以存储和转发消息。如下图所示,系统 A 和系统 B 之间的消息传递,不是直接通信,而是通过中间件来间接的传递。


1.2 MOM 的好处

降低系统间通信复杂度

有了 MOM,系统间的通信,不用考虑系统是什么语言开发的,也不用考虑复杂的网络编程,各个系统只需要关心自身和 MOM 之间如何进行消息的接受和发送即可,这些操作通过简单的 API 就可以完成。


提高了消息的灵活性

系统 A 通过 MOM 向系统 B 发送消息,消息可以存储在 MOM 中,并由 MOM 转发。即使是系统 B 不在线,MOM 会持有这个消息,直到系统 B 连接并处理消息。

这就是说,系统 A 发完消息后,就可以执行其它操作,而不必阻塞等待,尤其是对那些时间无关或者并行处理的操作,非常适用。


松散耦合

有了 MOM 的存在,对于系统 B 而言,只要发送的消息没有变化,就不必考虑系统A的变化。A 系统的代码改变,不会影响到 B 系统,反之亦然。


2. JMS 概念

2.1 JMS 是什么

Java 消息服务(Java Message Service,JMS)应用程序接口是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。

Java 消息服务的规范包括两种消息模式,点对点和发布者/订阅者。Java 消息服务支持同步和异步的消息处理。


一个 JMS 系统,通常包括了以下部分:

JMS Client,java 语言编写的客户端,用于发送和接受消息。

Non-JMS Client,使用消息系统本地 API 编写的客户端,而不是使用 JMS API。

Message,消息,被定义,用于在不同的客户端之间交换。

JMS Provider,JMS 提供商,是消息系统(比如 ActiveMQ),当然,即可以支持 JMS,也可以同时支持 Non-JMS。

Administered Objects,被管理的对象,预置的 JMS 对象,为客户端使用。


2.3 JMS 规范

JMS 规范只是定义了接口,并没有实现,规范全文见:JSR 914: JavaTM Message Service (JMS) API 。


3. JMS 模型简介

3.1 JMS 支持两种消息通信模型:

点对点模型 (Point to Point,P2P)

发布者 / 订阅者模型(publish/subscribe,  pub/sub)



P2P 模型中,Sender 把一个消息发送到 Queue 中,这个消息只能由一个客户端消费;一旦消息被消费,其它客户端就不能从这个 Queue 中获取到消息。巧克力糖盒子里只有一块糖,只有最先打开的那个人能吃到,后来的人就吃不到了。


Pub/Sub 模型中,一个消息主题(Topic)被发布以后,可以有多个订阅者收听,这些订阅者都可以获取到消息;前提是,订阅者订阅了这个主题,并且只能接受订阅以后的消息。这就像生活里的期刊订阅,我们只能收到订阅以后的期刊,之前的期刊,杂志社是不可能投递给我们的。


3.2 点对点模型

只有一个消费者:每条消息只有一个消费者,如果这条消息被消费,那么其它消费者不能接受到此消息。

时间无关性:消息的消费和时间无关,只要消息被发送了,在消息过期之前,如果没有其他消费者消费了这个消息,那么客户端可以在任何时候来消费这条消息。

消费者必须确认:消费者收到消息之后,必须向 Message Provider 确认,否则会被认为消息没有被消费,仍然可以被其他消费者消费。可以设置自动确认。这个特点其实也是保证一条消息只能由一个消费者来消费。


非持久化的消息只发一次:非持久化的消息,可能会丢失,因为消息会过期,另外 Message Provider 可能宕机。

持久化的消息严格发一次:消息可以被持久化,比如持久化在文件系统或者数据库中,这样可以避免 Message Provider 的异常或者其它异常导致消息丢失。


3.3 发布者 / 订阅者模型

每条消息可以有多个订阅者,订阅者只能消费它们订阅 topic 之后的消息。


非持久化订阅,订阅者必须保持为活动状态才能使用这些消息,如果一个订阅者 A 断开了 10 分钟,那么 A 就会收不到这 10 分钟内的消息。


持久化订阅,Message Provider 会保存这些消息,即使订阅者因为网络原因断开了,再重新连接以后,能让消费这些消息。是否使用持久化订阅,需要根据业务场景判断。


推荐阅读

Dubbo 应用服务迁移到 Kubernetes 集成方案

JDK1.7 中 HashMap 死环问题及 JDK1.8 中对 HashMap 的优化源码详解

Shiro 权限基础篇(一):Shiro 权限的基本使用方法

Shiro 应用篇(二):Shiro 结合 Redis 实现分布式环境下的 Session 共享

Spring boot 源码分析之 Spring 循环依赖揭秘

Spring 基础篇 — 常见的 Spring 异常分析及处理

Spring 高级篇 —Spring Security 入门原理及实战

什么是索引?MySQL 常见的几种索引类型和原理

微框架 Spring Boot 使用 Redis 如何实现 Session 共享

Java 面试高级篇 —Dubbo 与 Zookeeper 面试题 16 期

Java 面试高级篇 —Java NIO:浅析 I/O 模型面试题 15 期

Java 面试高级篇 —JavaIO 流原理以及 Buffered 高效原理详解

Java 面试高级篇 — 详谈 Java 四种线程池及 new Thread 的弊端面试题 14 期

消息队列篇 — 详谈 ActiveMQ 消息队列模式的分析及使用

更多推荐↓↓↓
 

关注微信公众号 “Java 精选”(w_z90110),回复关键字领取资料:如 HadoopDubboCAS 源码等等,免费领取资料视频和项目。 


涵盖:程序人生、搞笑视频、算法与数据结构、黑客技术与网络安全、前端开发、Java、Python、Redis 缓存、Spring 源码、各大主流框架、Web 开发、大数据技术、Storm、Hadoop、MapReduce、Spark、elasticsearch、单点登录统一认证、分布式框架、集群、安卓开发、iOS 开发、C/C++、.NET、Linux、Mysql、Oracle、NoSQL 非关系型数据库、运维等。

本文分享自微信公众号 - Java 精选(w_z90110)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与 “OSC 源创计划”,欢迎正在阅读的你也加入,一起分享。

ActiveMQ点对点通讯,生产者-消费者

ActiveMQ点对点通讯,生产者-消费者

首先引入pom依赖:

        <!--activemq的依赖 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.12.0</version>
        </dependency>

生产者Producter: 



import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;



public class Producter {

	  private static final int SEND_NUMBER=5;

		public static void main(String[] args) throws JMSException {

	        //构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
	        //ConnectionFactory:连接工厂。JMS用它创建连接
	        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
	                ActiveMQConnectionFactory.DEFAULT_USER,
	                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
	                "tcp://localhost:61616"
	        );
	        //Connection:JMS 客户端到JMS Privider的连接
	        Connection  connection =connectionFactory.createConnection();
	       
	         connection.start();
	         //Session :一个发送或接收消息的线程
	         Session session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	         //Destination :消息的目的地,消息发送给谁

	         Destination  destination = session.createQueue("qushen-queue");
	         //消息的发送者/生产者
	         MessageProducer  producer =session.createProducer(destination);
	         //设置不持久
	         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
	          //发送一条消息
	         sendMessage(session,producer,"qushen");
	         connection.close();
	    }
	 
	    public static void sendMessage(Session session,MessageProducer producer,String msg) throws JMSException {
	 
	            TextMessage message=session.createTextMessage("Hello AcitiveMQ  Msg:"+msg);

	            producer.send(message);
	
	}
}

消费者Consumer:

package com.qushen.acitivemq.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

	public static void main(String[] args) throws JMSException {

	      //构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
        //ConnectionFactory:连接工厂。JMS用它创建连接
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616"
        );
        //Connection:JMS 客户端到JMS Privider的连接
        Connection  connection =connectionFactory.createConnection();
       
         connection.start();
         //Session :一个发送或接收消息的线程
         Session session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         //Destination :消息的目的地,消息发送给谁

         Destination  destination = session.createQueue("qushen-queue");
         //消息的接受者/消费者
         MessageConsumer  consumer =session.createConsumer(destination);
           while(true) {
        	   TextMessage message=(TextMessage) consumer.receive();
        	   if(null != message ) {
        		   System.out.println("收到消息:"+message.getText());
        	
        	   }else
        		   break;
           }
    }
 

}

 启动生产者之后启动消费者输出面板如下显示:

 

Apache Qpid JMS 0.42.0 发布,JMS 2 客户端

Apache Qpid JMS 0.42.0 发布,JMS 2 客户端

Qpid JMS 0.42.0 发布了。Qpid JMS 是一个基于 Qpid Proton 协议引擎构建的完整 Java Message Service 2.0 客户端。支持高级消息队列协议 AMQP 1.0。

此版本新特性与改进:

  • QPIDJMS-453 - proton-j 更新到 0.33.0
  • QPIDJMS-454 - 如果已经发送了Header部分,则显式填充持久字段(durable field)

更新说明:

  • http://qpid.apache.org/releases/qpid-jms-0.42.0/release-notes.html

Dubbo Client(消费者)端 启动 报Failed to check the status...错误

Dubbo Client(消费者)端 启动 报Failed to check the status...错误

憋了三天!上网查得其中一个是双网卡问题,当时没注意,“什么双网卡?不懂,也没这么高端的东西。”。就略过了!

但是!!!!!!把笔记本开的免费wifi工具关掉就TMD好了!!!!

蛋疼啊!!!!!!!!!!!!!!!!

xiba啊!!!!!!!!!!!!!!!!

同理,如果有学习Ehcache集群配置的同学,你懂得。

我们今天的关于使用 JMS 和 JMSXGroupID 的 Oracle AQ 不会导致“粘性”消费者的分享已经告一段落,感谢您的关注,如果您想了解更多关于ActiveMQ 消息队列从入门到实践(1)—JMS 的概念和 JMS 消息模型、ActiveMQ点对点通讯,生产者-消费者、Apache Qpid JMS 0.42.0 发布,JMS 2 客户端、Dubbo Client(消费者)端 启动 报Failed to check the status...错误的相关信息,请在本站查询。

本文标签:

上一篇Oracle regexp_like 得到错误的结果(oracle regexp_like用法)

下一篇如何对 Oracle 中不包括字符串值的列中的所有值求和?(oracle 不包含字符串)