GVKun编程网logo

《Apache RocketMQ 深入浅出》系列文章(org.apache.rocketmq)

22

如果您想了解《ApacheRocketMQ深入浅出》系列文章的相关知识,那么本文是一篇不可错过的文章,我们将对org.apache.rocketmq进行全面详尽的解释,并且为您提供关于ApacheDu

如果您想了解《Apache RocketMQ 深入浅出》系列文章的相关知识,那么本文是一篇不可错过的文章,我们将对org.apache.rocketmq进行全面详尽的解释,并且为您提供关于Apache Dubbo 和 Apache RocketMQ 邀您参与,ASF 亚洲峰会 5 张门票免费送、Apache RocketMQ (TLP) 发布首个子项目:RocketMQ-Spring-Boot、Apache RocketMQ 4.2.0 安装及 API 调用教程、Apache RocketMQ 4.2安装的有价值的信息。

本文目录一览:

《Apache RocketMQ 深入浅出》系列文章(org.apache.rocketmq)

《Apache RocketMQ 深入浅出》系列文章(org.apache.rocketmq)

Apache RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。 RocketMQ在阿里集团也被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binlog分发等场景。

RocketMQ 主要有四大核心组成部分:NameServer、broker、Producer以及Consumer四部分。

欢迎关注《Apache RocketMQ 深入浅出》系列文章,架构师将循序渐进地讲解Apache RocketMQ的开发实践。

 

1. Apache RocketMQ 入门介绍和整体架构图 2. 介绍新版RocketMQ  v4.9.3 下载、安装、配置的完成过程 3. 启动和停止RocketMQ服务进程、测试消息的发送和消费 4. Spring Boot集成RocketMQ :生产者和消费者开发入门实践 5. RocketMQ 可视化管理界面Dashboard的搭建和使用 6. 了解Apache RocketMQ 中的消息类型和消费模式 7. Spring Boot 集成RocketMQ:使用rocketmq-spring-boot-starter 生产和消费消息 8. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送同步、异步和单向消息 9. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送顺序消息 10. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送延时消息 11. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送事务消息(1) 12. Spring Boot 集成RocketMQ:使用RocketMQTemplate实现发送事务消息(2) 13. RocketMQ 消费模式:集群消费模式和广播消费模式 14. RocketMQ消费端Push和Pull两种消费方式:拉模式开发示例 15. RocketMQ消费端Push和Pull两种消费方式:推模式开发示例 16. RocketMQ中高级特性-消息过滤和标签Tag开发实战(1) 17. RocketMQ中高级特性-消息过滤和标签Tag开发实战(2) 18. RocketMQ 中MessageExt 详解和开发实战 19. RocketMQ 消息重试机制开发实战 .....



总结

以上是小编为你收集整理的《Apache RocketMQ 深入浅出》系列文章全部内容。

如果觉得小编网站内容还不错,欢迎将小编网站推荐给好友。

原文地址:https://www.cnblogs.com/rickie/p/16128590.html

Apache Dubbo 和 Apache RocketMQ 邀您参与,ASF 亚洲峰会 5 张门票免费送

Apache Dubbo 和 Apache RocketMQ 邀您参与,ASF 亚洲峰会 5 张门票免费送

今年,CommunityOverCode Asia 2023 将是阿帕奇亚洲大会的首次线下会议,北京,8 月 18 日至 20 日。会议将持续 3 天,设有 17 个论坛方向,共收集到 150 余个议题投稿,其中中文议题约 110 个,英文议题近 40 个。Apache Dubbo 和 Apache RocketMQ 邀请您来参会,点击阅读原文或扫描下方海报的二维码、填写问卷,有机会免费获得 3 天通票。

 title=

关于 Apache Dubbo

Apache Dubbo 是一款易用、高性能的 WEB 和 RPC 框架,同时为构建企业级微服务提供服务发现、流量治理、可观测、认证鉴权等能力、工具与最佳实践,已被中国工商银行、小米、携程网、恒生电子、海尔、新东方、顺丰科技等企业采用,并荣获 2022 InfoQ 中国开源开源发展报告项目排名第 6、2022 CSDN 中国开发者影响力年度开源影响力项目、2022 开源项目成熟度评估优秀壹级等荣誉。

关于 Apache RocketMQ

Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。

延伸阅读: 阿里云中间件开源往事

Apache RocketMQ (TLP) 发布首个子项目:RocketMQ-Spring-Boot

Apache RocketMQ (TLP) 发布首个子项目:RocketMQ-Spring-Boot

近期,Apache RocketMQ在社区中上线首个子项目RocketMQ-Spring-Boot。第一个版本中包含了若干新特性,修复并改进了针对rocketmq-external孵化报告中提到的若干问题。

  • 新特性:

1. 在事务中发送消息;

2. 支持发送 DelayTimeLevel 消息;

3. RocketMQ 原生 API透明化,只需要使用 Spring Message API而非RocketMQ客户端API 即可实现相应功能;

 

  • 改进:

1. 添加示例以演示RocketMQ-spring-boot/starter的使用,并涵盖所有客户端场景;

2. 用Spring标准(生命周期、BeanCreating条件、Bean Initialize、AutoConfigure Test等)修饰代码;

3. 调整目录层次结构,并使用不同级别的项目(即spring-boot、spring-boot-starter和sample)分离函数;

4. 重构包并使代码更加清晰,具有完全限定的命名和标准的Java文档注释;

 

  • 修复:

1. 修复使用Bean方法''rocketMQTemplate''未加载的问题;

2. 修复 Debug ToCKMQListEnter 容器中消息类型不正确判断的问题;

3. spring-boot-starter 文档更新;

4. 修复注释@RocketMQMessageListener(consumeThreadMax=4,.=“${sep.queue.meta}”不能生效的问题;

 

项目地址:https://github.com/apache/rocketmq-spring

Apache RocketMQ 4.2.0 安装及 API 调用教程

Apache RocketMQ 4.2.0 安装及 API 调用教程

安装环境

  • CentOS 7.0 64bit
  • JDK1.8 64bit

条件有限,使用两台(192.168.1.101,192.168.1.102)虚拟机作为服务器,因此以 2m-noslave(多 master 模式)来运行 RocketMQ 服务

下载及解压

选择华科镜像源,下载编译好的安装包(也可以下载 RocketMQ 项目源码,使用 maven 打包)

> wget http://mirrors.hust.edu.cn/apache/rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip

解压到 /usr/local 文件夹

> sudo unzip rocketmq-all-4.2.0-bin-release.zip -d /usr/local/rocketmq

修改文件夹所有者(USERNAME 为系统登录用户名),避免无权限执行启动命令

> cd /usr/local/
> sudo chown -R USERNAME: rocketmq

修改配置文件

两台服务器都下载安装完后,修改机器 192.168.1.101 的 MQ 配置文件

> vim rocketmq/conf/2m-noslave/broker-a.properties
brokerClusterName=MyCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
brokerIP1=192.168.1.101
storePathCommitLog=/usr/local/rocketmq/store/commitlog
storePathConsumerQueue=/usr/local/rocketmq/store/consumequeue

修改机器 192.168.1.102 的 MQ 配置文件

> vim rocketmq/conf/2m-noslave/broker-b.properties
brokerClusterName=MyCluster
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10911
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
brokerIP1=192.168.1.102
storePathCommitLog=/usr/local/rocketmq/store/commitlog
storePathConsumerQueue=/usr/local/rocketmq/store/consumequeue

启动服务

两台服务器分别启动 Name Server

> cd rocketmq
> nohup sh bin/mqnamesrv &

查看 Name Server 启动日志

> tail -f logs/rocketmqlogs/namesrv.log
INFO main - The Name Server boot success. serializeType=JSON

启动服务器 192.168.1.101 的 Broker

> nohup sh bin/mqbroker -c conf/2m-noslave/broker-a.properties  >/dev/null 2>&1 &

启动服务器 192.168.1.102 的 Broker

> nohup sh bin/mqbroker -c conf/2m-noslave/broker-b.properties  >/dev/null 2>&1 &

查看 Broker 启动日志

> tail -f logs/rocketmqlogs/broker.log 
INFO main - The broker[broker-a, 192.168.1.101:10911] boot success. serializeType=JSON

创建 Topic, 任一服务器执行

> sh bin/mqadmin updateTopic –n 192.168.1.101:9876;192.168.1.102:9876 –c MyCluster –t TopicTest

关闭服务

> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(11786) is running...
Send shutdown request to mqnamesrv(11786) OK

Java API 调用示例

新建 Maven 项目,引入依赖包

	<dependency>
	    <groupId>org.apache.rocketmq</groupId>
	    <artifactId>rocketmq-client</artifactId>
	    <version>4.2.0</version>
	</dependency>

Producer 类,生产者,发送消息

public class Producer {

	private static int count = 100;

	public static void main(String[] args) throws Exception {
		// 设置生产者组名
		DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
		// 指定nameServer的地址
		producer.setNamesrvAddr("192.168.1.101:9876;192.168.1.102:9876");
		// 启动实例
		producer.start();

		final Semaphore semaphore = new Semaphore(0);

		for (int i = 0; i < count; i++) {
			Thread.sleep(3000);
			Message message = new Message("TopicTest", 
					"test_tag",
					("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

			producer.send(message, new SendCallback() {
				public void onSuccess(SendResult sendResult) {
					System.out.println(String.format("message [%s] send success!", new String(message.getBody())));
					semaphore.release();
				}

				public void onException(Throwable throwable) {
					throwable.printStackTrace();
				}
			});

		}
		semaphore.acquire(count);
		//关闭生产者,释放资源
		producer.shutdown();
	}
}

Consumer 类,消费者,接受处理消息

public class Consumer {

	public static void main(String[] args) throws InterruptedException, MQClientException {
		//设置消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
        //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //指定nameServer的地址
        consumer.setNamesrvAddr("10.0.74.198:9876;10.0.74.199:9876");
        //指定订阅的topic及tag表达式
        consumer.subscribe("TopicTest", "*");
        
		consumer.registerMessageListener(new MessageListenerConcurrently() {
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
					ConsumeConcurrentlyContext context) {
				MessageExt messageExt = msgs.get(0);
				System.out.println(String.format("Custome message [%s],tagName[%s]", 
						new String(messageExt.getBody()),
						messageExt.getTags()));
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		//启动消费者实例
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

依次运行 Consumer、Producer, 控制台打印如下信息

#Producer console info
message [Hello RocketMQ 0] send success!
message [Hello RocketMQ 1] send success!
message [Hello RocketMQ 2] send success!
message [Hello RocketMQ 3] send success!
...
#Consumer console info
Custome message [Hello RocketMQ 0],tagName[test_tag]
Custome message [Hello RocketMQ 1],tagName[test_tag]
Custome message [Hello RocketMQ 2],tagName[test_tag]
Custome message [Hello RocketMQ 3],tagName[test_tag]
...

常见运行错误 “No Topic Route Info”,请检查防火墙是否开启并放开 9876 和 10911 两个端口!

相关文章
Apache RocketMQ 架构及核心概念
RocketMQ-Console 安装及 RocketMQ 命令行管理工具介绍

Apache RocketMQ 4.2安装

Apache RocketMQ 4.2安装

Apache RocketMQ是阿里原来的RocketMQ开源捐赠给Apache基金会的,目前已成为Apache下的顶级项目。RocketMQ 4.0以下的版本是RocketMQ还未进入Apache孵化的阿里内部release版本,4.0以后是进入apache项目后release的版本。因此从4.0后RocketMQ的客服端源代码包名发生了变化,maven依赖也发生了变化。

安装

下载zip的二进制版本解压,解压时需要用指定解压到什么目录,这和tar命令解压是有区别。unzip解压不指定解压缩目标目录,解压后程序的各文件目录会分散到当前目录,这个和在window上解压是一样的,需要解压到指定文件夹,当然如果根据官方文档使用maven编译打包则不存在上面的问题。不想安装maven自己编译的可以使用下面方式。

# unzip -o rocketmq-all-4.2.0-bin-release.zip -d rocketmq
# cd rocketmq

启动Name Server

# nohup sh bin/mqnamesrv &
# tail -f ~/logs/rocketmqlogs/namesrv.log

启动Broker

# nohup sh bin/mqbroker -n localhost:9876 &
# tail -f ~/logs/rocketmqlogs/broker.log

启动mqbroker时需要确保宿主机有足够的内存,官方默认设置的-Xms8g -Xmx8g -Xmn4g都比较大,如果宿主机内存不够,对于只是安装个虚拟机测试使用RocketMQ,则可以在bin目录下将runbroker.sh中的jvm启动参数设置小一些。

关闭服务

# sh bin/mqshutdown broker
# sh bin/mqshutdown namesrv

connect to <172.17.42.1:10911> failed问题

这个ip是docker0的虚拟网卡的网关,但是启动rocketMQ时并没有设置这个ip,因此在启动Broker时指定ip的本机ip。 ps:参照官方的文档安装在命令行测试不会存在该问题,该问题主要是在开发时在开发环境连接到RocketMQ的服务器上才会出现该问题

# echo "brokerIP1=localhost" > conf/broker.properties
# nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.properties &

总结

写本文记录的目的主要并不是去介绍安装,因为官方的文档已经给了安装步骤,但是在安装和编写客户端代码测试的时候可能出现上面描述的一些问题。

今天关于《Apache RocketMQ 深入浅出》系列文章org.apache.rocketmq的介绍到此结束,谢谢您的阅读,有关Apache Dubbo 和 Apache RocketMQ 邀您参与,ASF 亚洲峰会 5 张门票免费送、Apache RocketMQ (TLP) 发布首个子项目:RocketMQ-Spring-Boot、Apache RocketMQ 4.2.0 安装及 API 调用教程、Apache RocketMQ 4.2安装等更多相关知识的信息可以在本站进行查询。

本文标签: