GVKun编程网logo

避免在JMS / ActiveMQ上重复消息(activemq如何保证消息不重复)

29

如果您对避免在JMS/ActiveMQ上重复消息感兴趣,那么本文将是一篇不错的选择,我们将为您详在本文中,您将会了解到关于避免在JMS/ActiveMQ上重复消息的详细内容,我们还将为您解答activ

如果您对避免在JMS / ActiveMQ上重复消息感兴趣,那么本文将是一篇不错的选择,我们将为您详在本文中,您将会了解到关于避免在JMS / ActiveMQ上重复消息的详细内容,我们还将为您解答activemq如何保证消息不重复的相关问题,并且为您提供关于ActiveMQ (二):JMS、ActiveMQ - JMS 与 ActiveMQ 介绍、ActiveMQ 5.10.0 发布,JMS 消息服务器、ActiveMQ 5.12.0 发布,JMS 消息服务器的有价值信息。

本文目录一览:

避免在JMS / ActiveMQ上重复消息(activemq如何保证消息不重复)

避免在JMS / ActiveMQ上重复消息(activemq如何保证消息不重复)

有没有一种方法可以抑制ActiveMQ服务器上定义的队列上的重复消息?

我尝试手动定义JMSMessageID((message.setJMSMessageID(“
uniqueid”)),但是服务器忽略此修改并使用内置的JMSMessageID传递消息。

根据规范,我没有找到有关如何删除邮件重复数据的参考。

在HornetQ中,要解决此问题,我们需要在消息定义中声明HQ特定的属性org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID。

即:

Message jmsMessage = session.createMessage();String myUniqueID = "This is my unique id"; // Could use a UUID for thismessage.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

有人知道ActiveMQ是否有类似的解决方案?

答案1

小编典典

您应该看看Apache
Camel,它提供了一个与任何JMS提供程序一起使用的幂等消费者组件,请参阅:http : //camel.apache.org/idempotent-
consumer.html

结合使用ActiveMQ组件可以使使用JMS非常简单,请参见:http :
//camel.apache.org/activemq.html

ActiveMQ (二):JMS

ActiveMQ (二):JMS

1.前言

    由于ActiveMQ是一种完全符合JMS规范的一种通信工具,所以在使用ActiveMQ前认识JMS规范就变的十分必要了。

    认识JMS主要从以下方面:

      a. JMS 模型

      b. JMS 对象模型

      c. JMS 传递方式

      d. JMS 消息类型<消息正文格式>

 

  2. 模型

    Java消息服务应用程序结构支持两种模型:

      点对点或队列模型

      这种模型的特殊之处在于:生产者不会指定唯一的消费者消费消息,而是多个消费者消费一个消息,即多个消费者都可以消费这条消息,且只有一个消费者可以消费到。

      

 

      发布者/订阅者模型

      这种模型是使用发布/订阅者模式,订阅者保存Topic地址,从地址中“拉取”消息数据。

      

    

   3. 对象模型

    ActiveMQ 依赖下面的模型进行消息发送接收。

    模型图如下:

    

 

    JMS对象模型包含如下几个要素: 

      1)连接工厂。

        连接工厂(ConnectionFactory)是由管理员创建,并绑定到JNDI树中。客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。

 

      2)JMS连接。

        JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。

 

      3)JMS会话。

        JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。

 

      4)JMS目的。

        JMS目的(Destination),又称为消息队列,是实际的消息源。

 

      5)JMS生产者和消费者。

        生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。

 

      6)JMS消息通常有两种类型:

         ① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。

         ② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。

 

    4.传递方式

      JMS有两种传递消息的方式:

        NON_PERSISTENT(非持久化):消息最多投递一次。若消费者服务未启动,则在消息发送后再启动消费者服务,消费者无法获取到之前发送的消息。

        PERSISTENT(持久化):使用暂存后再转送的机理投递。消费者在消息发送后启动服务,可以接收到之前发送的消息。

 

    5. JMS消息类型

        MS定义了五种不同的消息正文格式,以及调用的消息类型:        

          · StreamMessage -- Java原始值的数据流

 

          · MapMessage--一套名称-值对

 

          · TextMessage--一个字符串对象

 

          · ObjectMessage--一个序列化的 Java对象

 

          · BytesMessage--一个未解释字节的数据流

    参考资料:

    1. http://baike.baidu.com/item/JMS/2836691?sefr=enterbtn#5

    2. http://shmilyaw-hotmail-com.iteye.com/blog/1897635

ActiveMQ - JMS 与 ActiveMQ 介绍

ActiveMQ - JMS 与 ActiveMQ 介绍

本文大部分转载于 PPT(JMS 中间件 ActiveMQ 介绍.pptx)

JMS 介绍

JMS 源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun 公司和它的合作伙伴设计的 JMS API 定义了一组公共的应用程序接口和相应语法,使得 Java 程序能够和其他消息组件进行通信。JMS 有四个组成部分:JMS 服务提供者、消息管理对象、消息的生产者消费者和消息本身。

(1)、JMS 服务提供者实现消息队列和通知,同时实现消息管理的 API。JMS 已经是 J2EE API 的一部分,J2EE 服务器都提供 JMS 服务。

(2)、消息管理对象提供对消息进行操作的 API。JMS API 中有两个消息管理对象:创建 jms 连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同 ConnectionFactory 可以分为 QueueConnectionFactory 和 TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。

(3)、消息的生产者和消费者。消息的产生由 JMS 的客户端完成,JMS 服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber) 和点对点消息的接收者(queue receiver)

(4)、消息。消息是服务提供者和客户端之间传递信息所使用的信息单元。JMS 消息由以下三部分组成:

  • 消息头(header)―― JMS 消息头包含了许多字段,它们是消息发送后由 JMS 提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。

  • 属性(property)―― 用来添加删除消息头以外的附加信息。

  • 消息体(body)――JMS 中定义了 5 种消息体:ByteMessage、MapMessage、ObjectMessage、StreamMessage 和 TextMessage。

JMS 模型

Java 消息服务应用程序结构支持两种模型: 点对点模型 (基于队列) 和 发布者 / 订阅者模型(基于主题的)。

(1)点对点模型 (基于队列)

每个消息只能有一个消费者。消息的生产者和消费者之间没有时间上的相关性。可以由多个发送者,但只能被一个消费者消费。它具有如下特点:

  • 一个消息只能被一个接受者接受一次
  • 生产者把消息发送到队列中 (Queue),这个队列可以理解为电视机频道 (channel) ,在这个消息中间件上有多个这样的 channel
  • 接受者无需订阅,当接受者未接受到消息时就会处于阻塞状态

(2) 发布者 / 订阅者模型(基于主题的)

每个消息可以有多个消费者。生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。

  • 允许多个接受者,类似于广播的方式
  • 生产者将消息发送到主题上 (Topic)
  • 接受者必须先订阅

输入图片说明

JMS 消息发送时序图

输入图片说明

消费者的消费方式分为同步和异步两种:

  • 同步方式。通过调用消费者的 receive 方法从目的地中显式提取消息。receive 方法可以一直阻塞到消息到达。
  • 异步方式。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。实现 MessageListener 接口,在 MessageListener()方法中实现消息的处理逻辑。

ActiveMQ

ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。

ActiveMQ 的通信机制

activeMQ 支持多种通讯协议 TCP/UDP 等,选取最常用的 TCP 来分析 activeMQ 的通讯机制。首先明确概念:

客户 (Client):消息的生产者、消费者对 activeMQ 来说都叫作客户。

消息中转器 (Message broker):它是 activeMQ 的核心,它接收信息并进行相关处理后分发给消息消费者。 为了能清楚的描述出 activeMQ 的核心通讯机制,我们选择 3 个部分来进行说明,它们分别是建立链接、关闭链接、心跳。

输入图片说明

** Client 跟 activeMQ 的 TCP 通讯的初始化过程分析 **

1. activeMQ 初始化时,通过 TcpTransportServer 类根据配置打开 TCP 侦听端口,客户通过该端口发起建立链接的动作。

2. 把 accept 的 Socket 放入阻塞队列中。

3. 另外一个线程 Socket handler 阻塞着等待队列中是否有新的 Socket,如果有则取出来。

4. 生成一个 TransportConnection 的实例。TransportConnection 类的主要作用是处理链路的状态信息,并实现 CommandVisitor 接口来完成各类消息的处理。

5. TransportConnection 会使用一个由多个 TransportFilter 实例组成的消息处理链条,负责对接收到的各类消息进行处理并发送相应的应答。这个链条的典型组成顺序:MutexTransport->WireFormatNegotiator->InactivityMonitor->TcpTransport。在这条链条中最后的一环就是 TcpTransport 类,它是实际和 Client 获取和发送数据的地方。

6. 建链完成,可以进行通讯操作。方法有 run () 和 oneway (),一个负责读取,一个负责发送。

** 关闭链接 **

activeMQ 发现 TCP 链接的关闭,最关键的代码在 TcpBufferedInputStream 类中的 int n = in.read (buffer, position, buffer.length - position);

** 心跳 **

为了更好的维护 TCP 链路的使用,activeMQ 采用了心跳机制作为判断双方链路的健康情况。activeMQ 使用的是双向心跳,也就是 activeMQ 的 Broker 和 Client 双方都进行相互心跳,但不管是 Broker 或 Client 心跳的具体处理情况是完全一样的,都在 InactivityMonitor 类中实现,下面具体介绍。

心跳会产生两个线程 “InactivityMonitor ReadCheck” 和 “InactivityMonitor WriteCheck”,它们都是 Timer 类型,都会隔一段固定时间被调用一次。ReadCheck 线程主要调用的方法是 readCheck (),当在等待时间内,有消息接收到,则该方法会返回 true。WriteCheck 线程主要调用的方法是 writeCheck (),这有个小技巧,大家可以参考一下,那就是当 WriteCheck 线程休眠时,有任何数据发送成功,则该线程被唤醒后,不用通过 TCP 向对方真的发送心跳消息,这样可以从一定程度上减少网络传输的数据量

ActiveMQ 5.10.0 发布,JMS 消息服务器

ActiveMQ 5.10.0 发布,JMS 消息服务器

ActiveMQ 5.10.0 发布,此版本修复了超过 200 个问题,引入了大量的改进,特别是 MQTT 和 AMQP 支持方面的。更新内容如下:

  • Java 8 支持

  • Apache Shiro 安全插件 - http://activemq.apache.org/shiro.html

  • 加强 MQTT 支持

  • 加强 AMQP 支持

  • 加强 LevelDB store

  • 改进 RAR/JCA 适配器

  • 改进 Runtime 配置插件

  • 改进 Web 工作台


ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

主要特点:

1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

6. 支持通过JDBC和journal提供高速的消息持久化

7. 从设计上保证了高性能的集群,客户端-服务器,点对点

8. 支持Ajax

9. 支持与Axis的整合

10. 可以很容易得调用内嵌JMS provider,进行测试


ActiveMQ 5.12.0 发布,JMS 消息服务器

ActiveMQ 5.12.0 发布,JMS 消息服务器

ActiveMQ 5.12.0 发布,此版本解决了一些 issues,改进了 AMQP 和 MQTT 支持,修复了 STOMP 和 MQTT 相关的问题。

下载:

  • apache-activemq-5.12.0-bin.zip

  • apache-activemq-5.12.0-bin.tar.gz

改进列表:

  • Supports a variety of Cross Language Clients and Protocols from Java, C, C++, C#, Ruby, Perl, Python, PHP

    • OpenWire for high performance clients in Java, C, C++, C#

    • Stomp support so that clients can be written easily in C, Ruby, Perl, Python, PHP, ActionScript/Flash, Smalltalk to talk to ActiveMQ as well as any other popular Message Broker

    • AMQP v1.0 support

    • MQTT v3.1 support allowing for connections in an IoT environment.

  • full support for the Enterprise Integration Patterns both in the JMS client and the Message Broker

  • Supports many advanced features such as Message Groups, Virtual Destinations, Wildcards and Composite Destinations

  • Fully supports JMS 1.1 and J2EE 1.4 with support for transient, persistent, transactional and XA messaging

  • Spring Support so that ActiveMQ can be easily embedded into Spring applications and configured using Spring''s XML configuration mechanism

  • Tested inside popular J2EE servers such as TomEE, Geronimo, JBoss, GlassFish and WebLogic

    • Includes JCA 1.5 resource adaptors for inbound & outbound messaging so that ActiveMQ should auto-deploy in any J2EE 1.4 compliant server

  • Supports pluggable transport protocols such as in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports

  • Supports very fast persistence using JDBC along with a high performance journal

  • Designed for high performance clustering, client-server, peer based communication

  • REST API to provide technology agnostic and language neutral web based API to messaging

  • Ajax to support web streaming support to web browsers using pure DHTML, allowing web browsers to be part of the messaging fabric

  • CXF and Axis Support so that ActiveMQ can be easily dropped into either of these web service stacks to provide reliable messaging

  • Can be used as an in memory JMS provider, ideal for unit testing JMS

更多内容请看发行说明。

ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。

我们今天的关于避免在JMS / ActiveMQ上重复消息activemq如何保证消息不重复的分享已经告一段落,感谢您的关注,如果您想了解更多关于ActiveMQ (二):JMS、ActiveMQ - JMS 与 ActiveMQ 介绍、ActiveMQ 5.10.0 发布,JMS 消息服务器、ActiveMQ 5.12.0 发布,JMS 消息服务器的相关信息,请在本站查询。

本文标签: