GVKun编程网logo

如何使用 Apache Flume 发送日志数据至 Apache Pulsar(flume收集日志到hdfs)

9

在本文中,我们将详细介绍如何使用ApacheFlume发送日志数据至ApachePulsar的各个方面,并为您提供关于flume收集日志到hdfs的相关解答,同时,我们也将为您带来关于ApacheFl

在本文中,我们将详细介绍如何使用 Apache Flume 发送日志数据至 Apache Pulsar的各个方面,并为您提供关于flume收集日志到hdfs的相关解答,同时,我们也将为您带来关于Apache Flink 和 Apache Pulsar 的批流融合、Apache IoTDB x Apache Pulsar Meetup、Apache Pulsar 在智联招聘的实践 -- 从消息队列到基于 Apache Pulsar 的事件中心、Apache Pulsar 正式引入 Cloud Storage Sink 连接器:实现 Apache Pulsar 数据上云的有用知识。

本文目录一览:

如何使用 Apache Flume 发送日志数据至 Apache Pulsar(flume收集日志到hdfs)

如何使用 Apache Flume 发送日志数据至 Apache Pulsar(flume收集日志到hdfs)


阅读本文需要约 7 分钟。


Apache Flume 是一个分布式的、可靠易用的系统,可以有效地收集和汇总来自多种源系统的大量日志数据,或转移这些数据至一个数据中心存储。


Apache Pulsar 是 Yahoo 基于 Apache BookKeeper 开发和开源的下一代分布式消息系统。Apache Pulsar 已经从下一代分布式消息系统演化成为一个流原生数据平台。


本文主要介绍使用 Flume 实现日志搜集,并发送日志数据至 Pulsar 进行消费。整体架构如下: 



本文主要涉及 Flume 和 Pulsar 服务的搭建、测试、数据的发送及消费,全面讲解了 Flume 如何发送数据到 Pulsar 以及 Pulsar 如何消费数据。上图虽然画了 3 个 Flume 服务,但是本次测试只使用了一个,目的是快速走通整个流程,消费数据时也是只启动了一个消费端。后期如果有需求,可以根据此次试验快速完成扩展。


安装环境依赖


在搭建本次试验之前,需要安装以下依赖,本次试验是在 Mac 系统上进行的测试。


  • Docker

    https://www.docker.com/get-started  

  • Java 8  https://www.oracle.com/technetwork/java/javase/downloads/index.html  

  • Maven 3.5 或更高版本  https://archive.apache.org/dist/maven/maven-3/ 

  • Git  https://www.linode.com/docs/development/version-control/how-to-install-git-on-linux-mac-and-windows/    

  • Telnet    

    http://www.telnet.org/htm/howto.htm  


编译并打包项目


你能通过以下任一方式编译并打包项目:


方法 1:在本地编译并打包本项目


注意

在打包本项目之前,需要安装并配置好 Java 、 Maven 和 Git 的相关环境。


1、从 GitHub 克隆 Flume sink 代码。


git clone https://github.com/streamnative/pulsar-flume-ng-sink.git


2、使用 Maven 将本项目打包。


1  cd pulsar-flume-ng-sink
2  mvn clean package


如果出现以下信息,则说明打包成功:


1  [INFO] ------------------------------------------------------------------------
2  [INFO] BUILD SUCCESS
3  [INFO] ------------------------------------------------------------------------


方法 2:使用 Docker 镜像编译

并打包本项目


如果本地不方便安装 Java 和 Maven 环境,可以使用 Docker 环境进行打包。


1、拉取并启动镜像。

本次使用的 Docker 镜像是 maven:3.6-jdk-8 ,带有 Maven 、 Java 和 Git 的相关环境。


1  docker pull maven:3.6-jdk-8
2  docker run -d -it --name docker-package maven:3.6-jdk-8 /bin/bash


上述命令会启动一个基于镜像 maven:3.6-jdk-8 名称为 docker-package 的 Docker 服务。


1  docker ps -a | grep docker-package
2  580376b38aa8        maven:3.6-jdk-8                              "/usr/local/bin/mvn-…"   5 minutes ago       Up 5 minutes                                    docker-package
3  docker exec -it docker-package /bin/bash


使用上述命令确认该镜像是否成功启动。成功启动后,我们就已经在 Docker 服务中了。


2、克隆代码并打包。


成功启动该镜像后,可以非常方便地进行打包。类似地,我们可以使用与在本地打包相同的方法,在 Docker 中对本项目进行打包。


1  git clone https://github.com/streamnative/pulsar-flume-ng-sink.git
2  cd pulsar-flume-ng-sink/
3  mvn clean package


如果出现以下信息,则说明打包成功:


1  [INFO] ------------------------------------------------------------------------
2  [INFO] BUILD SUCCESS
3  [INFO] ------------------------------------------------------------------------


打包成功后,在本目录下的 target 文件夹中会出现一个 jar 包,名字类似于  flume-ng-pulsar-sink-<version>.jar。


安装 Pulsar


为了方便测试,本次安装 standalone Pulsar,并且使用 Docker 镜像 apachepulsar/pulsar:2.3.0 启动服务。


1docker pull apachepulsar/pulsar:2.3.0
2docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-flume-standalone apachepulsar/pulsar:2.3.0 bin/pulsar standalone


该服务暴露 6650 和 8080 端口供外部服务使用,该容器将 Pulsar 服务的 data 目录挂载在了当前目录下,并且命名为 pulsar-flume-standalone。 -d 参数表示使该服务使用后台模式运行;-it 参数表示以交互模式运行容器,并为容器分配一个伪输入终端。


1docker logs -f pulsar-flume-standalone
2
307:53:43.844 [pulsar-web-55-6] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
407:53:43.856 [pulsar-web-55-6] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Created namespace public/default
507:53:43.859 [pulsar-web-55-6] INFO  org.eclipse.jetty.server.RequestLog - 172.17.0.3 - - [28/Apr/2019:07:53:43 +0000"PUT /admin/v2/namespaces/public/default HTTP/1.1" 204 0 "-" "Jersey/2.27 (HttpUrlConnection 1.8.0_181)" 17
607:53:43.864 [pulsar-web-55-6] INFO  org.apache.pulsar.broker.web.PulsarWebResource - Successfully validated clusters on tenant [public]
707:53:43.868 [pulsar-ordered-OrderedExecutor-1-0-EventThread] INFO  org.apache.pulsar.zookeeper.ZooKeeperDataCache - [State:CONNECTED Timeout:30000 sessionid:0x1000112d2ea000a local:/127.0.0.1:38464 remoteserver:localhost/127.0.0.1:2181 lastZxid:154 xid:41 sent:41 recv:43 queuedpkts:0 pendingresp:0 queuedevents:0] Received ZooKeeper watch event: WatchedEvent state:SyncConnected type:NodeDataChanged path:/admin/policies/public/default
807:53:43.869 [pulsar-web-55-6] INFO  org.apache.pulsar.broker.admin.impl.NamespacesBase - [null] Successfully updated the replication clusters on namespace public/default


使用 docker logs -f pulsar-flume-standalone 命令查看 Docker 日志。如果出现以上的日志信息,则说明 Pulsar 在单机模式下启动成功了。


使用 Flume


使用 Docker 搭建 Flume 环境。


1、启动 Flume 容器


1 docker pull maven:3.6-jdk-8
2 docker run -d -it --link pulsar-flume-standalone -p 44445:44445 --name flume maven:3.6-jdk-8 /bin/bash


以上命令会启动一个名称为 flume 的服务,它链接到 pulsar-flume-standalone 容器,暴露 44445 端口的 Flume 服务。


1  docker ps -a | grep flume
2  27c7555e5481        maven:3.6-jdk-8                              "/usr/local/bin/mvn-…"   46 seconds ago      Up 45 seconds               0.0.0.0:44445->44445/tcp                         flume


使用 docker ps 命令可以看到启动的 Flume 服务。


2、安装 Flume 服务


(1)进入 Flume 容器。


docker exec -it flume /bin/bash


(2)下载并解压 Apache Flume 压缩包到当前目录。


1  wget http://apache.01link.hk/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
2  tar -zxvf apache-flume-1.9.0-bin.tar.gz


(3)退出 Flume 容器。


3、部署 pulsar-flume-ng-sink


(1)修改配置文件。本次测试需使用以下 2 个配置文件:


配置文件 flume-example.conf (https://github.com/streamnative/pulsar-flume-ng-sink/blob/master/src/test/resources/flume-example.conf


flume-example.conf 中兼容 Flume 的配置。更多关于 Flume 配置的信息,参阅 Flume 官方文档,此处简单介绍以下信息:


 1    # Describe/configure the source
2    a1.sources.r1.type = netcat
3    a1.sources.r1.bind = 0.0.0.0
4    a1.sources.r1.port = 44445
5
6    ## Describe the sink
7    a1.sinks.k1.type = org.apache.flume.sink.pulsar.PulsarSink
8    # Configure the pulsar service url (without `pulsar://`)
9    a1.sinks.k1.serviceUrl = pulsar-flume-standalone:6650
10    a1.sinks.k1.topicName = flume-test-topic
11    a1.sinks.k1.producerName = flume-test-producer


参数说明:

  • a1.sources.r1.type    

    指定数据来源。为了方便测试,此处指定 netcat 可以使用 Telnet 工具进行测试。   

  • a1.sources.r1.bind    

    绑定本机的 IP 地址。   

  • a1.sources.r1.port    

    绑定本机的 44445 端口。启动容器会映射该端口映射,方便测试。   

  • a1.sinks.k1.type    

    指定使用的 PulsarSink 类。这个不需要更改。   

  • a1.sinks.k1.serviceUrl    

    指定 Pulsar 服务的地址。你可以根据实际需求,更改该参数。此处使用容器地址是 pulsar-flume-standalone:6650。如需支持 SSL,可以配置打开 useTLS 选项。   

  • a1.sinks.k1.topicName    

    指定需要发送至 Pulsar 的目标 Topic。如果 Pulsar 服务没有该 Topic ,则会自动新建该 Topic。   

  • a1.sinks.k1.producerName    

    指定 sink 的名称,不能重复。 


以上是一些比较简单的配置,根据这些配置,我们可以完成本次试验。更多关于生产环境参数的置、客户端配置和 Producer 配置等信息,参阅 pulsar-flume-ng-sink 的配置 (https://github.com/streamnative/pulsar-flume-ng-sink#configurations)。


配置文件 flume-env.sh (https://github.com/streamnative/pulsar-flume-ng-sink/blob/master/src/test/resources/flume-env.sh


flume-env.sh 文件是 Flume 在启动的时需要使用的一些库的查找路径,需要将已打包好的 flume-ng-pulsar-sink-<version>.jar 的 jar 包复制到该路径下。

以下是本次测使用的 flume-env.sh 配置:


1      #!/bin/sh
2      export JAVA_HOME=/docker-java-home
3
4      FLUME_CLASSPATH=/docker-java-home/lib


(2)复制配置文件至 Flume 服务器。


按照以上步骤修改完配置文件后,可以将这些配置复制到 Flume 服务所在的容器。


1  docker cp flume-example.conf flume:/apache-flume-1.9.0-bin/conf/
2  docker cp flume-env.sh flume:/apache-flume-1.9.0-bin/conf/


(3)复制 flume-ng-pulsar-sink-.jar 至 Flume 容器。


使用以下任一方法:


方法 1


如果在本地进行了打包,使用以下命令可以将该 jar 包复制到 Flume 容器。进入打包文件的根目录,使用 Docker 命令进行复制。


1    cd pulsar-flume-ng-sink
2    docker cp target/flume-ng-pulsar-sink-1.9.0.jar flume:apache-flume-1.9.0-bin/lib/



方法 2


如果在容器中进行了打包,首先将容器中已打包的 jar 包复制到本地,再使用 Docker 命令复制到 Flume 容器中。


1    docker cp docker-package:/pulsar-flume-ng-sink/target/flume-ng-pulsar-sink-1.9.0.jar .
2    docker cp flume-ng-pulsar-sink-1.9.0.jar flume:apache-flume-1.9.0-bin/lib/


至此,已成功构建启动 Flume 服务所需要的环境和配置文件。


4、启动 Flume 服务


进入 Flume 容器并启动 flume agent。


1  docker exec -it flume /bin/bash
2  apache-flume-1.9.0-bin/bin/flume-ng agent --conf apache-flume-1.9.0-bin/conf/ -f apache-flume-1.9.0-bin/conf/flume-example.conf -n a1


在以上命令中,--conf 指定 Flume 服务的配置文件夹,-f 指定配置文件,-n 指定名称(需要与配置文件里的相同)。


如果出现如下信息,则说明 flume-ng agent 启动成功。


1  Info: Sourcing environment configuration script /apache-flume-1.9.0-bin/conf/flume-env.sh
2  Info: Including Hive libraries found via () for Hive access
3  + exec /docker-java-home/bin/java -Xmx20m -cp ''/apache-flume-1.9.0-bin/conf:/apache-flume-1.9.0-bin/lib/*:/docker-java-home/lib:/lib/*'' -Djava.library.path= org.apache.flume.node.Application -f apache-flume-1.9.0-bin/conf/flume-example.conf -n a1


5、启动消费端


在新的窗口中进入 Pulsar 的容器,再启动消费端(用于消费 Flume 发出的信息)。

测试使用的脚本文件在这里(https://github.com/streamnative/pulsar-flume-ng-sink/blob/master/src/test/python/pulsar-flume.py)。脚本文件内容如下:


 1  import pulsar
2
3  client = pulsar.Client(''pulsar://localhost:6650'')
4  consumer = client.subscribe(''flume-test-topic'',
5                              subscription_name=''sub'')
6
7  while True:
8      msg = consumer.receive()
9      print("Received message: ''%s''" % msg.data())
10      consumer.acknowledge(msg)
11
12  client.close()


首先,该脚本会连接到本地 Pulsar 服务的 6650 端口。由于在 Pulsar 服务所在的容器中启动的该脚本,因此,服务地址是 localhost:6650。该脚本会在 flume-test-topic Topic 上进行消费,该 Topic 正是我们在 flume-example.conf 中配置的 Topic ,并且指定订阅名称为 sub。


(1)复制测试脚本文件到 Pulsar 服务所在的容器。


1    cd pulsar-flume-ng-sink
2    docker cp src/test/python/pulsar-flume.py pulsar-flume-standalone:/pulsar/


(2)进入 Pulsar 服务容器。


    docker exec -it pulsar-flume-standalone /bin/bash


(3)启动消费者脚本。


1      python pulsar-flume.py
2      2019-04-28 09:01:43.581 INFO  Client:88 | Subscribing on Topic :flume-test-topic
3      2019-04-28 09:01:43.582 INFO  ConnectionPool:72 | Created connection for pulsar://localhost:6650
4      2019-04-28 09:01:43.585 INFO  ClientConnection:300 | [127.0.0.1:56088 -> 127.0.0.1:6650] Connected to broker
5      2019-04-28 09:01:43.590 INFO  HandlerBase:52 | [persistent://public/default/flume-test-topic, sub, 0] Getting connection from pool
6      2019-04-28 09:01:43.590 INFO  ConnectionPool:72 | Created connection for pulsar://c063d9580d4c:6650
7      2019-04-28 09:01:43.591 INFO  ClientConnection:302 | [127.0.0.1:56090 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://c063d9580d4c:6650
8      2019-04-28 09:01:43.614 INFO  ConsumerImpl:169 | [persistent://public/default/flume-test-topic, sub, 0] Created consumer on broker [127.0.0.1:56090 -> 127.0.0.1:6650]


启动脚本后出现以上信息,则说明消费端已经成功启动。


6、发送消息进行测试


如前文所述,本次测试使用 Telnet 工具。


开启一个新的窗口,使用以下 Telnet 命令,发送数据到 Flume 的 44445 端口。


1    ~ telnet localhost 44445
2  Trying ::1...
3  Connected to localhost.
4  Escape character is ''^]''.
5  hello
6  OK
7  world
8  OK


此时,在刚才开启的消费端可以看到通过 Telnet 发送的信息,内容大致如下:


1  ''eceived message: ''hello
2  ''eceived message: ''world


以上正是通过 Telnet 发送到 Flume 44445 端口的数据。


清理环境


完成本次测试后,使用以下命令停止并且删除相关容器:


1docker stop flume
2docker rm flume
3docker stop pulsar-flume-standalone
4docker rm pulsar-flume-standalone
5docker stop docker-package
6docker rm docker-package


更多信息


Apache Pulsar Connector 系列文章:

  • Pulsar Connector 预览篇

  • Pulsar Sink 入门指南

  • Pulsar Source 入门篇

  • 使用 Elastic Beats 搜集日志到 Pulsar



审校 | Anonymitaet

编辑 | Irene


更多关于 Pulsar 的技术干货和产品动态,请关注 StreamNative 微信公众号。


本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与 “OSC 源创计划”,欢迎正在阅读的你也加入,一起分享。

Apache Flink 和 Apache Pulsar 的批流融合

本文由郭斯杰老师在 Flink Forward San Francisco 2019 大会上的演讲内容整理而成。Apache Flink(以下简称 Flink)和 Apache Pulsar(以下简称Pulsar)的开源数据技术框架能够以不同的方式融合来提供大规模弹性数据处理,本文将详细介绍 Flink 和 Pulsar 在批流应用程序的融合情况。


本文目录:


1. Apache Pulsar 简介

2. Pulsar 数据视图:分片数据流

3. Flink + Pulsar 的融合

3.1 未来融合方式

3.2 现有融合方式

4. 总结


1.Apache Pulsar 简介


Apache Pulsar 是一个开源的分布式发布-订阅消息系统, 由 Apache 软件基金会管理,并于 2018 年 9 月成为 Apache顶级开源项目。 Pulsar 是一种多租户、高性能解决方案,用于服务器到服务器消息传递,包括多个功能,例如,在一个 Pulsar 实例中对多个集群提供原生支持、集群间消息跨地域的无缝复制、发布和端到端的低延迟、超过一百万个主题的无缝扩展以及由 Apache BookKeeper 提供的持久消息存储保证消息传递。现在我们来讨论Pulsar 和其他发布-订阅消息传递框架之间的主要区别:


  • 第一个区别是,虽然 Pulsar 提供了灵活的发布-订阅消息传递系统,但它也由持久的日志存储支持——因此需在一个框架下集成消息传递和存储功能。由于 Pulsar 采用了分层架构,它可以即时故障恢复、支持独立可扩展性和无需均衡的集群扩展。Pulsar 的架构与其他发布-订阅系统类似,框架由主题组成,而主题是主要数据实体。如下图所示,生产者向主题发送数据,消费者从主题接收数据。



  • 第二个区别是,Pulsar 的框架构建从一开始就考虑到了多租户。这意味着每个 Pulsar 主题都有一个分层的管理结构,使得资源分配、资源管理和团队协作变得高效而容易。由于 Pulsar 提供属性(租户)级、命名空间级和主题级的资源隔离,Pulsar 的多租户特性不仅能使数据平台管理人员轻松扩展新的团队,还能跨集群共享数据,简化团队协作。



  • 最后,Pulsar 灵活的消息传递框架统一了流式和队列数据消费模型,并提供了更大的灵活性。如下图所示,Pulsar 保存主题中的数据,而多个团队可以根据其工作负载和数据消费模式独立地消费数据。



2.Pulsar 数据视图:分片数据流


Apache Flink 是一个流式优先计算框架,它将批处理视为流处理的特殊情况。在对数据流的看法上,Flink 区分了有界和无界数据流之间的批处理和流处理,并假设对于批处理工作负载数据流是有限的,具有开始和结束。


在数据层上,Apache Pulsar 与 Apache Flink 的观点相似。该框架也使用流作为所有数据的统一视图,分层架构允许传统发布-订阅消息传递,用于流式工作负载和连续数据处理;并支持分片流(Segmented Streams)和有界数据流的使用,用于批处理和静态工作负载。



如下图所示,为了并行处理数据,生产者向主题发送数据后,Pulsar 根据数据流量对主题进行分区,再在每个分区中进行分片,并使用 Apache BookKeeper 进行分片存储。这一模式允许在同一个框架中集成传统的发布-订阅消息系统和分布式并行计算。



3.Flink + Pulsar 的融合


Apache Flink 和 Apache Pulsar 已经以多种方式融合。在以下内容中,我会介绍两个框架间未来一些可行的融合方式,并分享一些融合使用两个框架的示例。


3.1 未来融合方式


Pulsar 能以不同的方式与 Apache Flink 融合,一些可行的融合包括,使用流式连接器(Streaming Connectors)支持流式工作负载,或使用批式源连接器(Batch Source Connectors)支持批式工作负载。Pulsar 还提供了对 Schema 的原生支持,可以与 Flink 集成并提供对数据的结构化访问,例如,使用 Flink SQL 在 Pulsar 中查询数据。另外,还能将 Pulsar 作为 Flink 的状态后端。由于 Pulsar 具有分层架构(Apache Bookkeeper 支持下的 Streams 和 Segmented Streams),因此可以将 Pulsar 作为存储层并存储 Flink 状态。

 

从架构的角度来看,我们可以想象两个框架之间的融合,使用 Apache Pulsar 作为统一的数据层视图,使用 Apache Flink 作为统一的计算、数据处理框架和 API。


3.2 现有融合方式


两个框架之间的融合正在进行中,开发人员已经可以通过多种方式融合使用 Pulsar 和 Flink。例如,在 Flink DataStream 应用程序中,Pulsar 可以作为流数据源和流接收器。开发人员能使 Flink 作业从 Pulsar 中获取数据,再进行计算并处理实时数据,最后将数据作为流接收器发送回 Pulsar 主题。示例如下:


PulsarSourceBuilder<String>builder = PulsarSourceBuilder.builder(new SimpleStringSchema()) .serviceUrl(serviceUrl) .topic(inputTopic) .subscriptionName(subscription);SourceFunction<String> src = builder.build();DataStream<String> input = env.addSource(src); DataStream<WordWithCount> wc = input .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> { for (String word : line.split("\\s")) { collector.collect(new WordWithCount(word, 1)); } }) .returns(WordWithCount.class) .keyBy("word") .timeWindow(Time.seconds(5)) .reduce((ReduceFunction<WordWithCount>) (c1, c2) -> new WordWithCount(c1.word, c1.count + c2.count)); if (null != outputTopic) { wc.addSink(new FlinkPulsarProducer<>( serviceUrl, outputTopic, new AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes(UTF_8), wordWithCount -> wordWithCount.word )).setParallelism(parallelism);} else { // print the results with a single thread, rather than in parallel wc.print().setParallelism(1);}


另一个开发人员可利用的框架间的融合,已经包括将 Pulsar 用作 Flink 应用程序的流式源和流式表接收器,代码示例如下:


PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema()) .serviceUrl(serviceUrl) .topic(inputTopic) .subscriptionName(subscription);SourceFunction<String> src = builder.build();DataStream<String> input = env.addSource(src); DataStream<WordWithCount> wc = input .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> { for (String word : line.split("\\s")) { collector.collect( new WordWithCount(word, 1) ); } }) .returns(WordWithCount.class) .keyBy(ROUTING_KEY) .timeWindow(Time.seconds(5)) .reduce((ReduceFunction<WordWithCount>) (c1, c2) -> new WordWithCount(c1.word, c1.count + c2.count)); tableEnvironment.registerDataStream("wc",wc);Table table = tableEnvironment.sqlQuery("select word, `count` from wc");table.printSchema();TableSink sink = null;if (null != outputTopic) { sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY);} else { // print the results with a csv file sink = new CsvTableSink("./examples/file", "|");}table.writeToSink(sink);


最后,Flink 融合 Pulsar 作为批处理接收器,负责完成批处理工作负载。Flink 在静态数据集完成计算之后,批处理接收器将结果发送至 Pulsar。示例如下:


// create PulsarOutputFormat instancefinal OutputFormat pulsarOutputFormat = new PulsarOutputFormat(serviceUrl, topic, new AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes()); // create DataSetDataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE); // convert sentences to wordstextDS.flatMap(new FlatMapFunction<String, WordWithCount>() { @Override public void flatMap(String value, Collector<WordWithCount> out) throws Exception { String[] words = value.toLowerCase().split(" "); for(String word: words) { out.collect(new WordWithCount(word.replace(".", ""), 1)); } }}) // filter words which length is bigger than 4.filter(wordWithCount -> wordWithCount.word.length() > 4) // group the words.groupBy(new KeySelector<WordWithCount, String>() { @Override public String getKey(WordWithCount wordWithCount) throws Exception { return wordWithCount.word; }}) // sum the word counts.reduce(new ReduceFunction<WordWithCount>() { @Override public WordWithCount reduce(WordWithCount wordWithCount1, WordWithCount wordWithCount2) throws Exception { return new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count); }}) // write batch data to Pulsar.output(pulsarOutputFormat);


4.总结


Pulsar 和 Flink 对应用程序在数据和计算级别如何处理数据的视图基本一致,将“批”作为“流”的特殊情况进行“流式优先”处理。通过 Pulsar 的 Segmented Streams 方法和 Flink 在一个框架下统一批处理和流处理工作负载的几个步骤,可以应用多种方法融合两种技术,提供大规模的弹性数据处理。欢迎订阅 Apache Flink 和 Apache Pulsar 邮件,及时了解领域最新发展,或在社区分享您的想法和建议。




▼ 社区福利 ▼ 


推荐 4 个 Apache 顶级项目的官方公众号,希望对大家了解开源大数据生态,扩展技术视野有所帮助。


  • Apache Pulsar是下一代云原生流数据平台,助力企业快速分析实时数据,激活数据价值,实现 C 位出道。这里是 Pulsar 前沿技术的传播圣地,也是技术爱好者、开发者和终极用户时刻关注的技术平台。我们定时分享 Pulsar 优质内容,包括社区活动、技术文章、用户案例、行业动态和热点话题等,让你全面拥抱 Pulsar 的一手讯息。Apache Pulsar,助力千万企业和技术人开疆拓土、共同成长。

 Apache Pulsar  


  • Apache HBase 技术社区,研究探讨 HBase 内核原理,源码剖析,周边生态以及实践应用,汇集众多 Apache HBase PMC&Committer 以及爱好使用者,提供一线 Apache HBase 企业实战资料以及 Flink 集成等资讯。

 HBase 技术社区 ▼ 


  • Apache Kylin,介绍 Kylin 的功能特性、应用案例、经验分享、社区资讯、活动等。开源大数据分布式 OLAP 引擎 Apache Kylin 于 2014 年开源,在 2015 年和 2016 年连续获得 InfoWorld 的 BOSSIE 奖:年度最佳开源大数据工具奖,发展至今在全球已经拥有超过 1000 家企业用户。作为首个被 Apache 软件基金会认证的由中国人主导的顶级开源项目,Kylin 为万亿数据提供亚秒级查询,并可以和现有的 Hadoop / Spark 及 BI 无缝集成。

 Apache Kylin ▼ 


  • Ververica,由 Apache Flink Community China 运营管理,旨在联合国内的 Flink 大 V,向国内宣传和普及 Flink 相关的技术。公众号将持续输出 Flink 最新社区动态,入门教程、Meetup 资讯、应用案例以及源码解析等内容,希望联合社区同学一起推动国内大数据技术发展。

 Ververica ▼ 


/ Apache Flink 极客挑战赛,10 万奖金等你拿 /


你也「在看」吗

本文分享自微信公众号 - Flink 中文社区(gh_5efd76d10a8d)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

Apache IoTDB x Apache Pulsar Meetup

Apache IoTDB x Apache Pulsar Meetup

活动介绍

Apache Pulsar 是下一代云原生分布式流数据平台,它源于 Yahoo,2016 年 12 月开源,2018 年 9 月正式成为 Apache 顶级项目,逐渐从单一的消息系统演化成集消息、存储和函数式轻量化计算的流数据平台。

从成为 Apache 顶级项目后,在这一年的时间中,Pulsar 发展势头非常迅速,目前在全球拥有 100+ 的企业级用户,像雅虎、苹果、迪斯尼、Hulu、腾讯、中国移动、中国电信、智联招聘、涂鸦智能、个推等公司都在使用 Pulsar。Pulsar 社区目前有 190+ contributors,30+ committers,代码提交次数 4000+,fork 人数 1100+,slack 社群和微信社群活跃用户分别为 1000+,在海内外举办过 10 多场大型线下技术交流会。在 Pulsar 如火如荼的发展过程中,Pulsar 积极与周边大数据生态互联,目前已经完成 Pulsar + Flink,Pulsar + Spark 等的集成应用。Apache Pulsar 的关注度与日倍增,正在发展成为最受欢迎的流数据平台。

这次是 Apache Pulsar 第一次走进清华大学校园,与国内顶级开源数据库 Apache IoTDB 进行密切合作,希望与更多高校学子进行深入交流,碰撞出不一样的火花。

 

活动议程

WX20191206-200917@2x.png

 

️演讲内容

1. Apache Pulsar -- 从消息系统到流原生平台

WX20191206-201030@2x.png

摘要:


本次分享主要介绍 Apache Pulsar 的特性,从技术和社区多个角度讲解:

1. Apache Pulsar 是什么

2. Apache Pulsar 的差异化亮点

3. Apache Pulsar 与流原生

4. 社区案例分享

 

2. Apache IoTDB 的前世今生与技术细节揭秘

WX20191206-201116@2x.png

摘要:

随着时间序列数据逐渐成为重要的数据类型之一,时序数据库日益受到关注。Apache IoTDB 是目前唯一一个从我国高校走进 Apache 的项目,其读写性能远超当前市面上的常见时序数据库。

本次分享会介绍我们为什么做 IoTDB、IoTDB 如何从实验室技术原型系统变为工业级产品、以及 IoTDB 如何实现每秒数千万数据点的写入性能。

 

3. 腾讯海量数据 MQ 实践与 Pulsar 新技术落地

355.png

摘要:

腾讯大数据经过 10 多年的发展,数据接入总量已超过了 35 万亿条记录/天,且仍在随业务飞速发展。在长期的数据平台研发与运维实践中,消息中间件在高可靠与高性能场景下的开发、部署与运维得到了逐步完善。

本次分享将围绕腾讯的海量数据接入实践,来探讨 MQ 设计的原则与方法论,并结合最新的 Apache Pulsar 的落地实践,介绍新技术落地所克服的技术挑战并分享相应的经验。本着回馈社区的精神,最后将揭示腾讯大数据未来在 Pulsar 社区的贡献与生态合作计划。

 

4. 基于 IoTDB 打造时序数据全生命周期管理的开源解决方案

WX20191206-201203@2x.png

摘要:

无论是互联网公司还是工业企业,针对时间序列数据的管理和使用正在成为重要的发力点。本报告围绕 Apache IoTDB,结合 Apache PLC4X、Apache Pulsar、Apache Spark 等介绍时间序列的采集、缓存、存储、处理、分析、可视化的全生命周期的开源解决方案,让海量时间序列数据管理不再成为掣肘用户应用的难题。

 

5. Pulsar IO 运行原理与开发实践(连接 Pulsar 和 IoTDB)

WX20191206-201253@2x.png

摘要:

本次分享主要介绍以下几个方面:

1. Functions Worker 中各个组件的运作机制

2. Pulsar IO 如何运行在 Functions Worker 上

3. 以连接 IoTDB 为例分享一个 Connector 的开发实践

 

‍♂️关于主办方

本次 Meetup 由 StreamNative 联合清华大学大数据系统软件国家工程实验室共同举办。

StreamNative 是一家围绕 Apache Pulsar 和 Apache BookKeeper 打造下一代流数据平台的开源基础软件公司。我们秉承“Event Streaming 是大数据的未来基石”、“开源是基础软件的未来”这两个理念,专注于开源生态和社区的构建,致力于前沿技术领域的创新。

清华大学「大数据系统软件国家工程实验室」挂靠清华大学软件学院,中国工程院院士孙家广担任实验室主任,国家杰出青年科学基金获得者、清华大学软件学院院长王建民教授担任实验室执行主任。大数据系统软件国家工程实验室是承担我国大数据系统软件技术研发与工程化的国家级创新平台。

WechatIMG77.jpeg

伙伴.png

 

Apache Pulsar 在智联招聘的实践 -- 从消息队列到基于 Apache Pulsar 的事件中心

Apache Pulsar 在智联招聘的实践 -- 从消息队列到基于 Apache Pulsar 的事件中心

导读:本文中鹏辉介绍了以前的消息中间件在智联招聘的应用和场景;以及对消息中间件选型的诉求;详细描述了选型过程中的细致思考。接着介绍了为什么会选择 Pulsar,以及 Pulsar 中和智联的场景匹配的特性。最后提供了详细的 Pulsar 落地实践。


业务场景


消息队列作为智联招聘非常重要的平台级服务负责全业务线的消息投递。有很多非常典型的业务场景,我们用一个业务场景简历投递来说明消息队列为业务提供的支持


图 1. 简历投递业


当 C 端用户发生一次简历投递的时候会先发送一条消息到消息队列服务,C 端中台、B 端中台以及平台级的基础服务会从消息队列消费这条消息进行自己的业务逻辑处理比如写 DB、通知 B 端企业等,在这个场景中消息队列为投递业务提供了很好异步解耦支持,在高峰期的时候可以提供很好的削峰作用以保障各业务系统的稳定运行。


上面的这个场景是非常典型的工作队列场景,在工作队列中大多数是为了实现业务场景支持在线服务而设定的,往往具有以下特点:

  1. 一条消息会被多个业务方消费

  2. 多个业务方之间广播方式消费 (每个业务方消费完整的一份数据)

  3. 单个业务方采用集群消费模式 (每个 consumer 消费部分数据)

  4. 每条消息都需要确保送达,消息队列会采用重试的机制来保证这一点

  5. 重要业务消息需要提供跟踪机制可以查询整个消息的生命周期


还有一些业务的需求会使用到延时消息,定时消息等。


基于 RabbitMQ 的自研 MQService

RabbitMQ 作为一款非常成熟的消息队列产品可以很好的应对工作队列的场景,当然也有一些不足比如单队列的扩展能力、延时消息支持的不够好等。我们在 RabbitMQ 基础上又做了一层抽象 (MQService),将 RabbitMQ 看做一个消息服务的存储节点来用,在 Zookeeper 中会记录 Topic 的数据在 RabbitMQ 节点的分布并增加了容错的特性来保证存储节点失败的情况下可以持续提供消息写入能力。在招聘旺季消息队列每天约承载数 10 亿的消息投递。


MQService 整体结构如下:


图 2.MQService 架构


用户可以通过 Thrift 协议、Http 协议、MQTT 来做消息的发送和消费。用户也可以注册 Http 回调接口来消费消息。默认的 Java 客户端封装了 Thrift 协议及 MQTT 的消息生产及消费,其他语言并没有封装对应的客户端而是通过 Http 协议进行消息的生产和消费,整体智联招聘也是以 Java 做后台服务为主。


接下来我们看一下通过 zookeeper 维护的 Topic 与 RabbitMQ 分组以及 RabbitMQ 节点的关系。


每个 Topic 都对对应到一个 Group,每个 Group 下会挂一些 RabbitMQ 节点。当 producer 发送一条消息时 MQService 会从缓存中拿到对应这个 Topic 可用的 RabbitMQ 节点的列表,MQService 会通过负载均衡策略选择其中的一个 RabbitMQ 节点进行写入,写入失败会重试下一个节点直到写入成功。单个节点如果写入失败次数在一定时间内达到一个特定值会触发熔断机制,单个 RabbitMQ 节点在熔断期间不对外提供写入及查询服务。


通过上面的介绍,大家应该可以对 MQService 有一个直观的印象,这里不再详细展开来介绍实现的细节。


以 Kafka 为中心的流批处理

首先 Kafka 在智联有招聘有大规模的应用,每天的数据传输量大约在数十 TB 量级,覆盖的范围包括 ELK、实时计算等。我们还是以计算每天不同时间端的投递量为例子来介绍 Kakfa 在这个场景下的使用。



图 3. 基于 Kafka 的 Streaming 模型


这是一个非常经典的流式计算的架构,通过采集业务日志入 Kafka,再通过 Spark/Flink 之类的计算框架做计算工作。在计算投递量的场景中,我们通过将计算结果按小时以及职位为维度将计算结果保存在 MySQL 中,也会将明细数据存储在 Hive 中供离线计算使用。


那么很容易发现同一个业务场景但是数据来源是不一样的,一个是业务方发送至 MQService 的一个是通过 Logstash 采集业务端日志来做的,那么我们如何来保证这两份数据的一致性?即使不通过采集日志,业务方去双写又如何来保证一致性呢?这样也给用户带来额外的负担。


矛盾点在于 MQService 很难融入到流行的实时计算框架或者批处理框架,而 Kafka 也应用在工作队列模式下显得有点力不从心。主要体现在 Kakfa 数据消费的支持,Partition 数量与 Consumer 数量的绑定造成的 Partition 数量要跟着 consumer 的消费能力决定,业务方处理数据很难保证一批数据都能够成功处理,做 offset commit 的时候也是无法达到用户的预期。这是基于产品在业务场景的匹配上而讨论论的,就像 Kafka 介绍的那样 (A distributed streaming platform)。


因此在 2018 年初时我们提出了通过一套方案来解决 “工作队列 + Streaming” 的想法也就是事件中心,期望事件中心可以承载智联全业务线用户行为、中台以及后台的业务事件传递。产品和业务系统在服务过程中会产生事件,事件是在先前定义好的,对于事件的生产方无需关心事件的消费,只需要关心事件的格式定义以及事件的生产。事件的消费方可以在事件中心去查阅自己想要的订阅的事件来申请订阅,甚至是数据产品也可以在事件中心去找找灵感。


这样我们不需要关心两个消息中间件数据的一致性问题,一份数据就可以匹配 “工作队列 + Streaming” 场景,对资源消耗、系统运维都有很好的改善。


工作队列 + Streaming 场景全新诉求


有了这个想法之后我们开始总结我们需要的是一个什么样的产品,然后开始围绕我们的需求去做设计工作和技术调研工作。我们总结了我们一些诉求如下:


容灾能力及一致性

数据做分布式存储并且在分布式环境中要保证一致性。有一些重要的业务是依赖消息可靠性以及数据一致性的,所以在技术选型的时候如果在一个支持一致性的模型下去弱化一致性提升可用性是比较容易的,但是如果在一个没有一致性模型的方案上去做一致性这将会需要一个很大的改动。


单 Topic 扩展能力

就像在 MQService 描述的那样,同一个 Topic 可以利用多个节点来做横向扩展。Kafka 在这一点做了很好的抽象 (Partition)。同一个 Partition 的事件可以提供顺序性消费。


累计签收与单条签收

累计签收主要应用在 Streaming 场景下,而单条签收可以很好的匹配工作队列的场景,就像一次简历投递的业务处理,业务本身没有顺序性的要求,单条签收可以很好的支持消费者的消费能力扩展。而在累计签收模式下单分区是要保证顺序性的。


事件回溯能力

我们需要根据不同的事件来决定保留的时长或大小,可以为一些想要拿到历史事件的业务提供支持,我们也可以看到这也是 MQService (上文提到的) 薄弱的地方,MQService 是无法给用户提供回溯能力的。


基于上面的一些主要的特性我们开始了技术选型的调研工作。经过了一段时间的调研工作后我们发现开源的消息中间件产品兼顾容灾能力和一致性的产品几乎没有,因此我们有一个想法那就是基于一个强一致性的分布式日志存储系统来做队列功能的开发,期间也考虑过使用 Raft 协议 + Log 存储但是最终还是 Bookkeeper 吸引了我们的关注,Bookeeper 提供了开箱即用的 API 以及在 Twitter、Hadoop 有着大规模应用的场景,其稳定性以及成熟度等都是可以保证的。因此我们在大约 5 月份的时候已经开始做基于 Bookkeeper 事件中心的一些设计工作,在接触 Bookkeeper 社区的的时候才了解到 Apache Puslar,通过了一段时间对 Apache Pulsar 的了解,以及社区活跃度的一些观察和 Apache Pulsar 社区小伙伴们的大力支持下,我们决定基于 Apache Pulsar 来搭建我们的事件中心。


为什么选择 Apache Pulsar 


Apache Pulsar 有很多特性在满足事件中心需求的前提了还给了我们更多的惊喜,为更多的场景提供非常好的解决方案。


灵活的可用性和一致性选择

在每个 Topic 中由一系列的 Ledger 构成,每个 Ledger 有三个关键配置:


  • Ensemble Size (E)

  • Write Quorum Size (Qw)

  • Ack Quorum Size (Qa)


Ensemble Size (E) 决定了 Pulsar 写入 Ledger 可用的 Bookies 池的大小。

Write Quorum (Qw) 是 Pulsar 将要写入的实际的 Bookies 数量。可以等于或者小于 E。


图 4.E = 3 Qw = 3


当 Qw 小于 E 时,以条带化的方式分配读 / 写即每个 Bookie 只提供读写请求的子集。因此可以提升吞吐量,降低延迟。这也是提升单个 Partition 吞吐能力的一个很好的方案,这也得益于基于 Segment 为物理单元的存储设计。消息通过 Robin 的方式写入指定的 Bookie,在查询消息是可以根据 MessageId 取模即能获得所在的 Bookie 列表。

图 5.E = 5 Qw = 3


Ack Quorum (Qa) 是确认写入 Bookies 的数量,Pulsar Broker 将确认发送给客户端。为了一致性,Qa 应该是:(Qw + 1) / 2 或者更大。


这个特性可以很好的让我们在可用性和一致性上去做选择。


订阅的抽象

单队列的扩展 Kafka 为我们做了很好的抽象,Apache Pulsar 也基本采用相同的思路。而对于订阅的抽象,我们认为 Apache Pulsar 再一次为我们做了很好的抽象,通过 3 种不同的订阅方式来匹配不同的使用场景。


图 6.Apache Pulsar 对订阅的抽象


消息存储在 Topic 中。逻辑上一个 Topic 是日志结构,每个消息都在这个日志结构中有一个偏移量。Apache Pulsar 使用游标来跟踪偏移量。生产者将消息发送到一个指定的 Topic,Apache Pulsar 保证消息一旦被确认就不会丢失 (正确的配置和非整个集群故障的情况下)。


消费者通过订阅来消费 Topic 中的消息。订阅是游标 (跟踪偏移量) 的逻辑实体,并且还根据不同的订阅类型提供一些额外的保证


  • Exclusive (独享) - 一个订阅只能有一个消息者消费消息

  • Shared (共享) - 一个订阅中同时可以有多个消费者,多个消费者共享 Topic 中的消息

  • Fail-Over (灾备) - 一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者故障则备份消费者接管。不会出现同时有两个活跃的消费者。


一个 Topic 可以添加多个订阅。订阅不包含消息的数据,只包含元数据和游标。


Apache Pulsar 通过允许消费者将 Topic 看做在消费者消费确认后删除消息的队列,或者消费者可以根据游标的回放来提供队列和日志的语义。在底层都使用日志作为存储模型。


这为我们通过一套系统支持工作队列和 Streaming 的诉求提供了很好的支持,在工作队列场景我们使用 share 模式,在 Streaming 模式我们使用 Failover 或者 Exclusive。我们只需要一份数据就可以同时支持两种场景。


更好的 IO 和存储设计

当在 Bookie 上写入数据时,首先将该消息写入日志文件,这是一个预写日志 (WAL), 它可以帮助 Bookkeeper 在发生故障时避免数据丢失。它与关系型数据库持久化保证的机制相同。


写入预写日志的操作完成后会将数据放入缓存。写入的缓存会在内存中做积累并定期进行排序和刷盘。对写入进行排序以便将同一 Ledger 的条目放在一起,从而提高读取性能。如果条目以严格的时间顺序写入,在读取时无法利用磁盘的高效顺序操作


Bookkeeper 容许将磁盘 IO 做读写分离。写入都按顺序写入日志文件可以存储在专用的磁盘上,并且可以批量刷盘以获得搞得吞吐量。除此之外从写入操作来看没有其他的同步磁盘 IO 操作,数据都是写入到内存的缓存区。


写缓存通过异步的方式批量将条目写入到日志文件和 RocksDB,因此,一个磁盘用于同步写入日志文件,另一个磁盘用于异步写入数据和读取操作,


图 7.Apache Pulsar 的 IO 及存储设计


在存储设计上 Bookkeeper 以 Segment 为中心设计对系统扩容、冷热数据分离提供了很好的支持。在扩容方面通过增加 Bookie 节点就可以分担整个集群的存储压力,在冷热数据分离方面通过将 Segment 搬迁至二级存储如 S3、OSS 等更廉价的存储设备中,支持在线业务往往使用 SSD 来做存储。因此我们可以兼顾热数据的高性能与冷数据的大空间存储。


图 8.Bookie 扩容


图 9. 冷数据搬迁


在 IO 和存储设计上以及 Offload 的特性给了我们更多的惊喜,可以更好的为我们在不影响在线业务的支持上兼顾大量事件存储需求的痛点,大大的降低了冷数据的存储成本。我们计划将冷数据存储至 OSS。


上面挑选了 Apache Pulsar 非常核心的 3 个 messaging 特性来做介绍,这与事件中心的初衷是非常匹配的,然而 Apache Pulsar 远不止这些,有完善的多租户特性提供 Topic 的分层次管理,多种 Schema 的支持为数据校验、序列化提供更便捷的方式,轻量级的 Pulsar Function 以及 Pulsar SQL 都是非常值得去探索的特性,这里就不一一展开介绍了。


Apache Pulsar 在智联招聘的落地实践


下面将介绍 Apache Pulsar 在智联招聘落地过程中的一些实践


为 Namespace 设置合理的 Backlog Quota

Pulsar 为我们提供了 Backlog 的机制能够记录每个 Subscription 的消费状况。也提供了 Backlog Quota 的设置,主要可以设置 Backlog 大小以及达到阈值时的控制策略。


Backlog Quota 的控制策略有 3 种:


  1. producer_request_hold

  2. producer_exception

  3. consumer_backlog_eviction


producer_request_hold 作为默认配置,在达到 Backlog 设置的大小阈值后会 block producer 发消息操作,这个配置不适合用于消息发送方是在线业务的使用场景。


producer_exception 在达到 Backlog 设置的大小阈值后,producer 会快速失败。


consumer_backlog_eviction 在达到 Backlog 设置的大小阈值后会将 subscription 未签收的头部数据逐出,可以理解为自动签收。其实这个和 producer_exception 的区别在于 producer_exception 对于订阅方将会丢失尾部数据,而 consumer_backlog_eviction 是丢失头部数据。


我们大部分使用 consumer_backlog_eviction 策略。目前 Pulsar 支持在 namespace 级别设置这个策略,在 2.2.0 版本可以在 broker.conf 文件修改全局策略。将 Backlog 作为一个重点的监控项监控起来也是非常有必要的,后面会说到这部分。


增加 MaxProducersPerTopic 限制

防止错误或者恶意的 Client 使用造成 Broker 维持大量的 Producer 对象。Broker 默认的配置是不限制的,增加限制可以提升 Pulsar Cluster 的安全性。


Pulsar 提供两种方式来设定 MaxProducerPerTopic


  1. broker.conf 中设置 maxProducersPerTopic

  2. 通过./pulsar-admin namespaces set-max-producers-per-topic -p


目前我们在 broker.conf 中的 maxProducersPerTopic = 10000,如果 namespace 有个性需求的话通过 ./pulsar-admin namespaces set-max-producers-per-topic -p 设置。


Apache Pulsar 监控与报警

Pulsar 提供丰富的 Prometheus 指标信息输出,我们可以这些指标信息来做好 Pulsar 的监控报警。Pulsar 的客户端也记录了丰富的指标,我们做了一个 client 的扩展包将 client 的节点信息记录在 Zookeeper 中,由 Prometheus 自动发现,这样 Client 端的指标信息由 Prometheus 采集。


配合 Grafana 的监控展示,实时了解集群的状态

图 10. 集群状态看板 - 1


图 11. 集群状态看板 - 2



图 12. 分 Namespace 状态展示


报警规则配置:

  1. Client 发送失败次数

  2. Backlog 超阈值

  3. Rates/Network 超阈值

  4. Client > 50ms 延迟

  5. Broker > 50ms 延迟

  6. Storage Size 超阈值


Pulsar 多集群流量切换

为了避免集群整体不可用,我们通过 Zookeeper 控制 Client 的连接串。基于 Pulsar Client 的 Service URL Provider 基础上做的二次开发。在 Zookeeper 中存储的 Pulsar 连接串改变的时候,Client 会自动断掉当前的连接并重新与新的 Pulsar 地址进行连接。


为重要业务提供消息链路追踪

我们基于 Pulsar Client 的 Interceptor 接口以及 Zipkin 进行二次开发,为了实现消息的链路跟踪。消息链路跟踪的方案是通过日志采集统一入 Hbase。每条消息都具备消息链路跟踪成本是昂贵的,并不适用于所有的场景,更适应与一些比较重要切消息量不太大的场景,当然这个根据不同的业务而定。


总结

我们通过介绍之前的消息中间件在智联招聘的应用情况来说明我们的痛点所在,我们计划打造一个可以解决当下痛点的产品来支撑智联招聘的业务。我们通过一段时间的技术选型工作后最终选择了 Apache Pulsar 作为我们的搭建企业级事件中心的基础。


截止目前事件中心接入的事件种类约 100 个,每天产生 5 亿事件量。部分业务通过灰度的方式接入,计划在 11 月底能够接入 20 亿事件量 / 日。


智联招聘也在持续为 Apache Pulsar 社区贡献新的特性比如 Dead Letter Topic, Client Interceptors 等,智联招聘有很多业务场景也非常依赖延时消息特性,后面我们也会在 Pulsar 上贡献此特性。


感谢 Apache Pulsar 社区小伙伴们在项目落地过程中的技术支持。





本文分享自微信公众号 - ApachePulsar(ApachePulsar)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与 “OSC 源创计划”,欢迎正在阅读的你也加入,一起分享。

Apache Pulsar 正式引入 Cloud Storage Sink 连接器:实现 Apache Pulsar 数据上云

Apache Pulsar 正式引入 Cloud Storage Sink 连接器:实现 Apache Pulsar 数据上云


越来越多的企业选择将数据存储到云平台中。对于大部分软件体系结构而言,“数据上云”至关重要。将数据迁移上云,有助于降低企业采购软硬件的成本,减少监控、管理工作,提供较大存储容量。而且,云存储支持数据备份,保护数据免受勒索软件的侵害。


许多 Pulsar 用户选择将数据存储在各种云平台中,例如 Amazon Simple Storage Service(Amazon S3)或 Google Cloud Storage(Google GCS)等。如果没有统一的应用程序将主题级别的数据迁移到云存储,Pulsar 用户必须自己编写解决方案。这是一项繁琐的任务。


今天,我们很高兴地宣布 Apache Pulsar 引入 Cloud Storage Sink 连接器(以下简称为 Cloud Storage 连接器)。Cloud Storage 连接器采用简单、可靠的方式,帮助用户将数据从 Apache Pulsar 迁移到云存储的对象中。





什么是 Cloud Storage 连接器



Cloud Storage 连接器定期轮询 Pulsar 数据,然后将其以 Avro、JSON 或 Parquet 格式存储到云存储的对象(AWS S3、Google GCS 等)中。根据用户的环境设置,Cloud Storage 连接器保证向消费者(consumer)“只发送一次” 消息。


Cloud Storage 连接器支持基于 Pulsar 主题分区或者基于时间(以天或小时为单位)的 partitioner。Partitioner 将 Pulsar 主题分区拆分成为多个数据块。数据块相当于云存储中的对象,其虚拟路径使用 Pulsar 分区 ID和该数据块的起始偏移量进行编码。对 Pulsar 分区和该数据块的起始偏移量进行编码。数据块的大小取决于云存储写入的记录的数量和 schema 兼容性。如果没有在配置中指定 partitioner,则使用保留 Pulsar 分区的缺省 partitioner。


Cloud Storage 连接器支持以下功能:


确保严格一次(Exactly-Once)的数据输出

Cloud Storage 使用 partitioner 导出 Pulsar 数据,这就确保 Cloud Storage 连接器能够实现“只发送一次”数据,从而满足云存储的数据一致性要求。

支持所有数据(无论是否带有 schema 格式)

Cloud Storage 连接器支持将采用 Avro、JSON 或 Parquet 格式的数据写入云存储的对象中。通常情况下,只要数据结构支持 Format 接口,Cloud Storage 连接器就可以将该数据迁移到云平台。

支持基于时间的 partitioner

Cloud Storage 连接器使用 Pulsar 消息的 publishTime 时间戳定义 TimeBasedPartitioner 类。支持天或小时级别的时间间隔。

支持多种对象存储类型

Cloud Storage 连接器使用 jclouds 实现云存储。用户可以使用 jclouds 的对象存储拓展 JAR 包来支持更多服务提供商。如需自定义连接到其他服务提供商所需的密钥,可以通过服务提供商接口(Service Provider Interface,SPI)注册 org.apache.pulsar.io.jcloud.credential.JcloudsCredential





为什么需要 Cloud Storage 连接器



Apache Pulsar 提供丰富的连接器生态系统,将 Pulsar 与其他数据系统连接起来。2018 年 8 月,Apache Pulsar 发布 Pulsar IO。Pulsar IO 允许用户利用现有的 Pulsar Functions 框架,在 Pulsar 和外部系统(例如 MySQL、Kafka)之间传输数据。但是,有些用户希望将数据从 Apache Pulsar 迁移到云存储。这些用户被迫构建定制解决方案并手动运行它们。


为了解决这些问题,Apache Pulsar 正式引入 Cloud Storage 连接器。Cloud Storage 连接器利用 Pulsar IO 的诸多优势,例如容错性、并行性、弹性、负载平衡、按需更新等,帮助用户将 Pulsar 数据导入云存储。

Cloud Storage 连接器易于使用。用户无需编写任何代码,即可获得一款支持多种对象存储服务商、支持灵活的数据格式、自定义数据分区的对象存储连接器。





试用 Cloud Storage 连接器



本节介绍如何安装 Cloud Storage 连接器并使用 Cloud Storage 连接器向外部系统发送 Pulsar 消息。在本节举例中,我们使用 AWS S3 作为存储 Pulsar 数据的云平台。Cloud Storage 连接器采用基于时间的 partitioner,将 Pulsar 数据以 Parquet 的格式存储到 AWS S3。


前提条件



➡️ 创建 AWS 账户,并登录 AWS Management Console。

➡️ 创建 AWS S3 存储桶。具体操作可参考:
https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html

➡️ 获取 AWS S3 存储桶的密钥。具体操作可参考:
https://docs.aws.amazon.com/IAM/latest/UserGuide/getting-started_create-admin-group.html


步骤一:

安装 Cloud Storage 连接器并运行 Pulsar Broker


1. 下载 Cloud Storage 连接器的安装包
https://github.com/streamnative/pulsar-io-cloud-storage/releases


2. 将 Cloud Storage 连接器的安装包,添加到 Pulsar broker 的配置文件中。

cp pulsar-io-cloud-storage-2.5.1.nar apache-pulsar-2.6.1/connectors/pulsar-io-cloud-storage-2.5.1.nar

3. 使用 Pulsar broker 的配置文件,启动 Pulsar broker。

cd apache-pulsar-2.6.1bin/pulsar standalone

步骤 2:

配置并启动 Cloud Storage 连接器


1. 创建 cloud-storage-sink-config.yaml 文件,并在文件中定义 Cloud Storage 连接器的配置,如下所示。

tenant: "public"namespace: "default"name: "cloud-storage-sink"inputs: - "user-avro-topic"archive: "connectors/pulsar-io-cloud-storage-2.5.1.nar"parallelism: 1
configs: provider: "aws-s3", accessKeyId: "accessKeyId" secretAccessKey: "secretAccessKey" role: "" roleSessionName: "" bucket: "s3-sink-test" region: "" endpoint: "us-standard" formatType: "parquet" partitionerType: "time" timePartitionPattern: "yyyy-MM-dd" timePartitionDuration: "1d" batchSize: 10  batchTimeMs: 1000

将  cloud-storage-sink-config.yaml   文件中的  accessKeyId   和   secretAccessKey   的取值替换为 AWS 密钥。如需配置更多控制权限,可以设置 role  和 roleSessionName  字段。

2. 使用 cloud-storage-sink-config.yaml 文件,在本地启动 Cloud Storage 连接器。

$PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file cloud-storage-sink-config.yaml

步骤3:
发送 Pulsar 消息


运行以下命令,发送 Pulsar 消息。Pulsar 消息采用 Avro schema 格式。目前,Pulsar 消息只支持 Avro 或 JSON schema。
try ( PulsarClient pulsarClient = PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .build(); Producer<TestRecord> producer = pulsarClient.newProducer(Schema.AVRO(TestRecord.class)) .topic("public/default/test-parquet-avro") .create(); ) { List<TestRecord> testRecords = Arrays.asList( new TestRecord("key1", 1, null), new TestRecord("key2", 1, new TestRecord.TestSubRecord("aaa")) ); for (TestRecord record : testRecords) { producer.send(record); } }

步骤 4:
验证 Pulsar 数据完整性


通过 AWS S3 Management Console,查看从 Pulsar 实时导入到 AWS S3 云平台中的数据。





结 语



我们希望这篇文章能够引起您对 Cloud Storage 连接器的兴趣。Cloud Storage 连接器是一个开源项目,采用 Apache License V2。


点击「阅读原文」,下载 Cloud Storage 连接器最新发布版本,开始使用 Cloud Storage 吧!


如果在使用中遇到任何问题,可以在 Cloud Storage 连接器仓库中提交 issue,我们会在第一时间回应。
https://github.com/streamnative/mop/issues

同时,我们也欢迎您为 Cloud Storage sink 贡献特性。

推荐阅读

➡️  Pulsar 与 Kafka 全方位对比(上篇):功能、性能、用例
➡️ 使用 AWS S3 offloader 卸载存储在 BookKeeper 中的数据
➡️ Pulsar Sink 入门指南

本文分享自微信公众号 - StreamNative(StreamNative)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

我们今天的关于如何使用 Apache Flume 发送日志数据至 Apache Pulsarflume收集日志到hdfs的分享就到这里,谢谢您的阅读,如果想了解更多关于Apache Flink 和 Apache Pulsar 的批流融合、Apache IoTDB x Apache Pulsar Meetup、Apache Pulsar 在智联招聘的实践 -- 从消息队列到基于 Apache Pulsar 的事件中心、Apache Pulsar 正式引入 Cloud Storage Sink 连接器:实现 Apache Pulsar 数据上云的相关信息,可以在本站进行搜索。

本文标签:

上一篇String.valueOf(null) 遇到的坑(string.valueof方法 null)

下一篇生成统计null和非null表(not in 会统计null值)