在这篇文章中,我们将为您详细介绍使用JMS和JMSXGroupID的OracleAQ不会导致“粘性”消费者的内容。此外,我们还会涉及一些关于ActiveMQ消息队列从入门到实践(1)—JMS的概念和J
在这篇文章中,我们将为您详细介绍使用 JMS 和 JMSXGroupID 的 Oracle AQ 不会导致“粘性”消费者的内容。此外,我们还会涉及一些关于ActiveMQ 消息队列从入门到实践(1)—JMS 的概念和 JMS 消息模型、ActiveMQ点对点通讯,生产者-消费者、Apache Qpid JMS 0.42.0 发布,JMS 2 客户端、Dubbo Client(消费者)端 启动 报Failed to check the status...错误的知识,以帮助您更全面地了解这个主题。
本文目录一览:- 使用 JMS 和 JMSXGroupID 的 Oracle AQ 不会导致“粘性”消费者
- ActiveMQ 消息队列从入门到实践(1)—JMS 的概念和 JMS 消息模型
- ActiveMQ点对点通讯,生产者-消费者
- Apache Qpid JMS 0.42.0 发布,JMS 2 客户端
- Dubbo Client(消费者)端 启动 报Failed to check the status...错误
使用 JMS 和 JMSXGroupID 的 Oracle AQ 不会导致“粘性”消费者
如何解决使用 JMS 和 JMSXGroupID 的 Oracle AQ 不会导致“粘性”消费者
Apache ActiveMQ Artemis 使用 JMSXGroupId
来实现“粘性”消费者会话。使用相同 JMSXGroupId
排队的消息被发送到同一个消费者,在 FIFO 中,单线程。然而,这确实允许多个线程同时处理唯一的 JMSXGroupId
组 - 这是完美的 - 见下文:
16:46:42.451 [Thread-4] INFO Log - This is Message 30 In JMSXGroup: Group C | To Thread Thread-4
16:46:42.451 [Thread-3] INFO Log - This is Message 283 In JMSXGroup: Group B | To Thread Thread-3
16:46:42.451 [Thread-3] INFO Log - This is Message 284 In JMSXGroup: Group B | To Thread Thread-3
16:46:42.451 [Thread-4] INFO Log - This is Message 31 In JMSXGroup: Group C | To Thread Thread-4
16:46:42.452 [Thread-4] INFO Log - This is Message 32 In JMSXGroup: Group C | To Thread Thread-4
16:46:42.452 [Thread-3] INFO Log - This is Message 285 In JMSXGroup: Group B | To Thread Thread-3
Oracle AQ 和 Amazon SQS 没有表现出相同的“粘性”消费者行为。除了 JMSXGroupId
用于将相关消息组合在一起之外,我在 JMS 规范中找不到任何特定的内容。
我的预期是,当设置 JMSXGroupId
时,所有 JMS 使用者都会表现出这种“粘性”行为,但情况似乎并非如此。
是否有人仅通过设置 JMSXGroupId 就能够通过 Oracle AQ/SQS 实现这种行为?还是 JMSXGroupId
的意图是允许 consumer
在出队时使用 selector
?这似乎不像需要在运行时识别那样进行扩展,而 ActiveMQ 实现显然可以。
解决方法
JMS 规范仅规定应按顺序使用同一组中的消息。它没有说明应该如何实现这个功能。
ActiveMQ Artemis 通过将同一组中的所有消息分派给单个消费者(即您所谓的“粘性”消费者)来实现消息分组。但是,其他 JMS 提供程序可以通过其他方式自由实现此功能。
如前所述,唯一的要求是同一组中的消息按顺序消费。如果您已经在 Oracle AQ 和 Amazon SQS 上测试了此功能,并且有证据表明同一组中的消息未被按顺序使用,那么您应该联系这些提供商寻求支持。如果最终结果相同,简单地说它们的实现与 ActiveMQ Artemis 不同是无效的。
ActiveMQ 消息队列从入门到实践(1)—JMS 的概念和 JMS 消息模型
1. 面向消息的中间件
1.1 什么是 MOM
面向消息的中间件,Message Oriented Middleware,简称 MOM,中文简称消息中间件,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
一个 MOM 系统,通常会包括客户端(Clients)、消息(Message)和 MOM,客户端是发送或者接受消息的应用程序,消息封装了要传递的内容,MOM 可以存储和转发消息。如下图所示,系统 A 和系统 B 之间的消息传递,不是直接通信,而是通过中间件来间接的传递。
1.2 MOM 的好处
降低系统间通信复杂度
有了 MOM,系统间的通信,不用考虑系统是什么语言开发的,也不用考虑复杂的网络编程,各个系统只需要关心自身和 MOM 之间如何进行消息的接受和发送即可,这些操作通过简单的 API 就可以完成。
提高了消息的灵活性
系统 A 通过 MOM 向系统 B 发送消息,消息可以存储在 MOM 中,并由 MOM 转发。即使是系统 B 不在线,MOM 会持有这个消息,直到系统 B 连接并处理消息。
这就是说,系统 A 发完消息后,就可以执行其它操作,而不必阻塞等待,尤其是对那些时间无关或者并行处理的操作,非常适用。
松散耦合
有了 MOM 的存在,对于系统 B 而言,只要发送的消息没有变化,就不必考虑系统A的变化。A 系统的代码改变,不会影响到 B 系统,反之亦然。
2. JMS 概念
2.1 JMS 是什么
Java 消息服务(Java Message Service,JMS)应用程序接口是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。
Java 消息服务的规范包括两种消息模式,点对点和发布者/订阅者。Java 消息服务支持同步和异步的消息处理。
一个 JMS 系统,通常包括了以下部分:
JMS Client,java 语言编写的客户端,用于发送和接受消息。
Non-JMS Client,使用消息系统本地 API 编写的客户端,而不是使用 JMS API。
Message,消息,被定义,用于在不同的客户端之间交换。
JMS Provider,JMS 提供商,是消息系统(比如 ActiveMQ),当然,即可以支持 JMS,也可以同时支持 Non-JMS。
Administered Objects,被管理的对象,预置的 JMS 对象,为客户端使用。
2.3 JMS 规范
JMS 规范只是定义了接口,并没有实现,规范全文见:JSR 914: JavaTM Message Service (JMS) API 。
3. JMS 模型简介
3.1 JMS 支持两种消息通信模型:
点对点模型 (Point to Point,P2P)
发布者 / 订阅者模型(publish/subscribe, pub/sub)
P2P 模型中,Sender 把一个消息发送到 Queue 中,这个消息只能由一个客户端消费;一旦消息被消费,其它客户端就不能从这个 Queue 中获取到消息。巧克力糖盒子里只有一块糖,只有最先打开的那个人能吃到,后来的人就吃不到了。
Pub/Sub 模型中,一个消息主题(Topic)被发布以后,可以有多个订阅者收听,这些订阅者都可以获取到消息;前提是,订阅者订阅了这个主题,并且只能接受订阅以后的消息。这就像生活里的期刊订阅,我们只能收到订阅以后的期刊,之前的期刊,杂志社是不可能投递给我们的。
3.2 点对点模型
只有一个消费者:每条消息只有一个消费者,如果这条消息被消费,那么其它消费者不能接受到此消息。
时间无关性:消息的消费和时间无关,只要消息被发送了,在消息过期之前,如果没有其他消费者消费了这个消息,那么客户端可以在任何时候来消费这条消息。
消费者必须确认:消费者收到消息之后,必须向 Message Provider 确认,否则会被认为消息没有被消费,仍然可以被其他消费者消费。可以设置自动确认。这个特点其实也是保证一条消息只能由一个消费者来消费。
非持久化的消息只发一次:非持久化的消息,可能会丢失,因为消息会过期,另外 Message Provider 可能宕机。
持久化的消息严格发一次:消息可以被持久化,比如持久化在文件系统或者数据库中,这样可以避免 Message Provider 的异常或者其它异常导致消息丢失。
3.3 发布者 / 订阅者模型
每条消息可以有多个订阅者,订阅者只能消费它们订阅 topic 之后的消息。
非持久化订阅,订阅者必须保持为活动状态才能使用这些消息,如果一个订阅者 A 断开了 10 分钟,那么 A 就会收不到这 10 分钟内的消息。
持久化订阅,Message Provider 会保存这些消息,即使订阅者因为网络原因断开了,再重新连接以后,能让消费这些消息。是否使用持久化订阅,需要根据业务场景判断。
推荐阅读
Dubbo 应用服务迁移到 Kubernetes 集成方案
JDK1.7 中 HashMap 死环问题及 JDK1.8 中对 HashMap 的优化源码详解
Shiro 权限基础篇(一):Shiro 权限的基本使用方法
Shiro 应用篇(二):Shiro 结合 Redis 实现分布式环境下的 Session 共享
Spring boot 源码分析之 Spring 循环依赖揭秘
Spring 基础篇 — 常见的 Spring 异常分析及处理
Spring 高级篇 —Spring Security 入门原理及实战
什么是索引?MySQL 常见的几种索引类型和原理
微框架 Spring Boot 使用 Redis 如何实现 Session 共享
Java 面试高级篇 —Dubbo 与 Zookeeper 面试题 16 期
Java 面试高级篇 —Java NIO:浅析 I/O 模型面试题 15 期
Java 面试高级篇 —JavaIO 流原理以及 Buffered 高效原理详解
Java 面试高级篇 — 详谈 Java 四种线程池及 new Thread 的弊端面试题 14 期
消息队列篇 — 详谈 ActiveMQ 消息队列模式的分析及使用
关注微信公众号 “Java 精选”(w_z90110),回复关键字领取资料:如 Hadoop,Dubbo,CAS 源码等等,免费领取资料视频和项目。
涵盖:程序人生、搞笑视频、算法与数据结构、黑客技术与网络安全、前端开发、Java、Python、Redis 缓存、Spring 源码、各大主流框架、Web 开发、大数据技术、Storm、Hadoop、MapReduce、Spark、elasticsearch、单点登录统一认证、分布式框架、集群、安卓开发、iOS 开发、C/C++、.NET、Linux、Mysql、Oracle、NoSQL 非关系型数据库、运维等。
本文分享自微信公众号 - Java 精选(w_z90110)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与 “OSC 源创计划”,欢迎正在阅读的你也加入,一起分享。
ActiveMQ点对点通讯,生产者-消费者
首先引入pom依赖:
<!--activemq的依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.12.0</version>
</dependency>
生产者Producter:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Producter {
private static final int SEND_NUMBER=5;
public static void main(String[] args) throws JMSException {
//构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
//ConnectionFactory:连接工厂。JMS用它创建连接
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
//Connection:JMS 客户端到JMS Privider的连接
Connection connection =connectionFactory.createConnection();
connection.start();
//Session :一个发送或接收消息的线程
Session session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//Destination :消息的目的地,消息发送给谁
Destination destination = session.createQueue("qushen-queue");
//消息的发送者/生产者
MessageProducer producer =session.createProducer(destination);
//设置不持久
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//发送一条消息
sendMessage(session,producer,"qushen");
connection.close();
}
public static void sendMessage(Session session,MessageProducer producer,String msg) throws JMSException {
TextMessage message=session.createTextMessage("Hello AcitiveMQ Msg:"+msg);
producer.send(message);
}
}
消费者Consumer:
package com.qushen.acitivemq.consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Consumer {
public static void main(String[] args) throws JMSException {
//构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
//ConnectionFactory:连接工厂。JMS用它创建连接
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61616"
);
//Connection:JMS 客户端到JMS Privider的连接
Connection connection =connectionFactory.createConnection();
connection.start();
//Session :一个发送或接收消息的线程
Session session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//Destination :消息的目的地,消息发送给谁
Destination destination = session.createQueue("qushen-queue");
//消息的接受者/消费者
MessageConsumer consumer =session.createConsumer(destination);
while(true) {
TextMessage message=(TextMessage) consumer.receive();
if(null != message ) {
System.out.println("收到消息:"+message.getText());
}else
break;
}
}
}
启动生产者之后启动消费者输出面板如下显示:
Apache Qpid JMS 0.42.0 发布,JMS 2 客户端
Qpid JMS 0.42.0 发布了。Qpid JMS 是一个基于 Qpid Proton 协议引擎构建的完整 Java Message Service 2.0 客户端。支持高级消息队列协议 AMQP 1.0。
此版本新特性与改进:
- QPIDJMS-453 - proton-j 更新到 0.33.0
- QPIDJMS-454 - 如果已经发送了Header部分,则显式填充持久字段(durable field)
更新说明:
- http://qpid.apache.org/releases/qpid-jms-0.42.0/release-notes.html
Dubbo Client(消费者)端 启动 报Failed to check the status...错误
憋了三天!上网查得其中一个是双网卡问题,当时没注意,“什么双网卡?不懂,也没这么高端的东西。”。就略过了!
但是!!!!!!把笔记本开的免费wifi工具关掉就TMD好了!!!!
蛋疼啊!!!!!!!!!!!!!!!!
xiba啊!!!!!!!!!!!!!!!!
同理,如果有学习Ehcache集群配置的同学,你懂得。
我们今天的关于使用 JMS 和 JMSXGroupID 的 Oracle AQ 不会导致“粘性”消费者的分享已经告一段落,感谢您的关注,如果您想了解更多关于ActiveMQ 消息队列从入门到实践(1)—JMS 的概念和 JMS 消息模型、ActiveMQ点对点通讯,生产者-消费者、Apache Qpid JMS 0.42.0 发布,JMS 2 客户端、Dubbo Client(消费者)端 启动 报Failed to check the status...错误的相关信息,请在本站查询。
本文标签: