GVKun编程网logo

聊聊 RPC 之 Consumer(rpc context)

18

对于想了解聊聊RPC之Consumer的读者,本文将是一篇不可错过的文章,我们将详细介绍rpccontext,并且为您提供关于.netcoreconsulgrpc--系统服务RPC实现通信(一)、co

对于想了解聊聊 RPC 之 Consumer的读者,本文将是一篇不可错过的文章,我们将详细介绍rpc context,并且为您提供关于.net core consul grpc--系统服务RPC实现通信(一)、com.alibaba.dubbo.rpc.RpcException: Forbid consume、Consumer Group Example-High Level Consumer (翻译)、FlinkKafkaConsumerBase 中指定 consumer 起始消费 offset 的方法解读的有价值信息。

本文目录一览:

聊聊 RPC 之 Consumer(rpc context)

聊聊 RPC 之 Consumer(rpc context)

在上一篇我们介绍了 RPC 的 Provider,包括它的发布过程和在设计一个 Provider 时需要考虑的问题,本文将介绍做一个调用方,在这个 Consumer 调用过程中,RPC 又帮我们做了哪些事情和在使用别人提供的 Provider 时需要注意哪些问题。(和上篇一样,我们仍以 Dubbo 为例)

Consumer 的调用过程

作为 Consumer 调用别人提供的一个服务,一般需要如下工作 <!--more-->

  1. 引入 client 包

           <dependency>
                <groupId>info.yywang.demo</groupId>
                <artifactId>hello-client</artifactId>
                <version>1.0.0-SNAPSHOT</version>
            </dependency>
    
  2. Spring 中引用远程服务

        <dubbo:registry address="zookeeper://192.168.0.122:2181" />
    
        <dubbo:reference id="helloService" interface="info.yywang.service.HelloService" />
    
  3. 调用远程服务

    public class HelloTest{
    
      @Resources
      private HelloService helloService;
    
      @Test
      public void testSay(){
        helloService.say(''yywang'');
      }
    }
    

通过以上代码,我们可以看出,只需要我们在 spring 配置中,增加一些配置,其他的就像我们使用本地接口一样,来使用远程接口。那么在这个背后,RPC 框架帮我们做了什么呢?(今天不在赘述过程,也不在针对 Dubbo 框架,我们从几个问题思考)

Consumer 端怎么找到 Provider 的地址

答:从注册中心获取到 Provider 的地址,并且根据路由规则,比如随机、轮训等方式,最后获取到一个将要方式的 Provider 地址,一般 Consumer 端会把 Provider 的地址缓存到本地,当请求来的时候,从本地获取。

Consumer 端怎么和 Provider 进行通信

答:Consumer 端和 Provider 的通信有两种方式,一种是同步调用,一种是异步调用。

  1. 同步调用

    一个工作线程在调用远程服务时,在得到结果之前,一直处于阻塞状态。在启动后,建立一个连接池,当一个请求来了之后,首先会经过对象序列化,然后从连接池里拿到一个连接,发送请求包,请求远程的服务,当远程服务响应结果之后,把连接放回连接池。然后工作线程继续执行以下代码

  2. 异步调用

    一个工作线程,在发起调用之后,在拿到结果之前,不会阻塞。在 Dubbo 的 dubbo 协议底层,使用 dubbo 协议就是基于 Netty 的底层通信模型。过程太长暂时不说了。

请求包里有哪些内容

答:请求包里有接口名,方法名字,参数类型等,参数值,请求 ID。

异步情况下,怎么把请求、响应对应起来

答:在远程调用时,首先会生成一个 ID 用于标记本次请求,并且把这个 ID 发送给服务端,同时使用这个 ID 作为 Key,将调用信息放在一个 map 里。在服务端响应的时候,把这个 ID 带过来,然后通过这个 ID 直接从 map 里找到对应的信息

Consumer 怎么知道 Provider 挂了

答:在 Consumer 端缓存了 Provider 的服务地址之后,Consumer 会和 Provider 保持心跳,同时在 Provider 挂掉之后,ConfigServer 也会通知 Consumer,Consumer 在本地的地址列表里清除该地址

Consumer 怎么知道 Provider 端增加了机器

答:同上 ConfigServer 会通知 Consumer 端增加服务地址

以上可以看出 ConfigServer 在这里扮演了很重要的角色,下篇文章我们将揭秘 ConfigServer 具体怎么实现通知的

Consumer 的开发

以上是对于原理的讲解,我们回到作为 RPC 的使用者来说,我们探讨一些几个问题

面向异常编程

在使用了 RPC 之后,我们不得不面对一个问题,就是网络通信。在网络通信过程中,就会有各种不确定的问题,所以在使用远程接口调用时,我们要考虑以下异常场景:

  1. 超时了怎么办

    超时一般有几种情况,一种是在调用时机器负载比较高,响应比较慢。第二种,方法本身存在性能问题。第二种,可能在调用的过程中,网络出现了抖动,可能调用方的逻辑已经执行完成了,但是由于网络原因,没有及时的响应。

    对于超时情况,通常的处理办法是重试,一旦选择了重试,就需要在实现具体的业务时,分析需不需要做幂等。需要注意对于写的操作,如果重复执行,可能出现两次写入相同的值。关于如何做幂等又是一个话题,大家可以思考

    对于方法本身的性能问题,首先可以通过提高超时时间来解决当前问题,同时需要催促服务提供方做性能改进。

  2. 调用返回了失败

    返回失败的可能性有几种,第一种是业务错误,第二种是程序错误。对于业务错误很简单,根据业务逻辑处理即可,但是当对于业务错误,需要做监控时,需要记录好用于监控的错误日志。对于程序错误,可能出现在几点,一是程序 bug,对于程序 bug 已经要记录好错误日志。第二是网络原因等错误。这种错误可以通过重试机制来保证。

错误日志,监控日志
  1. 错误日志,是排查线上问题时的最有利的武器,没有之一。线上处理问题,所以在调用远程服务的时候,一定需要记录好错误日志,但是日志也不是越多越好,日志过多也会代码排查问题的难度。并且在记录错误日志的同时,需要记录好关键的业务字段,比如订单编号、商品 Code 等等
  2. 监控日志,对于线上的出现的业务错误,我们通常需要通过监控来获取,有的时候监控日志和错误日志在一块,有的专门记录一个监控日志用于监控,个人推荐对于监控专门记录日志,用于监控。另外监控日志同样需要记录好关键的监控参数。
外部系统模型和本系统模型的适配

这个问题是在系统设计层面上的问题。就是在设计时,如何做到外部系统模型和本系统模型的适配。解决这个问题很简单,增加防腐层,就是增加一层防腐层,来将其他系统的模型适配到本系统模型,这样本系统的代码将不会充斥着其他系统模型的代码,并且在防腐层还能在提供方的系统发生变化时帮助本系统保持稳定。

如何测试你的代码

当你调用了远程接口之后,你想编写单元测试代码来测试自己的代码。如果你直接调用远程接口,有可能远程接口还调用了其他的远程接口。所以你需要准备满足这些系统的测试数据,这是非常麻烦的过程,通常这时就需要使用 mock 的方式来模拟远程接口。这样你只需要关心自己的逻辑即可。在集成测试时,在使用真实接口。

欢迎关注我的公众号 MyArtNote

MyArtNote

.net core consul grpc--系统服务RPC实现通信(一)

.net core consul grpc--系统服务RPC实现通信(一)

.net core grpc 系统服务实现通信(一)

现在系统都服务化,.net core 实现服务化的方式有很多,我们通过grpc实现客户端、服务端通信。

grpc(https://grpc.io/)是google发布的一个开源、高性能、通用RPC(Remote Procedure Call)框架,使用HTTP/2协议,支持多路复用,并用ProtoBuf作为序列化工具,提供跨语言、跨平台支持。下面以.net core演示如何使用grpc框架实现通信。

 

软件版本

.net core:2.0

grpc:1.11.0

 

项目结构

InstallGrpc .net framework类库 只为得到生成协议代码工具protoc.exe、grpc_csharp_plugin.exe,没有其他作用,如果已有工具,可以不用

Snai.GrpcClient 客户端 .net core 2.0控制台程序

Snai.GrpcService.Hosting 服务端宿主 .net core 2.0控制台程序

Snai.GrpcService.Impl 协议方法实现  .net standard 2.0类库

Snai.GrpcService.Protocol 生成协议方法 .net standard 2.0类库

运行结果

服务端

客户端

客户端调用服务端求和方法成功。

 

项目实现

一、服务端

新建Snai.GrpcService解决方案

1、编写协议

 新建 Snai.GrpcService.Protocol协议类库项目,在 依赖项 右击 管理NuGet程序包 浏览 找到 Grpc.Core 版本1.11.0,Google.Protobuf 版本3.5.1 包下载安装

 在项目根目录下新建一个 msg.proto 文件,打开 msg.proto 文件,在其中编写基于proto3语言的协议代码,用于自动生成到各语言协议,如果需要更深入的学习proto3语言可以打开该网站Proto3语言指南。msg.proto 代码如下

 定义当前使用的是proto3语言并且包名(生成为C#则为命名空间):

syntax = "proto3";

package Snai.GrpcService.Protocol;

定义了1个服务,且有1个方法:

service MsgService{
  rpc GetSum(GetMsgNumRequest) returns (GetMsgSumReply){}
}

 方法的接收参数和返回参数

复制代码
message GetMsgNumRequest {
  int32 Num1 = 1;
  int32 Num2 = 2;
}

message GetMsgSumReply {
  int32 Sum = 1;
}
复制代码

 2、将协议生成C#代码

生成协议代码需 protoc.exe、grpc_csharp_plugin.exe工具,在.net framework 项目下引用安装 Grpc.Tools 组件程序包,会得到protoc.exe、grpc_csharp_plugin.exe,但.net core 项目引用安装是不会下载工具到项目目录的,所以我们需要建一个.net framework项目,我建了个 InstallGrpc .net framework类库 用于引用安装得到工具。

这里得到工具有个小插曲,引用Grpc.Tools版本1.11.0得到protoc.exe、grpc_csharp_plugin.exe 拷到 Snai.GrpcService.Protocol 目录下生成不了,我再引用Google.Protobuf.Tools版本3.5.1里面有 protoc.exe,用 Grpc.Tools下的 grpc_csharp_plugin.exe, Google.Protobuf.Tools下protoc.exe 根据当前系统选择,拷贝到 Snai.GrpcService.Protocol 目录下。

先用Grpc.Tools 下的,如果生成不了,再用 Grpc.Tools下的 grpc_csharp_plugin.exe, Google.Protobuf.Tools下protoc.exe

然后在项目中新建一个名为ProtocGenerate.cmd的文件,在其中输入以下指令:

protoc -I . --csharp_out . --grpc_out . --plugin=protoc-gen-grpc=grpc_csharp_plugin.exe msg.proto

然后直接双击运行,项目下生成了“Msg.cs”和“MsgGrpc.cs”两个文件,这样协议部分的所有工作就完成了,最终项目结构如下:

 3、编写协议实现代码

 新建Snai.GrpcService.Impl实现类库项目,在 依赖项 下载安装Grpc.Core 包,项目引用Snai.GrpcService.Protocol

 在项目根目录下新建 MsgServiceImpl.cs 类文件,继承 MsgService.MsgServiceBase 协议类,实现服务方法,代码如下:

复制代码
using Grpc.Core;
using Snai.GrpcService.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Snai.GrpcService.Impl
{
    public class MsgServiceImpl: MsgService.MsgServiceBase
    {
        public MsgServiceImpl()
        {
        }

        public override async Task<GetMsgSumReply> GetSum(GetMsgNumRequest request, ServerCallContext context)
        {
            var result = new GetMsgSumReply();

            result.Sum = request.Num1 + request.Num2;

            return result;
        }
    }
}
复制代码

 在项目根目录下新建 RpcConfig.cs 类文件,编写绑定服务到服务端,服务端 地址 端口 等信息,实现启动方法,代码如下:

复制代码
using Grpc.Core;
using Snai.GrpcService.Protocol;
using System;
using System.Collections.Generic;
using System.Text;

namespace Snai.GrpcService.Impl
{
    public static class RpcConfig
    {
        private static Server _server;

        public static void Start()
        {
            _server = new Server
            {
                Services = { MsgService.BindService(new MsgServiceImpl()) },
                Ports = { new ServerPort("localhost", 40001, ServerCredentials.Insecure) }
            };
            _server.Start();

            Console.WriteLine("grpc ServerListening On Port 40001");
            Console.WriteLine("任意键退出...");
            Console.ReadKey();

            _server?.ShutdownAsync().Wait();
        }
    }
}
复制代码

 最终项目结构如下:

4、编写服务端启动程序

 新建Snai.GrpcService.Hosting 控制台程序,项目引用Snai.GrpcService.Impl

 打开 Program.cs 文件,修改 Main 方法,加入服务启动,代码如下:

复制代码
using Snai.GrpcService.Impl;
using System;

namespace Snai.GrpcService.Hosting
{
    class Program
    {
        static void Main(string[] args)
        {
            RpcConfig.Start();
        }
    }
}
复制代码

 最终项目结构如下:

到此服务端所有代码已编写完成,下面开始编写客户端。

二、客户端

 新建Snai.GrpcClient 控制台程序,在 依赖项 下载安装Grpc.Core 包,项目引用Snai.GrpcService.Protocol

 在项目根目录下新建 MsgServiceClient.cs 类文件,编写与服务端通信的 地址 端口 等信息,并调用服务端方法,代码如下:

复制代码
using Grpc.Core;
using Snai.GrpcService.Protocol;
using System;
using System.Collections.Generic;
using System.Text;

namespace Snai.GrpcClient
{
    public static class MsgServiceClient
    {
        private static Channel _channel;
        private static MsgService.MsgServiceClient _client;

        static MsgServiceClient()
        {
            _channel = new Channel("127.0.0.1:40001", ChannelCredentials.Insecure);
            _client = new MsgService.MsgServiceClient(_channel);
        }

        public static GetMsgSumReply GetSum(int num1, int num2)
        {
            return _client.GetSum(new GetMsgNumRequest
            {
                Num1 = num1,
                Num2 = num2
            });
        }
    }
}
复制代码

 打开 Program.cs 文件,修改 Main 方法,得到服务端返回结果,显示结果,代码如下:

复制代码
using Snai.GrpcService.Protocol;
using System;

namespace Snai.GrpcClient
{
    class Program
    {
        static void Main(string[] args)
        {
            GetMsgSumReply msgSum = MsgServiceClient.GetSum(10, 2);

            Console.WriteLine("grpc Client Call GetSum():" + msgSum.Sum);

            Console.WriteLine("任意键退出...");
            Console.ReadKey();
        }
    }
}
复制代码

 最终项目结构如下:

 

到此所有代码都已编写完成

三、启动

右击生成解决方案,生成完成后,先启动服务端,再启动客户端

命令行到服务端目录 Snai.GrpcService.Hosting\bin\Debug\netcoreapp2.0\,用命令 dotnet Snai.GrpcService.Hosting.dll 启动服务端

命令行到客户端目录 Snai.GrpcClient\bin\Debug\netcoreapp2.0\,用命令 dotnet Snai.GrpcClient.dll 启动客户端

 

客户端调用服务端方法成功,实现grpc

它们之间是通过Grpc.Core中的 Server 和 Channel 来通信

源码访问地址:https://github.com/Liu-Alan/Snai.GrpcService

com.alibaba.dubbo.rpc.RpcException: Forbid consume

com.alibaba.dubbo.rpc.RpcException: Forbid consume

com.alibaba.dubbo.rpc.RpcException: Forbid consumer 192.168.1.244 access service com.AA.SS.DD.SS.IUserService from registry 192.168.1.236:2181 use dubbo version 2.8.4, Please check registry access list (whitelist/blacklist).

Consumer Group Example-High Level Consumer (翻译)

Consumer Group Example-High Level Consumer (翻译)

Using the High Level Consumer

Why use the High Level Consumer

Sometimes the logic to read messages from Kafka doesn''t care about handling the message offsets, it just wants the data. So the High Level Consumer is provided to abstract most of the details of consuming events from Kafka.

有时候从 Kafka 读取 message 的逻辑并不关心维护 message offsets, 只是想要获取数据。所以 High Level Consumer 被提供作为从 Kafka 消费事件大部分细节的抽象。

First thing to know is that the High Level Consumer stores the last offset read from a specific partition in ZooKeeper. This offset is stored based on the name provided to Kafka when the process starts. This name is referred to as the Consumer Group.

首先需要知道的事情是 High Level Consumer 将从某个分区读取时的最后 offset 存储在 ZooKeeper 中。在程序启动时,这个 offset 基于 Kafka 提供的名字进行存储。这个名字被称为 Consumer Group。

The Consumer Group name is global across a Kafka cluster, so you should be careful that any ''old'' logic Consumers be shutdown before starting new code. When a new process is started with the same Consumer Group name, Kafka will add that processes'' threads to the set of threads available to consume the Topic and trigger a ''re-balance''. During this re-balance Kafka will assign available partitions to available threads, possibly moving a partition to another process. If you have a mixture of old and new business logic, it is possible that some messages go to the old logic.

Consumer Group 名字对于 Kafka 集群来说是全局,所以你应该注意在启动新的程序之前确认任何 '' 历史的 '' 逻辑 Consumers 被关闭。当一个新程序使用相同的 Consumer Group name,Kafka 将把这个程序的 thread 增加到消费这个 Topic 的线程集合中,并且触发一个 ''re-balance''. 这个 re-balance 中,Kafka 将指定可用的分区到可用的线程,可能移动一个分区到其他的 process。如果你有一个旧的和新的业务逻辑的组合,这可能的导致一些信息发送到旧逻辑中。

Designing a High Level Consumer

The first thing to know about using a High Level Consumer is that it can (and should!) be a multi-threaded application. The threading model revolves around the number of partitions in your topic and there are some very specific rules:

关于 High Level Consumer 首先需要知道的事情是,它可以 (并且应该) 是一个多线程程序。线程模型将围绕你的 topic 分区数量来进行工作,有一些具体的规则:

  • if you provide more threads than there are partitions on the topic, some threads will never see a message

    线程数大于 topic 分区数时,一些线程将无法接收信息

  • if you have more partitions than you have threads, some threads will receive data from multiple partitions
    线程数小于 topic 分区数时,一些线程将从多个分区中接收信息

  • if you have multiple partitions per thread there is NO guarantee about the order you receive messages, other than that within the partition the offsets will be sequential. For example, you may receive 5 messages from partition 10 and 6 from partition 11, then 5 more from partition 10 followed by 5 more from partition 10 even if partition 11 has data available.
    当每个线程接收多个分区消息时,则不能保证接收消息的顺序,而不是分区内的顺序,offset 将是连续的。例如,您可以从分区 10 收到 5 条信息,从分区 11 收到 6 条消息,然而可能出现从分区 10 接收 5 条信息之后又接收 5 条信息,即使分区 11 有可用的数据。

  • adding more processes/threads will cause Kafka to re-balance, possibly changing the assignment of a Partition to a Thread.

    增加更多的进程 / 线程将导致 Kafka re-balance,可能改变线程上的分区分配。

Next, your logic should expect to get an iterator from Kafka that may block if there are no new messages available.

其次,你的逻辑应该从 Kafka 中得到迭代器,并且果没有可用的新消息时进行阻塞。

Here is an example of a very simple consumer that expects to be threaded.

这是一个很简单的线程化消费的例子。

package com.test.groups;
 
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
 
public class ConsumerTest implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;
 
    public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }
 
    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

The interesting part here is the while (it.hasNext()) section. Basically this code reads from Kafka until you stop it.

有趣的部分是 while (it.hasNext()),这段代码会一直从 Kafka 中读取,直到你停止它。

Configuring the test application

Unlike the SimpleConsumer the High level consumer takes care of a lot of the bookkeeping and error handling for you. However you do need to tell Kafka where to store some information. The following method defines the basics for creating a High Level Consumer:

与 SimpleConsumer 不同,High level consumer 已经为你负责了一些 bookkeeping 和错误处理的工作。然而你需要告诉 Kafka 将这些信息存储在哪里。下面的方法定义了如何创建 High Level Consumer 的基础部分。

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

The ‘zookeeper.connect’ string identifies where to find once instance of Zookeeper in your cluster. Kafka uses ZooKeeper to store offsets of messages consumed for a specific topic and partition by this Consumer Group. 定义在你的集群冲找到 Zookeeper 的一个实例。Kafka 使用 ZooKeeper 来存储对于某个特定 topic 和 partition 通过 Consumer Group 进行消费的消息 offsets。

The ‘group.id’ string defines the Consumer Group this process is consuming on behalf of. 定义这个进程消费的 Consumer Group

The ‘zookeeper.session.timeout.ms’ is how many milliseconds Kafka will wait for ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages. 定义 Kafka 等待 ZooKeeper 响应的超时时间。

The ‘zookeeper.sync.time.ms’ is the number of milliseconds a ZooKeeper ‘follower’ can be behind the master before an error occurs.

The ‘auto.commit.interval.ms’ setting is how often updates to the consumed offsets are written to ZooKeeper. Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages. 定义在 ZooKeeper 中存储消费 offset 的更新时间间隔。这个 commit 间隔是基于时间而不是消息消费的数量,如果一个错误在两次更新中发生,当重新启动任务是,可能得到重复消息。

More information about these settings can be found here

Creating the thread pool

This example uses the Java java.util.concurrent package for thread management since it makes creating a thread pool very simple.

public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
 
    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);
 
    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.execute(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}


First we create a Map that tells Kafka how many threads we are providing for which topics. The consumer.createMessageStreams is how we pass this information to Kafka. The return is a map of KafkaStream to listen on for each topic. (Note here we only asked Kafka for a single Topic but we could have asked for multiple by adding another element to the Map.)

首先我们创建一个 Map 并且告诉 Kafka 我们提供给哪些 topics 多少个线程。这个 consumer.createMessageStreams 就是我们如何将信息传递给 Kafka 的方法。返回的是一个用于监听每个 topic 的 KafkaStream 的 map。(备注,这里我们只是获取了一个 topic,但是我们可以通过在 Map 中增加元素来获取多个 topics)

Finally we create the thread pool and pass a new ConsumerTest object to each thread as our business logic.

最终我们创建一个线程池,并且在我们的业务逻辑中为每个线程设置一个 ConsumerTest 对象。

Clean Shutdown and Error Handling

Kafka does not update Zookeeper with the message offset last read after every read, instead it waits a short period of time. Due to this delay it is possible that your logic has consumed a message and that fact hasn''t been synced to zookeeper. So if your client exits/crashes you may find messages being replayed next time to start.

Kafka 不会每次 read 都更新 Zookeeper 的 message offset,而是每次等待一个短的周期。由于这个延迟,这可能导致你的逻辑已经消费了消息,然而事实上还没有同步到 ZooKeeper。所以当你的客户端退出或者崩溃时,你可能在下次启动时发现消息重播了。

Also note that sometimes the loss of a Broker or other event that causes the Leader for a Partition to change can also cause duplicate messages to be replayed.

而且需要记住,有时候一个 Broker 丢失或者其他导致分区 Leader 改变,也会导致重复消息重播。

To help avoid this, make sure you provide a clean way for your client to exit instead of assuming it can be ''kill -9''d.

为了避免这个问题,确认你提供了一个 clean 方法来退出客户端,而不是使用 "kill -9".

As an example, the main here sleeps for 10 seconds, which allows the background consumer threads to consume data from their streams 10 seconds. Since auto commit is on, they will commit offsets every second. Then, shutdown is called, which calls shutdown on the consumer, then on the ExecutorService, and finally tries to wait for the ExecutorService to finish all outsanding work. This gives the consumer threads time to finish processing the few outstanding messages that may remain in their streams. Shutting down the consumer causes the iterators for each stream to return false for hasNext() once all messages already received from the server are processed, so the other threads should exit gracefully. Additionally, with auto commit enabled, the call to consumer.shutdown() will commit the final offsets.

作为一个例子,main 方法在这里 sleep 10 秒,允许后台的消费者线程可以消费流数据 10 秒。由于自动提交,他们每秒都会 commit offsets。然后,shutdown 方法被调用,它将关闭 consumer,然后是关闭 ExecutorService,并最终等待 ExecutorService 完成所有主要工作。这给消费者线程时间来处理完一些可以保留在流中的 message。关闭消费者导致每个数据流 iterator 的 hasNext 方法一旦所有点的 message 已经接受并处理则返回 false,所以其他的线程应该优雅地关闭。此外,因为启动了自动提交,consumer.shutdown () 的调用将提交最终的 offset。

try {
    Thread.sleep(10000);
} catch (InterruptedException ie) {
 
}
example.shutdown();

In practice, a more common pattern is to use sleep indefinitely and use a shutdown hook to trigger clean shutdown.

实际上,更常见的模式是无限期地使用休眠,并使用关闭挂钩来触发优雅关闭。

Running the example

The example code expects the following command line parameters:

  • ZooKeeper connection string with port number
  • Consumer Group name to use for this process
  • Topic to consume messages from
  • # of threads to launch to consume the messages

For example:

server01.myco.com1:2181 group3 myTopic  4

Will connect to port 2181 on server01.myco.com for ZooKeeper and requests all partitions from Topic myTopic and consume them via 4 threads. The Consumer Group for this example is group3.

Full Source Code

package com.test.groups;
 
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
 
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class ConsumerGroupExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;
 
    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }
 
    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
   }
 
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);
 
        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }
 
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest"); // 从最早的数据开始消费
        // props.put("auto.offset.reset", "largest "); // 从最新的数据开始消费
 
        return new ConsumerConfig(props);
    }
 
    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);
 
        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);
 
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
 
        }
        example.shutdown();
    }
}

 

FlinkKafkaConsumerBase 中指定 consumer 起始消费 offset 的方法解读

FlinkKafkaConsumerBase 中指定 consumer 起始消费 offset 的方法解读

总结

  1. setStartFromEarliest()
    • 从所有分区的最早的 offset 开始读取。
  2. setStartFromLatest()
    • 从所有分区的最近的 offset 开始读取。
  3. setStartFromTimestamp(long startupOffsetsTimestamp)。
    • 从指定的 timestamp 开始读取分区。
    • 指定的 timestamp 必须早于当前 timestamp。
    • consumer 将会从 Kafka 查找 timestamp >= 指定的 timestamp 的最早的 offset。
    • 如果没找到,consumer 将会从最近的 offset 读取数据。
  4. setStartFromGroupOffsets()。
    • 从 Zookeeper / Kafka brokers 中已提交的 offset 开始读取。
    • consumer 配置属性中必须配置 “group.id”。
    • 如果没有找到某个分区的 offset,该分区将会使用配置属性中的 "auto.offset.reset"。
  5. setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)。
    • 指定 consumer 从指定的 offset 开始读取分区。
    • 需要为每个分区单独设置。指定的 offset 应当是将从分区中读取的下一个记录的 offset。
    • 如果提供的 offset map 包含 consumer 不 subscribe 的条目,则该条目将会被忽略。
    • 如果 consumer subscribe 了一个在提供的 offset map 中不存在的分区,consumer 将会针对该分区使用默认的 offset(参见 {@linkFlinkKafka ConsumerBase#Set Start From GroupOffset ()})。如果一个分区指定的 offset 无效,且找不到该分区的默认的 offset,则该分区将会使用配置属性中的 "auto.offset.reset"。

详情

  1. FlinkKafkaConsumerBase.setStartFromEarliest()
	/**
	 * 指定consumer从所有分区的最早的offset开始读取。
	 * 此方法会使得consumer忽略Zookeeper / Kafka brokers中已提交的任何offset。
	 *
	 * 当consumer从checkpoint或savepoint中还原时,这个方法不起作用,此时,会使用被还原的state中的offset。
	 * 
	 * @return 允许链式方法调用的consumer对象
	 */
	/**
	 * Specifies the consumer to start reading from the earliest offset for all partitions.
	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
	 *
	 * <p>This method does not affect where partitions are read from when the consumer is restored
	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
	 * savepoint, only the offsets in the restored state will be used.
	 *
	 * @return The consumer object, to allow function chaining.
	 */
	public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
		this.startupMode = StartupMode.EARLIEST;
		this.startupOffsetsTimestamp = null;
		this.specificStartupOffsets = null;
		return this;
	}
  1. FlinkKafkaConsumerBase.setStartFromLatest()
	/**
	 * 指定consumer从所有分区的最近的offset开始读取。
	 * 此方法会使得consumer忽略Zookeeper / Kafka brokers中已提交的任何offsets。
	 *
	 * 当consumer从checkpoint或savepoint中还原时,这个方法不起作用,此时,会使用被还原的state中的offset。
	 * 
	 * @return 允许链式方法调用的consumer对象
	 */
	/**
	 * Specifies the consumer to start reading from the latest offset for all partitions.
	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
	 *
	 * <p>This method does not affect where partitions are read from when the consumer is restored
	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
	 * savepoint, only the offsets in the restored state will be used.
	 *
	 * @return The consumer object, to allow function chaining.
	 */
	public FlinkKafkaConsumerBase<T> setStartFromLatest() {
		this.startupMode = StartupMode.LATEST;
		this.startupOffsetsTimestamp = null;
		this.specificStartupOffsets = null;
		return this;
	}
  1. FlinkKafkaConsumerBase.setStartFromTimestamp(long startupOffsetsTimestamp)
	/**
	 * 指定consumer从指定的timestamp开始读取分区。
	 * 指定的timestamp必须早于当前timestamp。
	 * 此方法会使得consumer忽略Zookeeper / Kafka brokers中已提交的任何offset。
	 *
	 * consumer将会从Kafka查找timestamp >= 指定的timestamp 的最早的offset,如果没找到,consumer将会从最近的offset读取数据。
	 * 
	 * 当consumer从checkpoint或savepoint中还原时,这个方法不起作用,此时,会使用被还原的state中的offset。
	 *
	 * @param startupOffsetsTimestamp 起始的offset的timestamp,从纪元开始毫秒数。
	 * 
	 * @return 允许链式方法调用的consumer对象
	 */
	/**
	 * Specifies the consumer to start reading partitions from a specified timestamp.
	 * The specified timestamp must be before the current timestamp.
	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
	 *
	 * <p>The consumer will look up the earliest offset whose timestamp is greater than or equal
	 * to the specific timestamp from Kafka. If there''s no such offset, the consumer will use the
	 * latest offset to read data from kafka.
	 *
	 * <p>This method does not affect where partitions are read from when the consumer is restored
	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
	 * savepoint, only the offsets in the restored state will be used.
	 *
	 * @param startupOffsetsTimestamp timestamp for the startup offsets, as milliseconds from epoch.
	 *
	 * @return The consumer object, to allow function chaining.
	 */
	// NOTE -
	// This method is implemented in the base class because this is where the startup logging and verifications live.
	// However, it is not publicly exposed since only newer Kafka versions support the functionality.
	// Version-specific subclasses which can expose the functionality should override and allow public access.
	protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
		checkArgument(startupOffsetsTimestamp >= 0, "The provided value for the startup offsets timestamp is invalid.");

		long currentTimestamp = System.currentTimeMillis();
		checkArgument(startupOffsetsTimestamp <= currentTimestamp,
			"Startup time[%s] must be before current time[%s].", startupOffsetsTimestamp, currentTimestamp);

		this.startupMode = StartupMode.TIMESTAMP;
		this.startupOffsetsTimestamp = startupOffsetsTimestamp;
		this.specificStartupOffsets = null;
		return this;
	}
  1. FlinkKafkaConsumerBase.setStartFromGroupOffsets()
	/**
	 * 指定consumer从Zookeeper / Kafka brokers中已提交的offset开始读取。consumer配置属性中必须配置“group.id”。如果没有找到某个分区的offset,该分区将会使用配置属性中
	 * 的"auto.offset.reset"
	 *
	 * 当consumer从checkpoint或savepoint中还原时,这个方法不起作用,此时,会使用被还原的state中的offset。
	 * 
	 * @return 允许链式方法调用的consumer对象
	 */
	/**
	 * Specifies the consumer to start reading from any committed group offsets found
	 * in Zookeeper / Kafka brokers. The "group.id" property must be set in the configuration
	 * properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
	 * set in the configuration properties will be used for the partition.
	 *
	 * <p>This method does not affect where partitions are read from when the consumer is restored
	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
	 * savepoint, only the offsets in the restored state will be used.
	 *
	 * @return The consumer object, to allow function chaining.
	 */
	public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
		this.startupMode = StartupMode.GROUP_OFFSETS;
		this.startupOffsetsTimestamp = null;
		this.specificStartupOffsets = null;
		return this;
	}
  1. FlinkKafkaConsumerBase.setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
	/**
	 * 指定consumer从指定的offset开始读取分区。需要为每个分区单独设置。
	 * 指定的offset应当是将从分区中读取的下一个记录的offset。
	 * 此方法会使得consumer忽略Zookeeper / Kafka brokers中已提交的任何offset。
	 *
	 * 如果提供的offset map包含consumer不subscribe的条目,则该条目将会被忽略。如果consumer subscribe了一个在提供的offset map中不存在的分区,consumer将会
	 * 针对该分区使用默认的偏移量(参见{@linkFlinkKafka ConsumerBase#Set Start From GroupOffset()})。
	 *
	 * 如果一个分区指定的offset无效,且找不到该分区的默认offset,则该分区将会使用配置属性中的"auto.offset.reset"。
	 *
	 * 当consumer从checkpoint或savepoint中还原时,这个方法不起作用,此时,会使用被还原的state中的offset。
	 *
	 * @return 允许链式方法调用的consumer对象
	 */
	/**
	 * Specifies the consumer to start reading partitions from specific offsets, set independently for each partition.
	 * The specified offset should be the offset of the next record that will be read from partitions.
	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
	 *
	 * <p>If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not subscribed by the
	 * consumer, the entry will be ignored. If the consumer subscribes to a partition that does not exist in the provided
	 * map of offsets, the consumer will fallback to the default group offset behaviour (see
	 * {@link FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
	 *
	 * <p>If the specified offset for a partition is invalid, or the behaviour for that partition is defaulted to group
	 * offsets but still no group offset could be found for it, then the "auto.offset.reset" behaviour set in the
	 * configuration properties will be used for the partition
	 *
	 * <p>This method does not affect where partitions are read from when the consumer is restored
	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
	 * savepoint, only the offsets in the restored state will be used.
	 *
	 * @return The consumer object, to allow function chaining.
	 */
	public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
		this.startupMode = StartupMode.SPECIFIC_OFFSETS;
		this.startupOffsetsTimestamp = null;
		this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
		return this;
	}

关于聊聊 RPC 之 Consumerrpc context的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于.net core consul grpc--系统服务RPC实现通信(一)、com.alibaba.dubbo.rpc.RpcException: Forbid consume、Consumer Group Example-High Level Consumer (翻译)、FlinkKafkaConsumerBase 中指定 consumer 起始消费 offset 的方法解读等相关知识的信息别忘了在本站进行查找喔。

本文标签: