GVKun编程网logo

Flume sink Kafka Spout Storm Bolt Hbase or Redis (Storm)

28

在这篇文章中,我们将为您详细介绍FlumesinkKafkaSpoutStormBoltHbaseorRedis(Storm)的内容。此外,我们还会涉及一些关于flumekafkastorm常用命令、

在这篇文章中,我们将为您详细介绍Flume sink Kafka Spout Storm Bolt Hbase or Redis (Storm)的内容。此外,我们还会涉及一些关于flume kafka storm常用命令、Flume sink Kafka Spout Storm Bolt Hbase or Redis (Flume)、Flume sink Kafka Spout Storm Bolt Hbase or Redis (Kafka)、flume taidir to kafkasink的知识,以帮助您更全面地了解这个主题。

本文目录一览:

Flume sink Kafka Spout Storm Bolt Hbase or Redis (Storm)

Flume sink Kafka Spout Storm Bolt Hbase or Redis (Storm)

眨眼又到了年底,心里开始长草,回想2016,没啥成就,便在这千万人都立志的时刻,重新倒腾一下实时流式处理.

So,以前搭建环境最让人头疼的就是搭建Storm集群,各种依赖,稍有不慎,版本就不兼容,所以,此次首先是从storm开始.

Zookeeper的搭建就略过了,随便百度一下Zookeeper,只要不是太古老的技术贴,都能照着葫芦画瓢.

这次搭建Storm,留了个心眼,首先去官网看了下最新版本,看完之后,O(∩_∩)O哈哈~,心理美滋滋哎

storm官网

storm官网

从此处查看storm的最新的稳定版本,本次小编选择的是Version: 0.10.2,迫不及待的点击后,进入下面的页面:

输入图片说明

首先能看到的就是一个大大的download,再仔细找找还能找到javadoc以及Setup and Deploying.小伙伴,赶紧点击进去吧,Setup and Deploying.

再来看下面这张图

输入图片说明

这可是一手鞋,太棒了,终于不用忍受二手鞋的各种不合脚.

下面原谅小鞭贴英文原版的安装步骤:

Setting up a Storm Cluster

This page outlines the steps for getting a Storm cluster up and running. If you''re on AWS, you should check out the storm-deploy project. storm-deploy completely automates the provisioning, configuration, and installation of Storm clusters on EC2. It also sets up Ganglia for you so you can monitor CPU, disk, and network usage.

If you run into difficulties with your Storm cluster, first check for a solution is in the Troubleshooting page. Otherwise, email the mailing list.

Here''s a summary of the steps for setting up a Storm cluster:

Set up a Zookeeper cluster Install dependencies on Nimbus and worker machines Download and extract a Storm release to Nimbus and worker machines Fill in mandatory configurations into storm.yaml Launch daemons under supervision using "storm" script and a supervisor of your choice Set up a Zookeeper cluster Storm uses Zookeeper for coordinating the cluster. Zookeeper is not used for message passing, so the load Storm places on Zookeeper is quite low. Single node Zookeeper clusters should be sufficient for most cases, but if you want failover or are deploying large Storm clusters you may want larger Zookeeper clusters. Instructions for deploying Zookeeper are here.

A few notes about Zookeeper deployment:

It''s critical that you run Zookeeper under supervision, since Zookeeper is fail-fast and will exit the process if it encounters any error case. See here for more details. It''s critical that you set up a cron to compact Zookeeper''s data and transaction logs. The Zookeeper daemon does not do this on its own, and if you don''t set up a cron, Zookeeper will quickly run out of disk space. See here for more details. Install dependencies on Nimbus and worker machines _**Next you need to install Storm''s dependencies on Nimbus and the worker machines. These are:

Java 7 Python 2.6.6**_ These are the versions of the dependencies that have been tested with Storm. Storm may or may not work with different versions of Java and/or Python.

Download and extract a Storm release to Nimbus and worker machines Next, download a Storm release and extract the zip file somewhere on Nimbus and each of the worker machines. The Storm releases can be downloaded from here.

Fill in mandatory configurations into storm.yaml The Storm release contains a file at conf/storm.yaml that configures the Storm daemons. You can see the default configuration values here. storm.yaml overrides anything in defaults.yaml. There''s a few configurations that are mandatory to get a working cluster:

  1. storm.zookeeper.servers: This is a list of the hosts in the Zookeeper cluster for your Storm cluster. It should look something like:

storm.zookeeper.servers:

  • "111.222.333.444"
  • "555.666.777.888" If the port that your Zookeeper cluster uses is different than the default, you should set storm.zookeeper.port as well.
  1. storm.local.dir: The Nimbus and Supervisor daemons require a directory on the local disk to store small amounts of state (like jars, confs, and things like that). You should create that directory on each machine, give it proper permissions, and then fill in the directory location using this config. For example:

storm.local.dir: "/mnt/storm" If you run storm on windows,it could be: yaml storm.local.dir: "C:\storm-local" If you use a relative path,it will be relative to where you installed storm(STORM_HOME). You can leave it empty with default value $STORM_HOME/storm-local

  1. nimbus.seeds: The worker nodes need to know which machines are the candidate of master in order to download topology jars and confs. For example:

nimbus.seeds: ["111.222.333.44"] You''re encouraged to fill out the value to list of machine''s FQDN. If you want to set up Nimbus H/A, you have to address all machines'' FQDN which run nimbus. You may want to leave it to default value when you just want to set up ''pseudo-distributed'' cluster, but you''re still encouraged to fill out FQDN.

  1. supervisor.slots.ports: For each worker machine, you configure how many workers run on that machine with this config. Each worker uses a single port for receiving messages, and this setting defines which ports are open for use. If you define five ports here, then Storm will allocate up to five workers to run on this machine. If you define three ports, Storm will only run up to three. By default, this setting is configured to run 4 workers on the ports 6700, 6701, 6702, and 6703. For example:

supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 Monitoring Health of Supervisors Storm provides a mechanism by which administrators can configure the supervisor to run administrator supplied scripts periodically to determine if a node is healthy or not. Administrators can have the supervisor determine if the node is in a healthy state by performing any checks of their choice in scripts located in storm.health.check.dir. If a script detects the node to be in an unhealthy state, it must print a line to standard output beginning with the string ERROR. The supervisor will periodically run the scripts in the health check dir and check the output. If the script’s output contains the string ERROR, as described above, the supervisor will shut down any workers and exit.

If the supervisor is running with supervision "/bin/storm node-health-check" can be called to determine if the supervisor should be launched or if the node is unhealthy.

The health check directory location can be configured with:

storm.health.check.dir: "healthchecks"

The scripts must have execute permissions. The time to allow any given healthcheck script to run before it is marked failed due to timeout can be configured with:

storm.health.check.timeout.ms: 5000 Configure external libraries and environmental variables (optional) If you need support from external libraries or custom plugins, you can place such jars into the extlib/ and extlib-daemon/ directories. Note that the extlib-daemon/ directory stores jars used only by daemons (Nimbus, Supervisor, DRPC, UI, Logviewer), e.g., HDFS and customized scheduling libraries. Accordingly, two environmental variables STORM_EXT_CLASSPATH and STORM_EXT_CLASSPATH_DAEMON can be configured by users for including the external classpath and daemon-only external classpath.

Launch daemons under supervision using "storm" script and a supervisor of your choice The last step is to launch all the Storm daemons. It is critical that you run each of these daemons under supervision. Storm is a fail-fast system which means the processes will halt whenever an unexpected error is encountered. Storm is designed so that it can safely halt at any point and recover correctly when the process is restarted. This is why Storm keeps no state in-process -- if Nimbus or the Supervisors restart, the running topologies are unaffected. Here''s how to run the Storm daemons:

Nimbus: Run the command "bin/storm nimbus" under supervision on the master machine. Supervisor: Run the command "bin/storm supervisor" under supervision on each worker machine. The supervisor daemon is responsible for starting and stopping worker processes on that machine. UI: Run the Storm UI (a site you can access from the browser that gives diagnostics on the cluster and topologies) by running the command "bin/storm ui" under supervision. The UI can be accessed by navigating your web browser to http://{ui host}:8080. As you can see, running the daemons is very straightforward. The daemons will log to the logs/ directory in wherever you extracted the Storm release.

看到小鞭表粗的的位置了吗,神马?只需要安装Java和Python,太棒了......

于是小鞭怀着无比激动的心情,下载了storm,解压,这些都略过.

下面开始说说我的集群分布:

小鞭手头有若干台测试机器,咳咳咳,当然是公司的...准备使用三台机器来搭建storm集群,ip地址如下:

192.168.2.141 node2

192.168.2.142 node3

192.168.2.143 node4

接下来,去解压目录中去找到配置文件apache-storm-1.0.2/conf/storm.yaml

小鞭直接贴我的配置了, 详细配置说明,诸君就自行百度啦

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

########### These MUST be filled in for a storm configuration
 storm.zookeeper.servers:
     - "node1"
     - "node2"
     - "node3"
     

 storm.local.dir: "/home/hadoop/app/storm/local-dir"
# 
 nimbus.seeds: ["node1"]
# 
# 
# ##### These may optionally be filled in:
#    
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
#
## Locations of the drpc servers
 drpc.servers:
     - "node1"
#     - "server2"
# drpc.port:7777

## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "org.apache.storm.metric.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"

#ui.port:58080

接下来,就是要启动storm集群

以下是启动Storm各个后台进程的方式: Nimbus: 在Storm主控节点上运行”bin/storm nimbus >/dev/null 2>&1 &”启动Nimbus后台程序,并放到后台执行; Supervisor: 在Storm各个工作节点上运行”bin/storm supervisor >/dev/null 2>&1 &”启动Supervisor后台程序,并放到后台执行; UI: 在Storm主控节点上运行”bin/storm ui >/dev/null 2>&1 &”启动UI后台程序,并放到后台执行,启动后可以通过http://{nimbus host}:8080观察集群的worker资源使用情况、Topologies的运行状态等信息。

输入图片说明

看到这张图,是不是很激动,easy,咱们再试试javaApi

输入图片说明

如果看到这张图,你就信以为真了,那就错了,哈哈,这可是本地的cluster,是开发storm应用的时候本地测试使用的.你需要把自己的应用打包,丢到集群上去执行,当然,你也可以通过IDE直接部署到集群上.

反正我搭建的集群是可以用的了,就这样啦,改天继续flume和kafka......

/猫小鞭*******/

flume kafka storm常用命令

flume kafka storm常用命令

Flume

# flume启动

bin/flume-ng agent -n fks -c conf/ -fconf/ytconf/fks/fks001.conf -Dflume.root.logger=INFO,console &


Kafka

# 启动kafka自带的zookeeper(当然也可以使用外部的)

bin/zookeeper-server-start.sh config/zookeeper.properties &

# 启动外部的zookeeper

1. 启动ZK服务:       sh bin/zkServer.sh start
2. 查看ZK服务状态:   sh bin/zkServer.sh status
3. 停止ZK服务:       sh bin/zkServer.sh stop
4. 重启ZK服务:       sh bin/zkServer.sh restart

# 启动kafka的server

bin/kafka-server-start.sh config/server.properties &

# 创建一个主题

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test &

# 查看创建的主题

bin/kafka-topics.sh--list --zookeeper localhost:2181

# send some message(发送一些消息)

输入一条信息(Thisis a message: The you smile until forever),并且Ctrl+z退出shell

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

# start a consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

# 删除以前的topic

bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 192.168.197.170:218,192.168.197.171:2181


Storm

# storm的启动

# storm nimbus的启动

bin/storm nimbus &

# storm supervisor的启动

bin/storm supervisor &

# storm ui的启动

bin/storm ui &

# storm jar命令运行storm程序

storm jar fks-storm-high-001.jar com.yting.cloud.storm.topology.KafkaTopology


Flume sink Kafka Spout Storm Bolt Hbase or Redis (Flume)

Flume sink Kafka Spout Storm Bolt Hbase or Redis (Flume)

Flume可以应用于日志采集.在本次的介绍中,主要用于采集应用系统的日志,将日志输出到kafka,再经过storm进行实施处理.

我们会一如既往的光顾一下flume的官网,地址如下:

flume官网

下图是官网的截图,其中的标注是如何配置source以及sink,flume支持多种source和sink,我们本次使用的是监控日志文件使用tail -f 命令作为source,sink则使用sink-kafka,之前已经将kafka和storm集成,所以,日志会直接采集到storm

输入图片说明

配置如下:flume-conf.properties

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called ''agent''

a1.sources = r1  
a1.sinks = k1  
a1.channels = c1  

# Describe/configure the source  
a1.sources.r1.type = exec  
a1.sources.r1.command = tail -F /home/logs/dccfront/dataCollect.log

#Describe the sink  
#a1.sinks.k1.type = logger  
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = dccfront
a1.sinks.k1.brokerList = node2:9092,node3:9092,node4:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory  
a1.channels.c1.type = memory
a1.channels.c1.keep-alive = 60
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
  
# Bind the source and sink to the channel  
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

so easy,接下来就是启动flume

bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console

启动完成时候,就可向日志文件里写日志啦.比如,我是通过访问应用,通过应用产生日志

tail -f 日志文件截图如下:

输入图片说明

storm集群获取的日志如下:

输入图片说明

/猫小鞭/

温馨提示,官方文档其实很简单,看看就会了,从此丢弃二手鞋.

输入图片说明

输入图片说明

输入图片说明

Flume sink Kafka Spout Storm Bolt Hbase or Redis (Kafka)

Flume sink Kafka Spout Storm Bolt Hbase or Redis (Kafka)

storm集群搭建完成之后,自娱自乐的玩了一局dota,JUGG竟然23杀,3死,两度超神,人到高处就只剩孤独寂寞冷啦,喝完一壶花茶,还是决定继续奋战kafka.

同样,小编还是喜欢去官网看看,毕竟好多时间不玩kafka.

Kafka官网

![输入图片说明]

建议大家阅读以下Introduction,当然,你也可以直接进入Documentation

看了以下灌完的quick start,描述略简单,只是说要启动Zookeeper,然后再启动kafka

输入图片说明

Zookeeper的搭建一如既往的略过,下面直接进入正题,搭建kafka,手头的机器资源如下:

192.168.2.141 node2

192.168.2.142 node3

192.168.2.143 node4

下载kafka,解压等就不做赘述了,直接进入server配置环节:server.prpperties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://node2:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/home/hadoop/app/kafka/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don''t drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=node1:2181,node2:2181,node3:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

详细参数说明相信大家看一下就懂,望文生义一下...... 唯一要注意的一点是,三个broker的配置可不能一行,各自的host要修改成对应的服务器节点的ip或者机器名称(前提是你配置了hosts)

配置文件完成之后,接下来就是启动集群:

bin/kafka-server-start.sh -daemon config/server.properties 执行这条命令来启动kafka的broker,三台机器都要启动.

-deamon是指以后台进程的形式启动(要不终端关闭或者退出当前的会话都会停止kafka的服务)

接下来测试一下kafka,那就创建一个topic

bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 2 --partitions 2 --topic test

小编默认你会创建成功,就不截图解释了

接下来还是测试一下javaApi,我这里的测试代码需要创建一个叫dccfront的主题:

kafka的生产者

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.testng.annotations.Test;

/**
 * yingyinglicai.com Inc.
 * Copyright (c) 2013-2016 All Rights Reserved.
 *
 * @author ZhangZhong
 * @version v 0.1 2016/12/31 11:43 Exp $$
 */
public class KafkaProducerTest {

    @Test
    public void test() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 200; i++) {
            producer.send(new ProducerRecord<String, String>("dccfront", Integer.toString(i),
                Integer.toString(i)));
        }

        producer.close();
    }

}

kafka的消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.Properties;

/**
 * yingyinglicai.com Inc.
 * Copyright (c) 2013-2016 All Rights Reserved.
 *
 * @author ZhangZhong
 * @version v 0.1 2016/12/31 17:30 Exp $$
 */
public class KafkaConsumerTest {

    @Test
    public void test() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("dccfront"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),
                    record.key(), record.value());
        }

    }
}

测试的时候,你可先启动consumer,然后再启动producer,再观察consumer的消费情况

输入图片说明

/猫小鞭/

flume taidir to kafkasink

flume taidir to kafkasink

flume的数据源采用taiDir,sink类型选择kafka类型

测试目标:flume监控某一个目录的日志文件,并将文件存储到kafka中,在kafka的消费端可以实现数据的消费

dip005、dip006、dip007安装kafka

dip005、dip006、dip007安装flume

1、kafka创建topic

./kafka-topics.sh --create --zookeeper dip005:2181,dip006:2181,dip007 --replication-factor 1 --partitions 1 --topic test

2、编写flume配置

# source的名字
agent.sources = s1
agent.channels = c1
agent.sinks = r1

# 指定source使用的channel
agent.sources.s1.channels = c1
agent.sinks.r1.channel = c1

######## source相关配置 ########
# source类型
agent.sources.s1.type = TAILDIR
agent.sources.s1.positionFile = /flume/taildir_position.json
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/flume/data/.*log
agent.sources.s1.fileHeader = true

######## channel相关配置 ########
# channel类型
#agent.channels.c1.type = file
#agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
#agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
#agent.channels.c1.capacity = 1000
#agent.channels.c1.transactionCapacity = 100


agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100


######## sink相关配置 ########
# sink类型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.r1.brokerList = dip005:9092,dip006:9092,dip007:9092
agent.sinks.r1.topic = test
clog.sinks.sink_log1.flumeBatchSize = 2000
clog.sinks.sink_log1.kafka.producer.acks = 1

3.启动flume

./bin/flume-ng agent -n agent -c conf -f conf/taildir_conf  -Dflume.root.logger=DEBUG,console

4.在监控目/flume/data 里放入*log文件,或者往*log文件里写数据

5.进入kafka的消费者看,执行消费,即可看到*log里面的数据

./kafka-console-consumer.sh --bootstrap-server dip005:9092 --from-beginning --topic test

 

今天关于Flume sink Kafka Spout Storm Bolt Hbase or Redis (Storm)的分享就到这里,希望大家有所收获,若想了解更多关于flume kafka storm常用命令、Flume sink Kafka Spout Storm Bolt Hbase or Redis (Flume)、Flume sink Kafka Spout Storm Bolt Hbase or Redis (Kafka)、flume taidir to kafkasink等相关知识,可以在本站进行查询。

本文标签: