GVKun编程网logo

ELK 学习笔记之基于 kakfa (confluent) 搭建 ELK(elk+kafka+filebeat)

22

本文将分享ELK学习笔记之基于kakfa(confluent)搭建ELK的详细内容,并且还将对elk+kafka+filebeat进行详尽解释,此外,我们还将为大家带来关于Centos7+kafka+

本文将分享ELK 学习笔记之基于 kakfa (confluent) 搭建 ELK的详细内容,并且还将对elk+kafka+filebeat进行详尽解释,此外,我们还将为大家带来关于Centos7+kafka+ELK6.5.x安装搭建、Confluent kafka rest 实战、DataPipeline联合Confluent Kafka Meetup上海站、ELK - Fluentd 日志收集(官方文档 部署安装 配置文件 详解)的相关知识,希望对你有所帮助。

本文目录一览:

ELK 学习笔记之基于 kakfa (confluent) 搭建 ELK(elk+kafka+filebeat)

ELK 学习笔记之基于 kakfa (confluent) 搭建 ELK(elk+kafka+filebeat)

 

0x00 概述

测试搭建一个使用 kafka 作为消息队列的 ELK 环境,数据采集转换实现结构如下:

F5 HSL–>logstash (流处理)–> kafka –>elasticsearch

测试中的 elk 版本为 6.3, confluent 版本是 4.1.1

希望实现的效果是 HSL 发送的日志胫骨 logstash 进行流处理后输出为 json,该 json 类容原样直接保存到 kafka 中,kafka 不再做其它方面的格式处理。

 

0x01 测试

192.168.214.138: 安装 logstash,confluent 环境

192.168.214.137: 安装 ELK 套件(停用 logstash,只启动 es 和 kibana)

confluent 安装调试备忘:

      1. 像安装 elk 环境一样,安装 java 环境先
      2. 首先在不考虑 kafka 的情形下,实现 F5 HSL—Logstash–ES 的正常运行,并实现简单的正常 kibana 的展现。后面改用 kafka 时候直接将这里 output 修改为 kafka plugin 配置即可。
        此时 logstash 的相关配置
input {
  udp {
    port => 8514
    type => ''f5-dns''
  }
}

filter {
 if [type] == ''f5-dns'' {
    grok {
          match => { "message" => "%{HOSTNAME:F5hostname} %{IP:clientip} %{POSINT:clientport} %{IP:svrip} %{NUMBER:qid} %{HOSTNAME:qname} %{GREEDYDA
TA:qtype} %{GREEDYDATA:status} %{GREEDYDATA:origin}" }
    }
    geoip {
         source => "clientip"
         target => "geoip"
    }
    }
}



output {
   #stdout{ codec => rubydebug }
   #elasticsearch {
   # hosts => ["192.168.214.137:9200"]
   # index => "f5-dns-%{+YYYY.MM.dd}"
   #template_name => "f5-dns"
  #}
  kafka {
    codec => json
    bootstrap_servers => "localhost:9092"
    topic_id => "f5-dns-kafka"
  }
}

发一些测试流量,确认 es 正常收到数据,查看 cerebro 上显示的状态。(截图是调试完毕后截图)

# cd /usr/share/cerebro/cerebro-0.8.1/
# /bin/cerebro -Dhttp.port=9110 -Dhttp.address=0.0.0.0

安装 confluent,由于是测试环境,直接 confluent 官方网站下载压缩包,解压后使用。位置在 /root/confluent-4.1.1 / 下

由于是测试环境,直接用 confluent 的命令行来启动所有相关服务,发现 kakfa 启动失败

[root@kafka-logstash bin]# ./confluent start
Using CONFLUENT_CURRENT: /tmp/confluent.dA0KYIWj
Starting zookeeper
zookeeper is [UP]
Starting kafka
/Kafka failed to start
kafka is [DOWN]
Cannot start Schema Registry, Kafka Server is not running. Check your deployment

检查发现由于虚机内存给太少了,导致 java 无法分配足够内存给 kafka

[root@kafka-logstash bin]# ./kafka-server-start ../etc/kafka/server.properties
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error=''Cannot allocate memory'' (errno=12)

扩大虚拟机内存,并将 logstash 的 jvm 配置中设置的内存调小

kafka server 配置文件

[root@kafka-logstash kafka]# pwd
/root/confluent-4.1.1/etc/kafka
[root@kafka-logstash kafka]# egrep -v "^#|^$" server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
confluent.support.metrics.enable=true
confluent.support.customer.id=anonymous
group.initial.rebalance.delay.ms=0

connect 配置文件,此配置中,将原来的 avro converter 替换成了 json,同时关闭了 key vlaue 的 schema 识别。因为我们输入的内容是直接的 json 类容,没有相关 schema,这里只是希望 kafka 原样解析 logstash 输出的 json 内容到 es

[root@kafka-logstash kafka]# pwd
/root/confluent-4.1.1/etc/kafka
[root@kafka-logstash kafka]# egrep -v "^#|^$" connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=share/java

如果不做上述修改,connect 总会在将日志 sink 到 ES 时提示无法反序列化,magic byte 错误等。如果使用 confluent status 命令查看,会发现 connect 会从 up 变为 down

[root@kafka-logstash confluent-4.1.1]# ./bin/confluent status
ksql-server is [DOWN]
connect is [DOWN]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]

schema-registry 相关配置

[root@kafka-logstash schema-registry]# pwd
/root/confluent-4.1.1/etc/schema-registry
[root@kafka-logstash schema-registry]# egrep -v "^#|^$"
connect-avro-distributed.properties  connect-avro-standalone.properties   log4j.properties                     schema-registry.properties
[root@kafka-logstash schema-registry]# egrep -v "^#|^$" connect-avro-standalone.properties
bootstrap.servers=localhost:9092
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java
[root@kafka-logstash schema-registry]# egrep -v "^#|^$" schema-registry.properties
listeners=http://0.0.0.0:8081
kafkastore.connection.url=localhost:2181
kafkastore.topic=_schemas
debug=false

es-connector 的配置文件

[root@kafka-logstash kafka-connect-elasticsearch]# pwd
/root/confluent-4.1.1/etc/kafka-connect-elasticsearch
[root@kafka-logstash kafka-connect-elasticsearch]# egrep -v "^#|^$" quickstart-elasticsearch.properties
name=f5-dns
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=f5-dns-kafka
key.ignore=true
value.ignore=true
schema.ignore=true
connection.url=http://192.168.214.137:9200
type.name=doc
transforms=MyRouter
transforms.MyRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.MyRouter.topic.format=${topic}-${timestamp}
transforms.MyRouter.timestamp.format=yyyyMMdd

上述配置中 topics 配置是希望传输到 ES 的 topic,通过设置 transform 的 timestamp router 来实现将 topic 按天动态映射为 ES 中的 index,这样可以让 ES 每天产生一个 index。注意需要配置 schema.ignore=true, 否则 kafka 无法将受收到的数据发送到 ES 上,connect 的 connect.stdout 日志会显示:

[root@kafka-logstash connect]# pwd
/tmp/confluent.dA0KYIWj/connect

Caused by: org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.
	at io.confluent.connect.elasticsearch.Mapping.inferMapping(Mapping.java:84)
	at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:221)
	at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:66)
	at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:260)
	at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:162)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)

配置修正完毕后,向 logstash 发送数据,发现日志已经可以正常发送到了 ES 上,且格式和没有 kafka 时是一致的。
没有 kafka 时:

{
	"_index": "f5-dns-2018.06.26",
	"_type": "doc",
	"_id": "KrddO2QBXB-i0ay0g5G9",
	"_version": 1,
	"_score": 1,
	"_source": {
		"message": "localhost.lan 202.202.102.100 53777 172.16.199.136 42487 www.test.com A NOERROR GTM_REWRITE ",
		"F5hostname": "localhost.lan",
		"qid": "42487",
		"clientip": "202.202.102.100",
		"geoip": {
			"region_name": "Chongqing",
			"location": {
				"lon": 106.5528,
				"lat": 29.5628
			},
			"country_code2": "CN",
			"timezone": "Asia/Shanghai",
			"country_name": "China",
			"region_code": "50",
			"continent_code": "AS",
			"city_name": "Chongqing",
			"country_code3": "CN",
			"ip": "202.202.102.100",
			"latitude": 29.5628,
			"longitude": 106.5528
		},
		"status": "NOERROR",
		"qname": "www.test.com",
		"clientport": "53777",
		"@version": "1",
		"@timestamp": "2018-06-26T09:12:21.585Z",
		"host": "192.168.214.1",
		"type": "f5-dns",
		"qtype": "A",
		"origin": "GTM_REWRITE ",
		"svrip": "172.16.199.136"
	}
}

有 kafka 时:

{
	"_index": "f5-dns-kafka-20180628",
	"_type": "doc",
	"_id": "f5-dns-kafka-20180628+0+23",
	"_version": 1,
	"_score": 1,
	"_source": {
		"F5hostname": "localhost.lan",
		"geoip": {
			"city_name": "Chongqing",
			"timezone": "Asia/Shanghai",
			"ip": "202.202.100.100",
			"latitude": 29.5628,
			"country_name": "China",
			"country_code2": "CN",
			"continent_code": "AS",
			"country_code3": "CN",
			"region_name": "Chongqing",
			"location": {
				"lon": 106.5528,
				"lat": 29.5628
			},
			"region_code": "50",
			"longitude": 106.5528
		},
		"qtype": "A",
		"origin": "DNSX ",
		"type": "f5-dns",
		"message": "localhost.lan 202.202.100.100 53777 172.16.199.136 42487 www.myf5.net A NOERROR DNSX ",
		"qid": "42487",
		"clientport": "53777",
		"@timestamp": "2018-06-28T09:05:20.594Z",
		"clientip": "202.202.100.100",
		"qname": "www.myf5.net",
		"host": "192.168.214.1",
		"@version": "1",
		"svrip": "172.16.199.136",
		"status": "NOERROR"
	}
}

相关 REST API 输出

http://192.168.214.138:8083/connectors/elasticsearch-sink/tasks

[
  {
    "id": {
      "connector": "elasticsearch-sink",
      "task": 0
    },
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "type.name": "doc",
      "value.ignore": "true",
      "tasks.max": "1",
      "topics": "f5-dns-kafka",
      "transforms.MyRouter.topic.format": "${topic}-${timestamp}",
      "transforms": "MyRouter",
      "key.ignore": "true",
      "schema.ignore": "true",
      "transforms.MyRouter.timestamp.format": "yyyyMMdd",
      "task.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkTask",
      "name": "elasticsearch-sink",
      "connection.url": "http://192.168.214.137:9200",
      "transforms.MyRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter"
    }
  }
]

http://192.168.214.138:8083/connectors/elasticsearch-sink/
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "doc",
    "value.ignore": "true",
    "tasks.max": "1",
    "topics": "f5-dns-kafka",
    "transforms.MyRouter.topic.format": "${topic}-${timestamp}",
    "transforms": "MyRouter",
    "key.ignore": "true",
    "schema.ignore": "true",
    "transforms.MyRouter.timestamp.format": "yyyyMMdd",
    "name": "elasticsearch-sink",
    "connection.url": "http://192.168.214.137:9200",
    "transforms.MyRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter"
  },
  "tasks": [
    {
      "connector": "elasticsearch-sink",
      "task": 0
    }
  ],
  "type": "sink"
}

http://192.168.214.138:8083/connectors/elasticsearch-sink/status
{
  "name": "elasticsearch-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.16.150.179:8083"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "172.16.150.179:8083"
    }
  ],
  "type": "sink"
}
http://192.168.214.138:8082/brokers
{
  "brokers": [
    0
  ]
}

http://192.168.214.138:8082/topics
[
  "__confluent.support.metrics",
  "_confluent-ksql-default__command_topic",
  "_schemas",
  "connect-configs",
  "connect-offsets",
  "connect-statuses",
  "f5-dns-2018.06",
  "f5-dns-2018.06.27",
  "f5-dns-kafka",
  "test-elasticsearch-sink"
]



http://192.168.214.138:8082/topics/f5-dns-kafka
{
  "name": "f5-dns-kafka",
  "configs": {
    "file.delete.delay.ms": "60000",
    "segment.ms": "604800000",
    "min.compaction.lag.ms": "0",
    "retention.bytes": "-1",
    "segment.index.bytes": "10485760",
    "cleanup.policy": "delete",
    "follower.replication.throttled.replicas": "",
    "message.timestamp.difference.max.ms": "9223372036854775807",
    "segment.jitter.ms": "0",
    "preallocate": "false",
    "segment.bytes": "1073741824",
    "message.timestamp.type": "CreateTime",
    "message.format.version": "1.1-IV0",
    "max.message.bytes": "1000012",
    "unclean.leader.election.enable": "false",
    "retention.ms": "604800000",
    "flush.ms": "9223372036854775807",
    "delete.retention.ms": "86400000",
    "leader.replication.throttled.replicas": "",
    "min.insync.replicas": "1",
    "flush.messages": "9223372036854775807",
    "compression.type": "producer",
    "min.cleanable.dirty.ratio": "0.5",
    "index.interval.bytes": "4096"
  },
  "partitions": [
    {
      "partition": 0,
      "leader": 0,
      "replicas": [
        {
          "broker": 0,
          "leader": true,
          "in_sync": true
        }
      ]
    }
  ]
}

测试中 kafka 的配置基本都为确实配置,没有考虑任何的内存优化,kafka 使用磁盘的大小考虑等

测试参考:

https://docs.confluent.io/current/installation/installing_cp.html

https://docs.confluent.io/current/connect/connect-elasticsearch/docs/elasticsearch_connector.html

https://docs.confluent.io/current/connect/connect-elasticsearch/docs/configuration_options.html

存储机制参考 https://blog.csdn.net/opensure/article/details/46048589

kafka 配置参数参考 https://blog.csdn.net/lizhitao/article/details/25667831

更多 kafka 原理 https://blog.csdn.net/ychenfeng/article/details/74980531

confluent CLI:

confluent: A command line interface to manage Confluent services

Usage: confluent <command> [<subcommand>] [<parameters>]

These are the available commands:

    acl         Specify acl for a service.
    config      Configure a connector.
    current     Get the path of the data and logs of the services managed by the current confluent run.
    destroy     Delete the data and logs of the current confluent run.
    list        List available services.
    load        Load a connector.
    log         Read or tail the log of a service.
    start       Start all services or a specific service along with its dependencies
    status      Get the status of all services or the status of a specific service along with its dependencies.
    stop        Stop all services or a specific service along with the services depending on it.
    top         Track resource usage of a service.
    unload      Unload a connector.

''confluent help'' lists available commands. See ''confluent help <command>'' to read about a
specific command.

confluent platform 服务端口表

 

 

参考

 

Centos7+kafka+ELK6.5.x安装搭建

Centos7+kafka+ELK6.5.x安装搭建

 

Centos7+kafka+ELK6.5.x安装搭建

1   数据的流向

数据源端使用logstash收集udp 514日志输出到kafka中做测试(可选择filebeat,比较轻量级、对CPU负载不大、不占用太多资源;选择rsyslog对 kafka的支持是 v8.7.0版本及之后版本)。如下流程:

logstash(udp 514) => kafka(zookeeper) 集群=> logstash(grok) => elasticsearch集群 => kibana

Logstash(grok)因每条日志需做正则匹配比较消耗资源,所以中间加了kafka(zookeeper)集群做消息队列。

2  数据源端配置

2.1  安装Logstash(udp 514)

1)  安装jdk

yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

2)  yum安装logstash

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

 

cat >> /etc/yum.repos.d/logstash-6.x.repo << ''EOF''

[logstash-6.x]

name=Elastic repository for 6.x packages

baseurl=https://artifacts.elastic.co/packages/6.x/yum

gpgcheck=1

gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch

enabled=1

autorefresh=1

type=rpm-md

EOF

 

yum install -y logstash

3)  配置

修改内存大小

vim /etc/logstash/jvm.options

-Xms2g

-Xmx2g

 

cp /etc/logstash/logstash-sample.conf /etc/logstash/conf.d/logstash514.conf

vim /etc/logstash/conf.d/logstash514.conf

#############################

input {

  syslog {

    type => "syslog"

    port => "514"

  }

}

output {

 kafka {

  codec => "json"     #一定要加上,不然输出没有host等字段

  bootstrap_servers => "192.168.89.11:9092,192.168.89.12:9092,192.168.89.13:9092"

  topic_id => "SyslogTopic"

      }

 }

#############################

 

4)  测试配置

/usr/share/logstash/bin/logstash --path.settings /etc/logstash/ -t

-t: 测试配置文件

--path.settings: 单独测试需要指定配置文件路径,否则会找不到配置

 

5)  启动logstash并设置开机启动(配置好zookeeper+kafka再启动)

Logstash监听小于1024的端口号使用logstash权限启动会失败,需要修改root权限启动

vim /etc/systemd/system/logstash.service    #修改以下两项

##################

User=root

Group=root

##################

启动服务

systemctl daemon-reload

systemctl start logstash

systemctl enable logstash

 

6)  可以使用logger生成日志

例:logger -n 服务器IP  "日志信息"

3  收集端配置

3.1  安装zookeeper+kafka集群

请看https://www.cnblogs.com/longBlogs/p/10340251.html

 

4  安装logstash(grok)并配置重要数据写入mysql数据库

需要安装logstash-output-jdbc输出到mysql数据库(请看https://www.cnblogs.com/longBlogs/p/10340252.html)

4.1  安装logstash

安装步骤请参考“数据源端配置”-“安装Logstash(udp 514)”上的安装步骤

4.2  配置

1)设置日志匹配目录和格式

mkdir -p /data/logstash/patterns   # 创建额外格式目录

自定义匹配类型

vim /data/logstash/patterns/logstash_grok

============================================

#num formats

num [1][0-9]{9,}    #匹配1开头至少10位的数字

=================================================

 

2)配置logstash(grok):

修改内存大小

vim /etc/logstash/jvm.options

-Xms2g

-Xmx2g

 

配置conf文件

cp /etc/logstash/logstash-sample.conf /etc/logstash/conf.d/logstash_syslog.conf

vim /etc/logstash/conf.d/logstash514.conf

#############################

input {

  kafka {

  bootstrap_servers => "192.168.89.11:9092,192.168.89.12:9092,192.168.89.13:9092"

  topics => "SyslogTopic"

  codec => "json"   #一定要加上,不然输出没有host等字段

  group_id => "logstash_kafka"  #多个logstash消费要相同group_id,不然会重复消费

  client_id => "logstash00"     #client_id唯一

  consumer_threads => 3   #线程数

  }

}

filter {

    grok {

        patterns_dir => ["/data/logstash/patterns/"]

        match => {

            "message" => ".*?%{NUM:num}.*?"

        }  

}

}

#输出到elasticsearch

#这里不输出到mysql ,把输出到mysql注释掉

output {

  elasticsearch {

    hosts => ["192.168.89.20:9200","192.168.89.21:9200"] #是集群可写多个

    index => "log-%{+YYYY.MM.dd}" # 按日期分index,-前面必须小写

  }

#jdbc {

             #driver_jar_path => "/etc/logstash/jdbc/mysql-connector-java-5.1.47/mysql-connector-java-5.1.47-bin.jar"

             #driver_class => "com.mysql.jdbc.Driver"

             #connection_string => "jdbc:mysql://mysql服务器ip:端口/数据库?user=数据库用户名&password=数据库密码"

             #statement => [ "insert into 数据表 (TIME ,IP,MESSAGES) values (?,?,?)","%{@timestamp}" ,"%{host}","%{message}" ]

  #}

}

################################################

 

3)启动logstash并设置开机启动(配置好elasticsearch再启动)

如果Logstash监听小于1024的端口号使用logstash权限启动会失败,需要修改root权限启动

vim /etc/systemd/system/logstash.service    #修改以下两项

##################

User=root

Group=root

##################

启动服务

systemctl daemon-reload

systemctl start logstash

systemctl enable logstash

5  elasticsearch数据库端配置

5.1 安装elasticsearch(192.168.89.20与192.168.89.21)

1)  安装java环境

yum install –y  java-1.8.0-openjdk java-1.8.0-openjdk-devel

2)  设置yum源

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

 

cat >> /etc/yum.repos.d/elasticsearch-6.x.repo << ''EOF''

[elasticsearch-6.x]

name=Elasticsearch repository for 6.x packages

baseurl=https://artifacts.elastic.co/packages/6.x/yum

gpgcheck=1

gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch

enabled=1

autorefresh=1

type=rpm-md

EOF

3)  安装

yum install -y elasticsearch

4)  配置

vim /etc/elasticsearch/elasticsearch.yml

cluster.name: syslog_elasticsearch   # 集群名称,同一集群需要一致

node.name: es_89.20   # 节点名称,同一集群不同主机不能一致

path.data: /data/elasticsearch  # 数据存放目录

path.logs: /data/elasticsearch/log  # 日志存放目录

network.host: 0.0.0.0   # 绑定ip

discovery.zen.ping.unicast.hosts: ["192.168.89.20", "192.168.89.21"]  # 集群成员,不指定host.master则是自由选举

#discovery.zen.minimum_master_nodes: 1   # 这个参数默认值1,控制的是,一个节点需要看到的具有master节点资格的最小数量,然后才能在集群中做操作。官方的推荐值是(N/2)+1,其中N是具有master资格的节点的数量(情况是3,这个参数设置为2,但对于只有2个节点的情况,设置为2就有些问题了,一个节点DOWN掉后,你肯定连不上2台服务器了,这点需要注意)。

 

修改了elasticsearch.yml的data、log等目录,请在这里也修改

#vim /usr/lib/systemd/system/elasticsearch.service

 LimitMEMLOCK=infinity

 

#systemctl daemon-reload

 

5)  创建目录并属主、属组为一个非root账户,否则启动会有以下错误

mkdir -p /data/elasticsearch/log

 

出现报错:

main ERROR Unable to create file /data/log/elasticsearch/syslog_elasticsearch.log java.io.IOException: Permission denied

 

将ElasticSearch的安装目录及其子目录改为另外一个非root账户

chown -R elasticsearch:elasticsearch /data/elasticsearch/

 

6)  启动

systemctl restart elasticsearch

systemctl enable elasticsearch

 

7)  测试

  • 查询集群状态方法1

curl -XGET ''http://192.168.89.20:9200/_cat/nodes''

后面添加?v代表详细

curl -XGET ''http://192.168.89.20:9200/_cat/nodes?v''

  •  查询集群状态方法2

curl -XGET ''http://192.168.89.20:9200/_cluster/state/nodes?pretty''

  • 查询集群健康状况

6  分析展示端配置

6.1  安装kibana

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

 

cat >> /etc/yum.repos.d/kibana-6.x.repo << ''EOF''

[kibana-6.x]

name=Kibana repository for 6.x packages

baseurl=https://artifacts.elastic.co/packages/6.x/yum

gpgcheck=1

gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch

enabled=1

autorefresh=1

type=rpm-md

EOF

 

yum install -y kibana

 

配置文件

cat /etc/kibana/kibana.yml |egrep -v "^#|^$"

###############################################

server.port: 5601     #要使用5601端口,使用nginx转80端口访问

server.host: "0.0.0.0"

elasticsearch.url: "http://192.168.89.20:9200"

###############################################

 

 

systemctl start kibana

systemctl enable kibana

 

访问

http://192.168.89.15:5601

 

汉化

先停止服务

systemctl stop kibana

 

做汉化

github上有汉化的项目,地址:https://github.com/anbai-inc/Kibana_Hanization

yum install unzip

解压在kibana的安装目录

unzip Kibana_Hanization-master.zip

cd Kibana_Hanization-master

python main.py kibana安装目录(可以python main.py /

 

启动服务

systemctl start kibana

6.2  安装nginx

nginx主要作用是添加登录密码,因为kibana并没有登录功能,除非es开启密码。

1) 安装

rpm -ivh http://nginx.org/packages/centos/7/noarch/RPMS/nginx-release-centos-7-0.el7.ngx.noarch.rpm

 

yum install -y nginx

安装Apache密码生产工具:

yum install httpd-tools

2)  配置

生成密码文件: 

mkdir -p /etc/nginx/passwd

htpasswd -c -b /etc/nginx/passwd/kibana.passwd admin admin

cp /etc/nginx/conf.d/default.conf /etc/nginx/conf.d/default.conf.backup

 vim /etc/nginx/conf.d/default.conf

####################################

server {

    listen       192.168.89.15:80;

    server_name  localhost;

    auth_basic "Kibana Auth";

    auth_basic_user_file /etc/nginx/passwd/kibana.passwd;

    location / {

        root   /usr/share/nginx/html;

        index  index.html index.htm;

        proxy_pass http://127.0.0.1:5601;

        proxy_redirect off;

    }

    error_page   500 502 503 504  /50x.html;

    location = /50x.html {

        root   /usr/share/nginx/html;

    }

}

##################################

修改Kibana配置文件:

vim /etc/kibana/kibana.yml

server.host: "localhost"

 

重启服务

systemctl restart kibana

systemctl restart nginx

systemctl enable nginx

 

访问

http://192.168.89.15:80

 

Confluent kafka rest 实战

Confluent kafka rest 实战

Confluent platform 是个什么东西?

    是由 LinkedIn 开发出 Apache Kafka 的团队成员,基于这项技术创立了新公司 Confluent,Confluent 的产品也是围绕着 Kafka 做的。基本架构如下:

 

可以免费使用的组件:

    Confluent Kafka Brokers (开源)
    Confluent Kafka Connectors(开源)
    Confluent Kafka Clients(开源)
    Confluent Kafka REST Proxy(开源)
    Confluent Schema Registry(开源)

我们的关注:

    本次我们主要使用 REST Proxy,当然底层的 broker 也是使用 confluent 的 kafka 组件。

    实验平台:CentOS release 6.7 (Final)

    kafka 版本:confluent-kafka-2.11-0.10.1.0-1

    rest proxy 版本:confluent-kafka-rest-3.1.1-1

添加 Yum 仓库:

    本地添加 confluent 的 repo 仓库即可

[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.1/6
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.1
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1

安装:

yum clean all
yum makecache
yum install confluent-kafka confluent-kafka-rest -y

配置:

    zookeeper:/etc/kafka/zookeeper.properties

dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0

    kafka broker:/etc/kafka/server.properties

broker.id=50
delete.topic.enable=true
listeners=PLAINTEXT://10.205.51.50:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.205.51.50:2181
zookeeper.connection.timeout.ms=6000
confluent.support.metrics.enable=true
confluent.support.customer.id=anonymous

    rest proxy:/etc/kafka-rest/kafka-rest.properties

id=kafka-rest-server
zookeeper.connect=10.205.51.50:2181

   schema registry:/etc/schema-registry/schema-registry.properties

listeners=http://0.0.0.0:8081
kafkastore.connection.url=10.205.51.50:2181
kafkastore.topic=_schemas
debug=false

启动:

    启动 zookeeper:

zookeeper-server-start -daemon /etc/kafka/zookeeper.properties

   

    启动 kafka broker

kafka-server-start -daemon /etc/kafka/server.properties

   

    启动 rest proxy

kafka-rest-start -daemon /etc/kafka-rest/kafka-rest.properties

    启动 schema registry

schema-registry-start -daemon /etc/schema-registry/schema-registry.properties

实验过程:

    rest proxy 支持 avro、json、binary 数据格式,本文以 avro、json 格式为例进行实战。

    查看当前 topics:

curl http://10.205.51.50:8082/topics

   

    查看集群的 brokers:

curl http://10.205.51.50:8082/brokers

    

    创建 topic test2,存放 avro 格式的数据:

kafka-topics --create --zookeeper 10.205.51.50:2181 --partitions 1 --replication-factor 1 --topic test2

    通过 rest 接口向 test2 push 数据:

curl -i -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" --data ''{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"username\", \"type\": \"string\"}]}","records": [{"value": {"username": "testUser"}},{"value": {"username": "testUser2"}}]}'' http://10.205.51.50:8082/topics/test2

    注册 consumer group:

curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data ''{"format": "avro", "auto.offset.reset": "smallest"}'' http://10.205.51.50:8082/consumers/my_avro_consumer

    通过 rest 接口消费数据:

curl -i -X GET -H "Accept: application/vnd.kafka.avro.v1+json" http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a/topics/test2

    删除注册的 consumer 实例:

curl -i -X DELETE http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a

    创建 topic test3,存放 json 格式的数据:

kafka-topics --create --zookeeper 10.205.51.50:2181 --topic test3 --replication-factor 1 --partitions 1

    通过 rest 接口向 test3 push 数据:

curl -i -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data ''{"records": [{"key": "somekey","value": {"foo": "bar"}},{"value": [ "foo", "bar" ],"partition": 0}]}'' http://10.205.51.50:8082/topics/test3

    注册 consumer group:

curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data ''{"name": "test3","format": "json", "auto.offset.reset": "smallest"}'' http://10.205.51.50:8082/consumers/my_json_consumer

    通过 rest 接口消费数据:

curl -i -X GET -H "Accept: application/vnd.kafka.json.v1+json" http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3/topics/test3

    删除 consumer 实例

curl -i -X DELETE http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3

 

    可以看到整个过程还是比较麻烦的,依赖多个服务。

 

 

DataPipeline联合Confluent Kafka Meetup上海站

DataPipeline联合Confluent Kafka Meetup上海站

Confluent作为国际数据“流”处理技术领先者,提供实时数据处理解决方案,在市场上拥有大量企业客户,帮助企业轻松访问各类数据。DataPipeline作为国内首家原生支持Kafka解决方案的“iPaaS+AI”一站式大数据融合服务提供商,在零售、金融、互联网和制造等行业拥有着丰富实践经验和解决方案能力。

此次上海DataPipeline & Confluent Kafka Meetup,我们邀请到了Confluent流数据处理系统架构师和技术负责人王国璋、DataPipeline架构师吕鹏、携程大数据平台实时计算平台负责人潘国庆、阿里巴巴实时计算平台高级技术专家秦江杰、网易云大数据平台架构师欧阳武林等大数据领域六位行业大咖分享Kafka最新研究成果以及行业应用案例。

 

活动流程

DataPipeline联合ConfluentKafkaMeetup上海站

 

演讲嘉宾

 

DataPipeline联合ConfluentKafkaMeetup上海站

王国璋

Confluent流数据处理系统架构师和技术负责人

Apache Kafka PMC,Kafka Streams 作者。于康奈尔大学计算机系取得博士学位,主要研究方向为数据库管理和分布式数据系统。现就职于Confluent,任流数据处理系统架构师和技术负责人。此前曾就职于 LinkedIn 数据架构组,主要负责实时数据处理平台,包括 Apache Kafka 和 Apache Samza 系统的开发与维护。

演讲主题:《Apache Kafka,从0.7到2.0:那些年我们踩过的坑》

自从 2011年被捐献给Apache基金会到现在,Kafka项目已经走过了七个年头。从最早的“分布式消息系统”,到现在集成了分发、存储和计算的“流式数据平台”,Kafka经历了哪些挑战?又经过了什么样的演进变化?

1. 从硬件的发展趋势,展现 Kafka 架构的演进过程;

2. 从Kafka开发和维护经验,分享分布式系统工程实践的通理;

3. 开源数据系统的开发经验,如何维护和发展一个开源社区。

 

DataPipeline联合ConfluentKafkaMeetup上海站

潘国庆

携程大数据平台实时计算平台负责人

携程大数据平台实时计算平台负责人,2016年加入携程,主要从事携程实时计算平台的构建与演进、以及携程实时特征平台的搭建,在实时计算领域拥有丰富的实战经验。

演讲主题:《携程实时计算平台架构与实践》

1. 携程实时计算平台的演进之路;

2. 实时架构设计与曾经踩过的坑;

3. 实时计算在携程的实践;

4. 未来规划。

 

DataPipeline联合ConfluentKafkaMeetup上海站

吕鹏

DataPipeline 架构师

吕鹏,目前担任DataPipeline架构师一职,负责数据传输和ETL方面的架构和优化工作。曾就职于Talend和销售易大数据部,拥有5余年大数据平台和数据仓库架构搭建经验,在ETL 领域拥有丰富的实战经验。

演讲主题:《DataPipeline在大数据平台的实时数据流实践》

1. 大数据时代下企业级数据面临的主要问题和挑战;

2. kafka connect 在大数据时代的数据流的优势和不足;

3. 大数据平台上的kafka connect 实践案例:

1)kafka connect 下数据仓库greenplum 同步实践以及优化策略;

2)kafka connect 下 hive 同步实践;

4. DataPipeline 做了哪些大数据相关工作,遇到的坑以及相应的解决方案;

5. 总结以及DataPipeline 未来在大数据领域的相关工作。

 

DataPipeline联合ConfluentKafkaMeetup上海站

张勇华

唯品会消息平台架构师

毕业于武汉大学,目前担任唯品会消息平台架构师。从事消息系统的设计和研发有4年多的时间,对Kafka、RabbitMQ、RocketMQ等消息中间件产品拥有较丰富的实践经验。

演讲主题:《基于Kafka1.0建立企业级消息系统平台的思考及实现》

1. 随着业务系统接入增多,对集群和元数据资源(Topic/Group)的管理显得尤为重要,急需建立统一平台来管理系统资源及认证授权接入服务;

2. 提升并发消费场景下的高可靠,及消费异常后如何优雅解决消费重试问题;

3. 如何实现延时消息的投递功能,改善延时精度的方式;

4. 原生kafka支持组间广播对动态增长的业务组申请造成困境,如何实现组内广播来改善现状;

5. 集群故障时如何快速有效迁移服务等问题介绍。

 

DataPipeline联合ConfluentKafkaMeetup上海站

秦江杰

阿里巴巴实时计算平台高级技术专家

秦江杰,阿里巴巴实时计算平台高级技术专家。硕士毕业于卡耐基梅隆大学,曾任职于LinkedIn参与Apache Kafka开发,Apache Kafka PMC member。

演讲主题:《详解Flink基于Kafka事务的严格一次语义实现》

在流处理系统中,端到端的严格一次的语义只有当输出的Sink支持时才可以实现。在Kafka 0.11中增加了对事物的支持,使得其他系统可以利用这一特性实现严格一次的语义。本次分享将详细介绍Flink是如何利用这Kafka一特性来实现端到端的严格一次语义的。

 

DataPipeline联合ConfluentKafkaMeetup上海站

欧阳武林

网易大数据平台架构师

曾就职于中国移动、51信用卡,主要从事云计算平台开发相关工作。目前就职于网易,主要从事大数据平台开发。

演讲主题:《Kafka 在网易云上的实践》

1.Kafka上云面临的挑战和机遇;

2.网易云Kafka架构实现;

3.网易云Kafka的关键问题解决之法;

4.未来规划。

 

时间&地点

时间:2018年10月21日,13:30签到

详细地址:上海市长宁区金钟路968号凌空SOHO12号楼11层

 

活动报名

添加DataPipeline君微信:datapipeline2018,拉你进现场活动群。

ELK - Fluentd 日志收集(官方文档 部署安装 配置文件 详解)

ELK - Fluentd 日志收集(官方文档 部署安装 配置文件 详解)

官网地址:

1

https://www.fluentd.org/

下载地址:

1

https://www.fluentd.org/download

Fluentd文档地址:

1

https://docs.fluentd.org/installation

Fluentd Bit文档地址:

1

https://docs.fluentbit.io/manual/

 

Fluentd 和 Fluent Bit:

        Fluentd 和 Fluent Bit 的区别在于Fluent Bit 适用于对资源需求非常敏感的情况下且没有依赖,更节省资源只要450KB的内存就可运行,缺点是插件少,只负责收集和转发。Fluentd 内存需求约为40M,拥有更丰富的插件,支持大吞吐量,多输入并路由到不同的输出。

 

可以查考部署的yaml文件:

fluentd-es-ds.yaml

1

https://github.com/kubernetes/kubernetes/blob/master/cluster/addons/fluentd-elasticsearch/fluentd-es-ds.yaml

fluent-bit-ds.yaml

1

https://github.com/fluent/fluent-bit-kubernetes-logging/blob/0.13-dev/output/kafka/fluent-bit-ds.yaml

安装前配置:

文件描述符:

1

2

3

4

5

6

cat >> /etc/security/limits <<EOF

root soft nofile 65536

root hard nofile 65536

* soft nofile 65536

* hard nofile 65536

EOF

查看:

1

ulimit -n

sysctl.conf

1

2

3

cat >> /etc/sysctl.conf <<EOF

vm.max_map_count=655360

EOF

查看:

1

sysctl -p

 

安装Fluentd:

导入gpg key:

1

rpm --import https://packages.treasuredata.com/GPG-KEY-td-agent

配置yum源:

1

2

3

4

5

6

7

cat /etc/yum.repos.d/td.repo <<EOF

[treasuredata]

name=TreasureData

baseurl=http://packages.treasuredata.com/3/redhat/\$releasever/\$basearch

gpgcheck=1

gpgkey=https://packages.treasuredata.com/GPG-KEY-td-agent

EOF

安装:

1

yum install -y td-agent

启动:

1

systemctl start td-agent.service

测试:

1

2

3

curl -X POST -d 'json={"json":"message"}' http://localhost:8888/debug.test

 

tail -f /var/log/td-agent/td-agent.log

配置:

source:定义输入,数据的来源,input方向。

match:定义输出,下一步的去向,如写入文件,或者发送到指定软件存储。output方向。

filter:定义过滤,也即事件处理流水线,一般在输入和输出之间运行,可减少字段,也可丰富信息。

system:系统级别的设置,如日志级别、进程名称。

label:定义一组操作,从而实现复用和内部路由。

@include:引入其他文件,和Java、python的import类似。

输出调试:

1

2

3

4

5

6

7

8

9

10

11

<source>

  @type tail

  path /var/log/Nginx/access.log

  pos_file /var/log/td-agent/Nginx-access.log.pos

  tag Nginx.access

  format Nginx

</source>

 

<match Nginx.access>

  @type stdout

</match>

收集Nginx日志配置:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

<source>

  @type tail

  path /var/log/Nginx/access.log #...or where you placed your Apache access log

  pos_file /var/log/td-agent/Nginx-access.log.pos # This is where you record file position

  tag Nginx.access

  format Nginx

</source>

<source>

  @type tail

  path /var/log/Nginx/error.log

  pos_file /var/log/td-agent/Nginx-error.log.pos

  tag Nginx.error

  format /^(?<time>[^ ]+ [^ ]+) \[(?<log_level>.*)\] (?<pid>\d*).(?<tid>[^:]*): (?<message>.*)$/

</source>

  

<match Nginx.*>

 @type elasticsearch

 logstash_format true

 host 172.31.18.133 # elasticsearch IP

 port 9200

 index_name fluentd-Nginx

 type_name fluentd-Nginx

</match>

 

检查配置文件:

1

td-agent --dry-run -c td-agent.conf

 

tag

 

tag的用作:Fluentd 内部路由引擎的方向。

tag的使用:<match tag>、<filter tag>,如:<match Nginx.web>、<filter Nginx.*>

语法:

*:匹配单个 tag 部分

        例:a.*,匹配 a.b,但不匹配 a 或者 a.b.c

 

**:匹配 0 或 多个 tag 部分

        例:a.**,匹配 a、a.b 和 a.b.c

 

{X,Y,Z}:匹配 X、Y 或 Z,其中 X、Y 和 Z 是匹配模式。可以和 * 和 ** 模式组合使用

        例 1:{a, b},匹配 a 和 b,但不匹配 c

        例 2:a.{b,c}. 和 a.{b,c.*}

 

多模式:当多个模式列在一个 <match> 标签(由一个或多个空格分隔)内时,它匹配任何列出的模式。 

        例如:

        <match a b>:匹配 a 和 b

        <match a.** b.*>:匹配 a、a.b、a.b.c 和 b.d

 

 

过滤插件

 

record_transformer:

        将收集来的信息通过增删改的方式来处理,也就是说这个插件可以将收集来的信息往里面增加字段、删除字段、修改字段。这个插件内核中自带不用安装。

增加字段:下面这段配置中向原有的字段中增加了hostname 和 tag 两个字段。

1

2

3

4

5

6

7

<filter foo.bar>

  @type record_transformer

  <record>

    hostname "#{Socket.gethostname}" # ruby的语法

    tag ${tag}

  </record>

</filter>

由:

1

{"message":"hello world!"}

变为:

1

{"message":"hello world!""hostname":"db001.internal.example.com""tag":"foo.bar"}

用法二:可在配置里面搞点计算,使用Ruby的语法。

1

2

3

4

5

6

7

<filter foo.bar>

  @type record_transformer

  enable_ruby

  <record>

    avg ${record["total"] / record["count"]}

  </record>

</filter>

由:

1

{"total":100, "count":10}

变为:

1

{"total":100, "count":10, "avg":"10"}

用法三:将tag中的第二部分值加入字段中。

1

2

3

4

5

6

<filter web.*> # *号匹配到的值

  @type record_transformer

  <record>

    service_name ${tag_parts[1]}  # 将*号匹配到的值放在这个字段中

  </record>

</filter>

由:

1

{"user_id":1, "status":"ok"}

变为:

1

{"user_id":1, "status":"ok""service_name":"auth"}

基本使用语法:

如果输入的记录为:{"total":100, "count":10}

我们可以在<record>标签里面使用 record["total"]=100,record["count"]=10 来取值。还有其他的值time、hostname等。

<record>

    field: ${record["total"] / record["count"]}

    tag: ${tag}

    hostname "#{Socket.gethostname}"

</record>

<record>里面的键值必须是新的字段。

标签tag的用法:

tag_parts[N]    标签的第N部分

tag_prefix[N]    从开始到N的部分

tag_suffix[N]    从N到结尾的部分

    例:标签为 debug.my.app

    tag_prefix[0] = debug

    tag_prefix[1] = debug.my

    tag_prefix[2] = debug.my.app

    tag_suffix[0] = debug.my.app

    tag_suffix[1] = my.app

    tag_suffix[2] = app

其他参数:record是在filter指令内的

1

2

3

4

5

6

7

8

<record>

    enable_ruby    false

    auto_typecast    false

    renew_record    false

    renew_time_key    nil

    keep_keys    nil

    remove_keys    nil

</record>

 

parser:

        解析器,解析日志,将日志格式化成json。

1

2

3

4

5

6

7

8

9

<filter foo.bar>

  @type parser

  key_name log

  <parse>

    @type regexp

    expression /^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)$/

    time_format %d/%b/%Y:%H:%M:%s %z

  </parse>

</filter>

由:

1

2

3

4

time:

injested time (depends on your input)

record:

{"log":"192.168.0.1 - - [05/Feb/2018:12:00:00 +0900] \"GET / HTTP/1.1\" 200 777"}

解析为:

1

2

3

4

time

05/Feb/2018:12:00:00 +0900

record:

{"host":"192.168.0.1","user":"-","method":"GET","path":"/","code":"200","size":"777"}

在<filter>中的可用参数:详见 https://docs.fluentd.org/filter/parser

本文转自:Fluentd 日志收集 来源:原创 时间:2019-08-08 作者:脚本小站 分类:Linux

今天关于ELK 学习笔记之基于 kakfa (confluent) 搭建 ELKelk+kafka+filebeat的分享就到这里,希望大家有所收获,若想了解更多关于Centos7+kafka+ELK6.5.x安装搭建、Confluent kafka rest 实战、DataPipeline联合Confluent Kafka Meetup上海站、ELK - Fluentd 日志收集(官方文档 部署安装 配置文件 详解)等相关知识,可以在本站进行查询。

本文标签: