GVKun编程网logo

RocketMQ(rocketmq工作原理)

26

对于想了解RocketMQ的读者,本文将是一篇不可错过的文章,我们将详细介绍rocketmq工作原理,并且为您提供关于ApacheRocketMQRocketMQ是什么?消息中间件、RocketMQ(

对于想了解RocketMQ的读者,本文将是一篇不可错过的文章,我们将详细介绍rocketmq工作原理,并且为您提供关于Apache RocketMQ RocketMQ是什么? 消息中间件、RocketMQ (四) 使用RocketMQ原生API收发消息、RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 Apache RocketMQ 的扩展项目、RocketMQ 学习总结之一:RocketMQ 模块功能介绍的有价值信息。

本文目录一览:

RocketMQ(rocketmq工作原理)

RocketMQ(rocketmq工作原理)

RocketMQ

概念

RocketMQ 是一个消息队列中间件,具有高性能、高可靠、高实时、分布式特点。

能够保证严格的消息顺序

​ 顺序消息,消息入队列,生产方可以选择将消息入到哪一个队列

提供丰富的消息拉取模式

PUSH consumer 发送请求,保持长连接,broker每五秒察看是否有消息,有就回复给consumer

PULL 定时向broker拉取消息

保证消息不丢失

同步发送:阻塞当前进程,等到bocker返回发送结果

异步发送:首先构建任务,把任务交给线程池,等发送完后,执行用户自定义的回调函数

Oneway:只发送,不等结果

​ 发送超时,失败,会重试

​ 消费者会先拉取消息,消费完成后,才向服务器返回结果

​ 消费失败也会重试

架构图

image-20210113161821295

name server :控制中心,记录,做路由,需要首先启动

brocker:存储消息、事物回查

producer:先向name server查询broker信息,然后与其建立连接,发送消息

consumer:同样现象name server 查询broker,建立连接,拉取消息

消息

发送消息(生产者)

  • 创建DefaultMQProducer

    • produceGroup 区别生产者集群
  • 设置name server地址

  • 开启DefaultMQProducer

  • 创建消息Message

    • topic 主题
    • tags 标签,消费者可以根据tags过滤消息
    • keys 消息唯一值(主键)
    • body 消息具体内容
  • 发送消息

    • 会返回一个result
  • 关闭DefaultMQProducer

拉取消息(消费者)

  • 创建DefaultMQConsumer

    • 消费组
  • 设置name server

  • 设置subscript,要读取的主题信息

    • 消费主体、过滤规则(* 或者 tags1 || tags2)
  • 创建消息监听 MessageListener

  • 获取消息消息

  • 返回消息读取状态

    • 读取成功返回success
    • 失败则重试

普通消息

顺序消息

​ 每次发送只发送到指定队列,接收也从一个队列拉取消息

分布式事务消息

生产者:我要去干一些事情,过一会你再问我做完没

MQ Server:好的

生产方执行事务之前先向MQ Server发送Half消息,等到服务器回复后开始执行任务,此时,消费方并不能获取到消息,服务器会询问生产方事务执行状态,当生产方执行完毕后,再次向服务器发送消息,服务器 接收到后才commit,否则,进行回滚操作

消息批量发送/广播

​ 发送方-->List

​ 接收方 设置广播模式 consumer.setMessageModel(MessageMode.BOARDCASTING)

安装

进入官网下载 http://rocketmq.apache.org/release_notes/release-notes-4.3.0/

image-20210113170052735

解压下载的压缩包

配置环境变量

image-20210113180816037

修改启动参数 runserver.sh

image-20210113181001134

runbocker.sh

image-20210113181108888

修改 broker-a.properties 要新建日志文件夹

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 集群名称
brokerClusterName=rocketmq-cluster
# broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
# 0 表示Master,>0 表示Slave
brokerId=0
# nameServer地址,分号分割
namesrvAddr=127.0.0.1:9876
# 在发送消息时,自动创建服务器不存在的Topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许broker 自动创建Topic, 建议线下开启, 线上关闭
autocreateTopicEnable=true
# 是否允许broker 自动创建订阅组, 建议线下开启, 线上关闭
autocreateSubscriptionGroup=true
# broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认是凌晨4点
deleteWhen=04
# 文件保留时间,默认是48小时
fileReservedTime=48
# commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue每个文件默认存30w条, 根据业务情况调整
mapedFileSizeConsumeQueue=30000
# destroyMapedFileIntervalForcibly=12000
# redeleteHangedFileInterval=12000
# 检测物理文件磁盘空间
diskMaxUsedspaceRatio=88
# 存储路径
storePathRootDir=D:\download\rocketmq-all-4.3.0-bin-release\log\store
# commitLog存储路径
storePathCommitLog=D:\download\rocketmq-all-4.3.0-bin-release\log\store\commitlog
# 消息队列储存路径
storePathConsumeQueue=D:\download\rocketmq-all-4.3.0-bin-release\log\store\consumequeue
# 消息索引粗存路径
storePathIndex=D:\download\rocketmq-all-4.3.0-bin-release\log\store\index
# checkpoint 文件储存路径
storeCheckpoint=D:\download\rocketmq-all-4.3.0-bin-release\log\store\checkpoint
# abort 文件存储路径
abortFile=D:\download\rocketmq-all-4.3.0-bin-release\log\store\abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# broker的角色
# -ASYNC_MASTER 异步复制Master
# -SYNC_MASTER 同步双写Master
# -SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushdiskType=ASYNC_FLUSH
# checkTransactionMessageEnable=false
# 发消息线程池数量
# sendMessageTreadPoolNums=128
# 拉消息线程池数量
# pullMessageTreadPoolNums=128lushdiskType=ASYNC_FLUSHH

将conf文件夹下的所有xml文件中的${user.home} 修改为D:/download/rocketmq-all-4.3.0-bin-release/log

进入bin目录,先 start mqnamesrv.cmd

提示出错

image-20210113182627677

但是配置了java_home还是报错

直接修改cmd文件

image-20210113184128678

启动成功

image-20210113184151692

start mqbroker.cmd -n 172.16.33.2:9876 autocreateTopicEnable=true

同样修改cmd文件,直接设置jdk路径,启动成功

image-20210113184325961

demo实现

新建maven项目

导入依赖

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
            <version>4.3.0</version>
        </dependency>

创建生产者

package orderMessage;

import org.apache.rocketmq.client.exception.MQbrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author: chen
 * @date: 2021/1/13 17:23
 * @Description
 */
public class producer {

    int count = 0;

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQbrokerException, UnsupportedEncodingException {
        DefaultMQProducer producer = new DefaultMQProducer("order_message_producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        //  同步发送
        for (int i = 0; i < 5; i++) {
            Message message = new Message("order",
                    "test",
                    "id_" + i,
                    ("say_hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            System.out.println("produce开始发送消息--" + i);
            producer.send(message,
                    new MessageQueueSelector() {
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            return list.get((Integer)o);
                        }
                    },
                    0);
            System.out.println("produce消息发送完毕--" + i);
        }


        Thread.sleep(5000);

        //  异步发送
        for (int i = 6; i < 11; i++) {
            Message message = new Message("order",
                    "async",
                    "async_id_" + i,
                    ("say_async" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            System.out.println("produce开始异步发送消息--" + i);
            producer.send(message,
                    new MessageQueueSelector() {
                        public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                            return list.get((Integer) o);
                        }
                    },
                    0, new SendCallback() {
                        public void onSuccess(SendResult sendResult) {
                            System.out.println("---异步发送成功----");
                        }

                        public void onException(Throwable throwable) {
                            System.out.println("---异步发送失败----"+ throwable);
                        }
                    });
            System.out.println("produce异步消息发送完毕--" + i);
        }
        producer.shutdown();
    }
}

创建消费者

package orderMessage;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @author: chen
 * @date: 2021/1/13 17:34
 * @Description
 */
public class consumer {

    int count = 0;
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_message_consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("order", "*");
        //consumer.setConsumeMessageBatchMaxSize(3);
        consumer.setMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                ConsumeConcurrentlyStatus result = null;
                for (int i = 0 ; i < list.size(); i ++) {
                    System.out.println("consumer----接受消息----" + i);
                    Message message = list.get(i);
                    try {
                        System.out.println("message:" + message.getTopic()
                                + "---" + message.getTags()
                                + "---" + message.getKeys()
                                + "---" + new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printstacktrace();
                        result = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                        break;
                    }
                }
                if (result == null) {
                    result = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return result;
            }
        });
        consumer.start();
    }
}

报错org.apache.rocketmq.client.exception.MQClientException: No route info for this topic, order

原因是maven 引入的jar包的版本不一致,修改版本后能正常运行

image-20210113210746329

image-20210113210816918

分布式事务消息代码

public class transproducer {
    public static void main(String[] args) throws MQClientException {
        TransactionMQProducer txPoducer=new TransactionMQProducer("transaction_group");
        txPoducer.setNamesrvAddr("127.0.0.1:9876");

        //本地事务执行和broker检查的回调都是由producer端来实现的
        txPoducer.setTransactionListener(new TransactionListener() {
            ConcurrentHashMap<String,Integer> map = new ConcurrentHashMap<String, Integer>();
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                String transactionId = msg.getTransactionId();
                map.put(transactionId,0);
                System.out.println("setTransactionListener   txID:"+transactionId+",body "+new String(msg.getBody()));
                try {
                    System.out.println("开始执行");
                    Thread.sleep(65000);
                    map.put(transactionId,1);
                    System.out.println("执行完毕");
                } catch (InterruptedException e) {
                    e.printstacktrace();
                    map.put(transactionId,2);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                String transactionId = msg.getTransactionId();
                Integer status = map.get(transactionId);
                System.out.println("消息会查---"+ status);
                switch (status){
                    case 0:
                        return LocalTransactionState.UNKNow;
                    case 1:
                        return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:
                        return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return LocalTransactionState.UNKNow;

            }
        });

        txPoducer.start();

        TransactionSendResult temp = txPoducer.sendMessageInTransaction(new Message("order", "测试分布式消息事务流程".getBytes()),null);

        System.out.println("消息回执:"+temp);

    }
}

image-20210113214516848

image-20210113214603700

通过pull方式拉取消息

public class pullConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        consumer.start();

        // 从指定topic中拉取所有消息队列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("order");

        for (MessageQueue mq : mqs) {
            // 获取消息的offset,指定从store中获取
            long offset = consumer.fetchConsumeOffset(mq, true);
            System.out.println("queue:" + mq + ":" + offset);

            try {
                while (true) {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq,
                            null, consumer.fetchConsumeOffset(mq, false), 1);

                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginoffset());

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt message : messageExtList) {
                                System.out.println("message:" + message.getTopic()
                                        + "---" + message.getTags()
                                        + "---" + message.getKeys()
                                        + "---" + new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break;
                        case OFFSET_ILLEgal:
                            break;
                        default:
                            break;
                    }
                }

            } catch (Exception e) {
                e.printstacktrace();
            }
        }
        consumer.shutdown();

    }

image-20210113215947194

总结

以上是小编为你收集整理的RocketMQ全部内容。

如果觉得小编网站内容还不错,欢迎将小编网站推荐给好友。

原文地址:https://www.cnblogs.com/chen-sc-cn/p/15829954.html

Apache RocketMQ RocketMQ是什么? 消息中间件

Apache RocketMQ RocketMQ是什么? 消息中间件

Apache RocketMQ RocketMQ是什么? 介绍

RocketMQ是什么?

RocketMQ
是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序

  • 提供丰富的消息拉取模式

  • 高效的订阅者水平扩展能力

  • 实时的消息订阅机制

  • 亿级消息堆积能力

  • Metaq3.0 版本改名,产品名称改为RocketMQ

Apache RocketMQ RocketMQ是什么? 官网

https://rocketmq.apache.org/

RocketMQ (四) 使用RocketMQ原生API收发消息

RocketMQ (四) 使用RocketMQ原生API收发消息

文章目录

      • pom文件
      • 同步消息
        • 生产者
        • 消费者
      • 异步消息
        • 生产者
        • 消费者
      • 单向消息
        • 生产者
        • 消费者
      • 顺序消息
        • 生产者
        • 消费者
      • 延时消息
        • 生产者
        • 消费者
      • 批量消息
        • 生产者
        • 消费者
      • 消息过滤
        • Tag 过滤
        • 对自定义属性过滤
        • 生产者
        • 消费者
      • 事务消息
        • 事务消息的原理
        • 生产者
        • 消费者

创建项目

pom文件

新建 maven 项目或 module,添加 rocketmq-client 依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.tedu</groupId>
    <artifactId>rocketmq-api</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-store</artifactId>
            <version>4.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

同步消息

在这里插入图片描述

同步消息发送要保证强一致性,发到master的消息向slave复制后,才会向生产者发送反馈信息。

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

在这里插入图片描述

生产者

package m1;

import org.apache.rocketmq.client.exception.MQbrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Scanner;

public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQbrokerException {
        //新建生产者实例
        DefaultMQProducer p = new DefaultMQProducer("producer1");
        //设置 name server 地址
        p.setNamesrvAddr("192.168.64.141:9876");
        //启动生产者 连接服务器
        p.start();

        //消息数据封装到 Message 对象
        //发送
        while(true){
            System.out.println("输入消息:");
            String s = new Scanner(system.in).nextLine();
            /**
             * Topic --- 一级分类
             * Tag --- 二级分类(可选)
             */
            //发送的目的地、发送给谁、发送的消息
            Message msg = new Message("Topic1", "TagA", s.getBytes());//Topic1是自己在服务器上创建的
            SendResult r = p.send(msg);
            System.out.println("发送的消息:"+r);
        }
    }
}

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

消费者

消费者的要点:

1.push 和 pull

消费者有两种模式:push 和 pull。

push 模式由服务器主动向消费者发送消息;pull 模式由消费者主动向服务器请求消息。

在消费者处理能力有限时,为了减轻消费者的压力,可以采用pull模式。多数情况下都采用 pull 模式。

2.NameServer

消费者需要向 NameServer 询问 Topic 的路由信息。

3.Topic

从指定的Topic接收消息。Topic相当于是一级分类。

4.Tag

Topic 相当于是一级分类,Tag 相当于是2级分类。

  • 多个 Tag 可以这样写: TagA || TagB || TagC
  • 不指定 Tag,或者说接收所有的 Tag,可以写星号:*
package m1;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //新建消费者实例
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("Consumer1");

        //设置 name server
        c.setNamesrvAddr("192.168.64.141:9876");

        //订阅消息(从哪订阅消息)
        /**
         * 标签设置:
         *      TagA
         *      TagB || TagC || TagD
         *      *
         */
        c.subscribe("Topic1", "TagA");//从topic1订阅标签A的消息

        //消息监听器
        //Concurrently监听器会启动多个线程,可以并行的处理多条消息
        c.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt ext : msgs) {//快捷键:msgs.for
                    String s = new String(ext.getBody());
                    System.out.println("收到:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                //return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        
        //启动
        c.start();
    }
}

在这里插入图片描述

异步消息

在这里插入图片描述


master 收到消息后立即向生产者进行反馈。之后再以异步方式向 slave 复制消息。

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待broker的响应。

生产者

package demo2;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Scanner;

/*
异步发送消息

一条消息送出后, 不必暂停等待服务器针对这条消息的反馈, 而是可以立即发送后续消息.
使用监听器, 以异步的方式接收服务器的反馈
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer p = new DefaultMQProducer("producer-demo2");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        p.setRetryTimesWhenSendAsyncFailed(0);

        String topic = "Topic2";
        String tag = "TagA";
        String key = "Key-demo2";


        while (true) {
            System.out.print("输入消息,用逗号分隔多条消息: ");
            String[] a = new Scanner(system.in).nextLine().split(",");

            for (String s : a) {
                Message msg = new Message(topic, tag, key, s.getBytes());

                p.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("\n\n消息发送成功 : "+sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println("\n\n消息发送失败");
                    }
                });

                System.out.println("--------------------消息已送出-----------------------");
            }
        }
    }
}

消费者

package demo2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
/*
与 demo1.Consumer 完全相同
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo2");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");

        c.subscribe("Topic2", "TagA");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println(new String(msg.getBody()) + " - " + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("开始消费数据");
    }
}

单向消息

在这里插入图片描述


这种方式主要用在不特别关心发送结果的场景,例如日志发送。

生产者

package demo3;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Scanner;

/*
单向消息

消息发出后, 服务器不会返回结果
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
        DefaultMQProducer p = new DefaultMQProducer("producer-demo3");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        String topic = "Topic3";
        String tag = "TagA";

        while (true) {
            System.out.print("输入消息,用逗号分隔多条消息: ");
            String[] a = new Scanner(system.in).nextLine().split(",");
            for (String s : a) {
                Message msg = new Message(topic, tag, s.getBytes());
                p.sendOneway(msg);
            }
            System.out.println("--------------------消息已送出-----------------------");
        }
    }
}

消费者

package demo3;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/*
与 demo1.Consumer 完全相同
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo2");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");

        c.subscribe("Topic3", "TagA");

        c.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println(new String(msg.getBody()) + " - " + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        c.start();
        System.out.println("开始消费数据");
    }
}

顺序消息

在这里插入图片描述


上图演示了 Rocketmq 顺序消息的基本原理:

  • 同一组有序的消息序列,会被发送到同一个队列,按照 FIFO 的方式进行处理
  • 一个队列只允许一个消费者线程接收消息,这样就保证消息按顺序被接收

下面以订单为例:

一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中。消费时,从同一个队列接收同一个订单的消息。

生产者

package demo4;

import org.apache.rocketmq.client.exception.MQbrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.List;
import java.util.Scanner;
/*
以下消息, 相同id的消息按顺序发送到同一个队列,
消费时也从同一个队列按顺序消费
                                              topic

                                        =======================  queue1
                                        =======================  queue2
111,消息1  111,消息2  111,消息3   ------->=======================  queue3
                                        =======================  queue4
222,消息1  222,消息2  222,消息3   ------->=======================  queue5
                                        =======================  queue6
333,消息1  333,消息2  333,消息3   ------->=======================  queue7
                                        =======================  queue8
                                                    ......
 */
public class Producer {
    static String[] msgs = {
            "15103111039,创建",
                                "15103111065,创建",
            "15103111039,付款",
                                                    "15103117235,创建",
                                "15103111065,付款",
                                                    "15103117235,付款",
                                "15103111065,完成",
            "15103111039,推送",
                                                    "15103117235,完成",
            "15103111039,完成"
    };

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQbrokerException {
        DefaultMQProducer p = new DefaultMQProducer("producer-demo4");
        p.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        p.start();

        String topic = "Topic4";
        String tag = "TagA";

        for (String s : msgs) {
            System.out.println("按回车发送此消息: "+s);
            new Scanner(system.in).nextLine();

            Message msg = new Message(topic, tag, s.getBytes());

            String[] a = s.split(",");
            long orderId = Long.parseLong(a[0]);

            /*
            MessageQueueSelector用来选择发送的队列,
            这里用订单的id对队列数量取余来计算队列索引

            send(msg, queueSelector, obj)
            第三个参数会传递到queueSelector, 作为它的第三个参数
             */
            SendResult r = p.send(msg, new MessageQueueSelector() {
                /*
                三个参数的含义:
                queueList: 当前Topic中所有队列的列表
                message: 消息
                o: send()方法传入的orderId
                 */
                @Override
                public MessageQueue select(List<MessageQueue> queueList, Message message, Object o) {
                    Long orderId = (Long) o;
                    //订单id对队列数量取余, 相同订单id得到相同的队列索引
                    long index = orderId % queueList.size();
                    System.out.println("消息已发送到: "+queueList.get((int) index));
                    return queueList.get((int) index);
                }
            }, orderId);

            System.out.println(r+"\n\n");
        }
    }
}

消费者

package demo4;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("consumer-demo4");
        c.setNamesrvAddr("192.168.64.151:9876;192.168.64.152:9876");
        
        c.subscribe("Topic4", "*");

        c.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                String t = Thread.currentThread().getName();

                for (MessageExt msg : list) {
                    System.out.println(t+" - "+ msg.getQueueId() + " - " +new String(msg.getBody()));
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        c.start();
        System.out.println("开始消费数据");
    }
}

延时消息

消息发送到 Rocketmq 服务器后, 延迟一定时间再向消费者进行投递。

延时消息的使用场景:

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

生产者发送消息时,对消息进行延时设置:

msg.setDelayTimeLevel(3);

其中3 代表级别而不是一个具体的时间值,级别和延时时长对应关系是在MessageStoreConfig 类种进行定义的:

this.messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

对应关系表:

级别延时时长
11s
25s
310s
430s
51m
62m
73m
84m
95m
106m
117m
128m
139m
1410m
1520m
1630m
171h
182h

生产者

package m1;

import org.apache.rocketmq.client.exception.MQbrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.Scanner;

public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQbrokerException {
        //新建生产者实例
        DefaultMQProducer p = new DefaultMQProducer("producer1");
        //设置 name server 地址
        p.setNamesrvAddr("192.168.64.141:9876");
        //启动生产者 连接服务器
        p.start();

        //消息数据封装到 Message 对象
        //发送
        while(true){
            System.out.println("输入消息:");
            String s = new Scanner(system.in).nextLine();
            /**
             * Topic --- 一级分类
             * Tag --- 二级分类(可选)
             */
            //发送的目的地、发送给谁、发送的消息
            Message msg = new Message("Topic1", "TagA", s.getBytes());//Topic1是自己在服务器上创建的
            if(Math.random() < 0.5){
                msg.setDelayTimeLevel(3);//代表10秒
                System.out.println("这条消息延时10秒");
            }
            SendResult r = p.send(msg);
            System.out.println("发送的消息:"+r);
        }
    }
}

在这里插入图片描述

消费者

package m1;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Authod yuhe
 * Create 2021-10-29-16:40
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //新建消费者实例
        DefaultMQPushConsumer c = new DefaultMQPushConsumer("Consumer1");

        //设置 name server
        c.setNamesrvAddr("192.168.64.141:9876");

        //订阅消息(从哪订阅消息)
        /**
         * 标签设置:
         *      TagA
         *      TagB || TagC || TagD
         *      *
         */
        c.subscribe("Topic1", "TagA");//从topic1订阅标签A的消息

        //消息监听器
        //Concurrently监听器会启动多个线程,可以并行的处理多条消息
        c.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt ext : msgs) {//快捷键:msgs.for
                    String s = new String(ext.getBody());
                    System.out.println("收到:"+s);
                }
                //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                /**
                 * 在消息处理失败时,可以告诉服务器,稍后重新发送消息,重新消费。
                 * 如果多次处理失败,最多会重试 18 次(18个延时级别),重试的时间间隔会越来越长
                 */
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });

        //启动
        c.start();
    }
}

在这里插入图片描述

批量消息

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

生产者


消费者


消息过滤

Tag 过滤

Tag 可以满足大多数消息过滤的需求。使用 Tag 过滤非常简单,例如:

consumer.subscribe("Topic1", "TagA || TagB || TagC");

对自定义属性过滤

生产者可以在消息中添加自定义的属性:

msg.putUserProperty("prop1", "1");
msg.putUserProperty("prop2", "2");

消费者接收数据时,可以根据属性来过滤消息:

consumer.subscribe("Topic7", MessageSelector.bysql("prop1=1 or prop2=2"));

可以看到,自定义属性的过滤语法是 sql 语法,RocketMQ只定义了一些基本语法来支持这个特性,支持的 sql 过滤语法如下:

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

生产者


消费者


事务消息

RocketMQ 提供了可靠性消息,也叫事务消息。下面分析一下其原理。

事务消息的原理

在这里插入图片描述


在这里插入图片描述


下面来看 RocketMQ 的事务消息是如何来发送“可靠消息”的,只需要以下三步:

  1. 发送半消息(半消息不会发送给消费者)
  2. 执行本地事务
  3. 提交消息

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述


完成事务消息发送后,消费者就可以以正常的方式来消费数据。

RocketMQ 的自动重发机制在绝大多数情况下,都可以保证消息被正确消费。

假如消息最终消费失败了,还可以由人工处理进行托底。

在这里插入图片描述


上面分析的是正常情况下的执行流程。下面再来看两种错误情况:

  1. 事务执行失败时回滚消息
  2. 服务器无法得知消息状态时,需要主动回查消息状态

回滚:

在这里插入图片描述


消息回查:

在这里插入图片描述


在这里插入图片描述

生产者


消费者


总结

以上是小编为你收集整理的RocketMQ (四) 使用RocketMQ原生API收发消息全部内容。

如果觉得小编网站内容还不错,欢迎将小编网站推荐给好友。

原文地址:https://blog.csdn.net/weixin_43887285/article/details/121037225

RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 Apache RocketMQ 的扩展项目

RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 Apache RocketMQ 的扩展项目

RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 介绍

由 Apache RocketMQ 社区贡献并维护的 Apache RocketMQ 扩展项目。

RocketMQ-Console

使用 Spring Boot 重新设计的 RocketMQ 控制台

RocketMQ-JMS

RocketMQ 的 JMS 1.1 规范实现

RocketMQ-Flume

Flume RocketMQ source 和 sink 的实现

RocketMQ-Flink

集成 Apache Flink 和 Apache RocketMQ。详细介绍请查看
README

RocketMQ-Spark

集成 Apache Spark 和 Apache RocketMQ,提供了 push & pull 消费者。详细介绍请查看
README

RocketMQ-Docker

Apache RocketMQ Docker 提供了 Dockerfile 和 bash 脚本用于构建和运行 Docker 镜像

RocketMQ-MysqL

该项目是 MysqL 和其他系统之间的数据复制器。详细介绍请查看 README

RocketMQ-CPP

稳定、广泛使用的 Apache RocketMQ C++ 客户端 SDK

其他

RocketMQ-Druid, RocketMQ-
Ignite

RocketMQ-Storm 的集成

RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 官网

https://github.com/apache/rocketmq-externals

RocketMQ 学习总结之一:RocketMQ 模块功能介绍

RocketMQ 学习总结之一:RocketMQ 模块功能介绍

模块介绍:

  • rocketmq-broker(服务端,接受消息,存储消息,consumer 拉取消息)
  • rocketmq-client(消息发送和接收,包含 consumer 和 producer)
  • rocketmq-common(通用的常量枚举、基类方法或者数据结构,按描述的目标来分包通俗易懂。包名有:admin,consumer,filter,hook,message 等。)
  • rocketmq-example(rocketmq 使用样例)
  • rocketmq-filtersrv(消息过滤器)
  • rocketmq-namesrv(NameServer,类似服务注册中心,broker 在这里注册,consumer 和 producer 在这里找到 broker 地址)
  • rocketmq-remoting(使用 netty 的客户端、服务端,使用 fastjson 序列化,自定义二进制协议)
  • rocketmq-srvutil(只有一个 ServerUtil 类,只提供 Server 程序依赖,尽可能减少客户端依赖)
  • rocketmq-store(消息存储,索引,consumerLog,commitLog 等)
  • rocketmq-tools(命令行工具)

 

    

今天关于RocketMQrocketmq工作原理的介绍到此结束,谢谢您的阅读,有关Apache RocketMQ RocketMQ是什么? 消息中间件、RocketMQ (四) 使用RocketMQ原生API收发消息、RocketMQ Externals RocketMQ-ConsoleRocketMQ-JMSRocketMQ-FlumeRocketMQ-FlinkRocketMQ-SparkRocketMQ-DockerRocketMQ-MySQLRocketMQ-CPP其他 Apache RocketMQ 的扩展项目、RocketMQ 学习总结之一:RocketMQ 模块功能介绍等更多相关知识的信息可以在本站进行查询。

本文标签: