如果您对避免在JMS/ActiveMQ上重复消息感兴趣,那么本文将是一篇不错的选择,我们将为您详在本文中,您将会了解到关于避免在JMS/ActiveMQ上重复消息的详细内容,我们还将为您解答activ
如果您对避免在JMS / ActiveMQ上重复消息感兴趣,那么本文将是一篇不错的选择,我们将为您详在本文中,您将会了解到关于避免在JMS / ActiveMQ上重复消息的详细内容,我们还将为您解答activemq如何保证消息不重复的相关问题,并且为您提供关于ActiveMQ (二):JMS、ActiveMQ - JMS 与 ActiveMQ 介绍、ActiveMQ 5.10.0 发布,JMS 消息服务器、ActiveMQ 5.12.0 发布,JMS 消息服务器的有价值信息。
本文目录一览:- 避免在JMS / ActiveMQ上重复消息(activemq如何保证消息不重复)
- ActiveMQ (二):JMS
- ActiveMQ - JMS 与 ActiveMQ 介绍
- ActiveMQ 5.10.0 发布,JMS 消息服务器
- ActiveMQ 5.12.0 发布,JMS 消息服务器
避免在JMS / ActiveMQ上重复消息(activemq如何保证消息不重复)
有没有一种方法可以抑制ActiveMQ服务器上定义的队列上的重复消息?
我尝试手动定义JMSMessageID((message.setJMSMessageID(“
uniqueid”)),但是服务器忽略此修改并使用内置的JMSMessageID传递消息。
根据规范,我没有找到有关如何删除邮件重复数据的参考。
在HornetQ中,要解决此问题,我们需要在消息定义中声明HQ特定的属性org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID。
即:
Message jmsMessage = session.createMessage();String myUniqueID = "This is my unique id"; // Could use a UUID for thismessage.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);
有人知道ActiveMQ是否有类似的解决方案?
答案1
小编典典您应该看看Apache
Camel,它提供了一个与任何JMS提供程序一起使用的幂等消费者组件,请参阅:http : //camel.apache.org/idempotent-
consumer.html
结合使用ActiveMQ组件可以使使用JMS非常简单,请参见:http :
//camel.apache.org/activemq.html
ActiveMQ (二):JMS
1.前言
由于ActiveMQ是一种完全符合JMS规范的一种通信工具,所以在使用ActiveMQ前认识JMS规范就变的十分必要了。
认识JMS主要从以下方面:
a. JMS 模型
b. JMS 对象模型
c. JMS 传递方式
d. JMS 消息类型<消息正文格式>
2. 模型
Java消息服务应用程序结构支持两种模型:
点对点或队列模型
这种模型的特殊之处在于:生产者不会指定唯一的消费者消费消息,而是多个消费者消费一个消息,即多个消费者都可以消费这条消息,且只有一个消费者可以消费到。
发布者/订阅者模型
这种模型是使用发布/订阅者模式,订阅者保存Topic地址,从地址中“拉取”消息数据。
3. 对象模型
ActiveMQ 依赖下面的模型进行消息发送接收。
模型图如下:
JMS对象模型包含如下几个要素:
1)连接工厂。
连接工厂(ConnectionFactory)是由管理员创建,并绑定到JNDI树中。客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
2)JMS连接。
JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
3)JMS会话。
JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。
4)JMS目的。
JMS目的(Destination),又称为消息队列,是实际的消息源。
5)JMS生产者和消费者。
生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。
6)JMS消息通常有两种类型:
① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。
② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。
4.传递方式
JMS有两种传递消息的方式:
NON_PERSISTENT(非持久化):消息最多投递一次。若消费者服务未启动,则在消息发送后再启动消费者服务,消费者无法获取到之前发送的消息。
PERSISTENT(持久化):使用暂存后再转送的机理投递。消费者在消息发送后启动服务,可以接收到之前发送的消息。
5. JMS消息类型
MS定义了五种不同的消息正文格式,以及调用的消息类型:
· StreamMessage -- Java原始值的数据流
· MapMessage--一套名称-值对
· TextMessage--一个字符串对象
· ObjectMessage--一个序列化的 Java对象
· BytesMessage--一个未解释字节的数据流
参考资料:
1. http://baike.baidu.com/item/JMS/2836691?sefr=enterbtn#5
2. http://shmilyaw-hotmail-com.iteye.com/blog/1897635
ActiveMQ - JMS 与 ActiveMQ 介绍
本文大部分转载于 PPT(JMS 中间件 ActiveMQ 介绍.pptx)
JMS 介绍
JMS 源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun 公司和它的合作伙伴设计的 JMS API 定义了一组公共的应用程序接口和相应语法,使得 Java 程序能够和其他消息组件进行通信。JMS 有四个组成部分:JMS 服务提供者、消息管理对象、消息的生产者消费者和消息本身。
(1)、JMS 服务提供者实现消息队列和通知,同时实现消息管理的 API。JMS 已经是 J2EE API 的一部分,J2EE 服务器都提供 JMS 服务。
(2)、消息管理对象提供对消息进行操作的 API。JMS API 中有两个消息管理对象:创建 jms 连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同 ConnectionFactory 可以分为 QueueConnectionFactory 和 TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。
(3)、消息的生产者和消费者。消息的产生由 JMS 的客户端完成,JMS 服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber) 和点对点消息的接收者(queue receiver)
(4)、消息。消息是服务提供者和客户端之间传递信息所使用的信息单元。JMS 消息由以下三部分组成:
-
消息头(header)―― JMS 消息头包含了许多字段,它们是消息发送后由 JMS 提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。
-
属性(property)―― 用来添加删除消息头以外的附加信息。
-
消息体(body)――JMS 中定义了 5 种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage 和 TextMessage。
JMS 模型
Java 消息服务应用程序结构支持两种模型: 点对点模型 (基于队列) 和 发布者 / 订阅者模型(基于主题的)。
(1)点对点模型 (基于队列)
每个消息只能有一个消费者。消息的生产者和消费者之间没有时间上的相关性。可以由多个发送者,但只能被一个消费者消费。它具有如下特点:
- 一个消息只能被一个接受者接受一次
- 生产者把消息发送到队列中 (Queue),这个队列可以理解为电视机频道 (channel) ,在这个消息中间件上有多个这样的 channel
- 接受者无需订阅,当接受者未接受到消息时就会处于阻塞状态
(2) 发布者 / 订阅者模型(基于主题的)
每个消息可以有多个消费者。生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。
- 允许多个接受者,类似于广播的方式
- 生产者将消息发送到主题上 (Topic)
- 接受者必须先订阅
JMS 消息发送时序图
消费者的消费方式分为同步和异步两种:
- 同步方式。通过调用消费者的 receive 方法从目的地中显式提取消息。receive 方法可以一直阻塞到消息到达。
- 异步方式。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。实现 MessageListener 接口,在 MessageListener()方法中实现消息的处理逻辑。
ActiveMQ
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。
ActiveMQ 的通信机制
activeMQ 支持多种通讯协议 TCP/UDP 等,选取最常用的 TCP 来分析 activeMQ 的通讯机制。首先明确概念:
客户 (Client):消息的生产者、消费者对 activeMQ 来说都叫作客户。
消息中转器 (Message broker):它是 activeMQ 的核心,它接收信息并进行相关处理后分发给消息消费者。 为了能清楚的描述出 activeMQ 的核心通讯机制,我们选择 3 个部分来进行说明,它们分别是建立链接、关闭链接、心跳。
** Client 跟 activeMQ 的 TCP 通讯的初始化过程分析 **
1. activeMQ 初始化时,通过 TcpTransportServer 类根据配置打开 TCP 侦听端口,客户通过该端口发起建立链接的动作。
2. 把 accept 的 Socket 放入阻塞队列中。
3. 另外一个线程 Socket handler 阻塞着等待队列中是否有新的 Socket,如果有则取出来。
4. 生成一个 TransportConnection 的实例。TransportConnection 类的主要作用是处理链路的状态信息,并实现 CommandVisitor 接口来完成各类消息的处理。
5. TransportConnection 会使用一个由多个 TransportFilter 实例组成的消息处理链条,负责对接收到的各类消息进行处理并发送相应的应答。这个链条的典型组成顺序:MutexTransport->WireFormatNegotiator->InactivityMonitor->TcpTransport。在这条链条中最后的一环就是 TcpTransport 类,它是实际和 Client 获取和发送数据的地方。
6. 建链完成,可以进行通讯操作。方法有 run () 和 oneway (),一个负责读取,一个负责发送。
** 关闭链接 **
activeMQ 发现 TCP 链接的关闭,最关键的代码在 TcpBufferedInputStream 类中的 int n = in.read (buffer, position, buffer.length - position);
** 心跳 **
为了更好的维护 TCP 链路的使用,activeMQ 采用了心跳机制作为判断双方链路的健康情况。activeMQ 使用的是双向心跳,也就是 activeMQ 的 Broker 和 Client 双方都进行相互心跳,但不管是 Broker 或 Client 心跳的具体处理情况是完全一样的,都在 InactivityMonitor 类中实现,下面具体介绍。
心跳会产生两个线程 “InactivityMonitor ReadCheck” 和 “InactivityMonitor WriteCheck”,它们都是 Timer 类型,都会隔一段固定时间被调用一次。ReadCheck 线程主要调用的方法是 readCheck (),当在等待时间内,有消息接收到,则该方法会返回 true。WriteCheck 线程主要调用的方法是 writeCheck (),这有个小技巧,大家可以参考一下,那就是当 WriteCheck 线程休眠时,有任何数据发送成功,则该线程被唤醒后,不用通过 TCP 向对方真的发送心跳消息,这样可以从一定程度上减少网络传输的数据量
ActiveMQ 5.10.0 发布,JMS 消息服务器
ActiveMQ 5.10.0 发布,此版本修复了超过 200 个问题,引入了大量的改进,特别是 MQTT 和 AMQP 支持方面的。更新内容如下:
Java 8 支持
Apache Shiro 安全插件 - http://activemq.apache.org/shiro.html
加强 MQTT 支持
加强 AMQP 支持
加强 LevelDB store
改进 RAR/JCA 适配器
改进 Runtime 配置插件
改进 Web 工作台
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:
1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6. 支持通过JDBC和journal提供高速的消息持久化
7. 从设计上保证了高性能的集群,客户端-服务器,点对点
8. 支持Ajax
9. 支持与Axis的整合
10. 可以很容易得调用内嵌JMS provider,进行测试
ActiveMQ 5.12.0 发布,JMS 消息服务器
ActiveMQ 5.12.0 发布,此版本解决了一些 issues,改进了 AMQP 和 MQTT 支持,修复了 STOMP 和 MQTT 相关的问题。
下载:
apache-activemq-5.12.0-bin.zip
apache-activemq-5.12.0-bin.tar.gz
改进列表:
Supports a variety of Cross Language Clients and Protocols from Java, C, C++, C#, Ruby, Perl, Python, PHP
OpenWire for high performance clients in Java, C, C++, C#
Stomp support so that clients can be written easily in C, Ruby, Perl, Python, PHP, ActionScript/Flash, Smalltalk to talk to ActiveMQ as well as any other popular Message Broker
AMQP v1.0 support
MQTT v3.1 support allowing for connections in an IoT environment.
full support for the Enterprise Integration Patterns both in the JMS client and the Message Broker
Supports many advanced features such as Message Groups, Virtual Destinations, Wildcards and Composite Destinations
Fully supports JMS 1.1 and J2EE 1.4 with support for transient, persistent, transactional and XA messaging
Spring Support so that ActiveMQ can be easily embedded into Spring applications and configured using Spring''s XML configuration mechanism
Tested inside popular J2EE servers such as TomEE, Geronimo, JBoss, GlassFish and WebLogic
Includes JCA 1.5 resource adaptors for inbound & outbound messaging so that ActiveMQ should auto-deploy in any J2EE 1.4 compliant server
Supports pluggable transport protocols such as in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports
Supports very fast persistence using JDBC along with a high performance journal
Designed for high performance clustering, client-server, peer based communication
REST API to provide technology agnostic and language neutral web based API to messaging
Ajax to support web streaming support to web browsers using pure DHTML, allowing web browsers to be part of the messaging fabric
CXF and Axis Support so that ActiveMQ can be easily dropped into either of these web service stacks to provide reliable messaging
Can be used as an in memory JMS provider, ideal for unit testing JMS
更多内容请看发行说明。
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。
我们今天的关于避免在JMS / ActiveMQ上重复消息和activemq如何保证消息不重复的分享已经告一段落,感谢您的关注,如果您想了解更多关于ActiveMQ (二):JMS、ActiveMQ - JMS 与 ActiveMQ 介绍、ActiveMQ 5.10.0 发布,JMS 消息服务器、ActiveMQ 5.12.0 发布,JMS 消息服务器的相关信息,请在本站查询。
本文标签: