Filebeat5+Kafka+ELK Docker是怎么搭建日志系统

发布时间:2021-12-08 15:28:51 作者:柒染
来源:亿速云 阅读:141

这篇文章将为大家详细讲解有关Filebeat5+Kafka+ELK Docker是怎么搭建日志系统,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。


准备工作

名称版本号
Filebeat5.0.0-alpha1
Kafka0.9.0.1
Zookeeper3.4.8
Elasticsearch2.4.0
Logstash2.4.0
Kibana4.6.0

为什么使用Docker

纯粹是处于个人爱好,各种技术只要跟 Docker 搭边就倾爱它的 Docker 镜像版本。本文除了filebeat agent是二进制版本直接安装在应用机上,与docker无关,其他都是基于docker 镜像版本的集群安装。

为什么使用Kafka

分布式基于发布订阅的消息系统Kafka,它可以将业务应用端(client)和日志分析服务端(server)很好的黏合起来,并起到了缓冲作用,并提供了很多优秀特性比如异步,解耦,持久化,顺序化等。而且Kafka可以与很多开源组件Storm、Spark等集成,对于日后的扩展这一层会有很大的帮助。

为什么选择Filebeat5

本来开始确实是首选Flume,要做两件事:①上传client端的日志文件到Kafka; ②消费Kafka的队列消息存入ElasticSearch。
坑爹的是,当时最新发布的Flume版本是1.6.0, 而它支持的es版本最高只到1.7.5, 不支持2.x版本,中间对es做了各种的降级和甚至还得配合jdk8云云, 最后放弃。
选择就剩了 logstash-forwarder 和 filebeat, 而后者其实就是前者的升级版+替代品,所以直接选用filebeat无疑了。
妖怪又粗线了, filebeat当时的最新稳定版是1.3.0, 而它是不支持output到kafka的。也就是第①件事就被卡住了,幸好Beats5的alpha1测试版发布了,虽然不稳定,但是测试下来还未发现日志丢失的情况,先用着吧。
话音未落,alpha2又发布了...
https://www.elastic.co/guide/en/beats/libbeat/master/release-notes-5.0.0-alpha2.html

第②件事就通过Logstash来实现了(因为docker镜像就是elk一体的 hihahiha)

开整

问完十万个为什么之后,终于可以开整了(其实前面的#为什么#也是我的血泪史...)

一、Zookeeper 的安装

直接介绍一个不错的docker镜像,pull下来直接使用

docker pull jeygeethan/zookeeper-cluster

集群三个点上分别启动命令,虚拟卷大家自定义

docker run --name docker-zk -d --restart=always \
--net="host" \
-p 2181:2181 \
-p 2888:2888 \
-p 3888:3888 \
-v ~/dockerdata/zookeeper/lib:/var/lib/zookeeper \
-v ~/dockerdata/zookeeper/log:/var/log/zookeeper \
-v /etc/localtime:/etc/localtime:ro \
jeygeethan/zookeeper-cluster 192.168.0.1,192.168.0.2,192.168.0.3 1 {1/2/3: 三个节点分别设置}

三个节点都启动成功后,进入节点A

运行 docker exec -it docker-zk bash

默认就会进入/usr/share/zookeeper 目录,

运行 bin/zkCli.sh

进入了zk的客户端命令行,

创建节点 create /nicholas "nicholas"
查看节点 get /nicholas 显示创建成功,
    在虚机B、C上执行get操作检查下新的节点是否已同步,可见则成功。


### 二、Kafka 的安装
同样 pull 镜像先

docker pull jeygeethan/kafka-cluster

同样三个节点上分别启动,注意,我这里kafka和zk使用的是相同的三台虚机。

docker run --name docker-kafka -d -p 9092:9092
-e KAFKA_HOST=192.168.0.1
-e KAFKA_PORT=9092
-e ZOOKEEPER_CONNECT=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
-e KAFKA_ID=0 {0/1/2: 三个节点分别设置,从0开始}
-v ~/dockerdata/kafka/logs:/tmp/kafka-logs
jeygeethan/kafka-cluster

同时进入虚机A,和虚机B

进入docker
 docker exec -it docker-kafka bash
转换目录
 cd /opt/kafka_2.11-0.9.0.1/bin
创建Topic
 ./kafka-topics.sh --create --topic TP_NIC --partitions 4 --replication-factor 2
        --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
查看Topic
 ./kafka-topics.sh --describe --topic TP_NIC
         --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
在broker0(虚机A)上生产消息
 ./kafka-console-producer.sh --topic=TP_NIC \
         --broker-list=192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092
在broker1(虚机B)上消费消息
 ./kafka-console-consumer.sh --topic=TP_NIC \
         --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181

到此,虚机A和B已经都关联上了TP_NIC, 在 A 命令行上,随意输入各类字符, 在 B 上可以看到同样的字符即说明消费成功了.

三、ELK 的安装

pull 镜像 2.4 版本 最新的5已经有了

docker pull sebp/elk:es240_l240_k460

修改即将要映射的虚拟卷的目录权限, 注意这里的991,992,993分别对应ELK的三个独立用户,如果你看下docker file的build脚本就明白了,为了让docker运行成功,我们先把权限配上。

chown -R 991:991 ~/dockerdata/es && chown -R 992:992 ~/dockerdata/logstash && chown -R 993:993 ~/dockerdata/kibana

进入对应的目录,我们先把配置给设定好。
注:这些配置文件是从docker里面cp出来的,如源文件没有,请先docker run启动原镜像然后docker cp拷贝。

vi ~/dockerdata/es/config/elasticsearch.yml
编辑内容如下
cluster.name: mm-cluster
node.name: mm-node-01
node.master: false
node.data: true
#restrict outside access
network.host: 192.168.0.11
transport.tcp.port: 9300
http.port: 9200
path.data: /etc/elasticsearch/data
path.work: /etc/elasticsearch/work
path.logs: /etc/elasticsearch/logs
path.plugins: /etc/elasticsearch/plugins
bootstrap.mlockall: true
discovery.zen.ping.multicat.enabled: false
discovery.zen.fd.ping_timeout: 100s
#discovery.zen.fd.ping_retries: 6
#discovery.zen.fd.ping_interval: 30s
discovery.zen.ping.timeout: 100s
discovery.zen.minimum_master_nodes: 1
discovery.zen.ping.unicast.hosts: ["192.168.0.11", "192.168.0.12", "192.168.0.13"]
gateway.recover_after_nodes: 2
#action.auto_create_index: false
index.number_of_replicas: 0
index.number_of_shards: 2
vi ~/dockerdata/kibana/config/kibana.yml
检查 elasticsearch.url: "http://localhost:9200" 对应上了即可
新增Kafka的input配置文件:
vi ~/dockerdata/logstash/config/03-kafka-input.conf
------------------------------------------
input {
   kafka {
       zk_connect => "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 "
       #group_id => ""
       topic_id => "syslog"
       codec => "plain"
       reset_beginning => false
       consumer_threads => 5
       decorate_events => true
       add_field => { "[@metadata][type]" => "syslog" }
   }
}
修改日志解析过滤配置文件:
vi ~/dockerdata/logstash/config/10-syslog.conf
------------------------------------------
filter {
 if [@metadata][type] in ["syslog","accesslog"] {
   ruby {
     code => "event['mlogsendts'] = event['@timestamp']"
   }
   mutate {
     add_field => ["mlogsendts_string", "%{@timestamp}"]
   }
   json {
     source => "message"
     add_field => {
       "mlogmsg" => "%{message}"
     }
     remove_field => [ "message"]
   }
   grok {
     patterns_dir => ["/opt/logstash/patterns"]
     match => { "mlogmsg" => "[%{MMLOGTS:mlogts}]\s[%{MMLOGWORDEXT:mlogcell}]\s[%{MMLOGWORDEXT:mlognode}]\s[%{MMLOGWORDEXT:mlogthread}]\s[%{MMLOGWORD:mloglevel}]\s[%{MMLOGWORDEXT:mlogclass}]\s%{GREEDYDATA}" }
   }
   grok {
     match => { "source" => "%{GREEDYDATA}/%{GREEDYDATA:mlogfilename}.log" }
   }
   syslog_pri { }
   date {
     match => [ "mlogts", "yyyy-MM-dd HH:mm:ss.SSS" ]
     timezone => "Asia/Shanghai"
     target => "@timestamp"
   }
 }
}
这里最复杂的其实是两件事,
① 用日志中的时间戳替换系统@timstamp(参见配置)
②grok表达式将日志中的变量分段解析(找在线grok校验工具可以验证自己的正则,很费劲!!!)
grok表达式默认支持各种格式的正则格式变量,大家自行官网搜索,这里我是自定义的一些正则变量,存放在:
vi ~/dockerdata/logstash/patterns/mmlog
.
patterns下面的文件logstash默认会自动扫描的,所以文件名随便定义,只要正则对了就可以了。
内容为:
MMLOGTS \d{4}-\d{2}-\d{2}\s\d{2}\:\d{2}\:\d{2}.\d{3}
MMLOGWORD \w
MMLOGWORDEXT [^]]+
MMLOGTHREAD \w
(\w)\:\w-\w
MMLOGCLASS [\w.]+\:
\w\s

下面就可以启动docker了,对于ELK中的Elasticsearch 和 Logstash都需要是集群三个点的,而Kibana只是展示数据,单点即可。所以启动脚本分别为:

打开下面网址校验安装成功与否:
Kibana Web : http://<your-host>:5601
Elasticsearch Json : http://<your-host>:9200/
ES的插件安装请自己进入docker然后下载, 比较好用的有 head, hq 等

四、Filebeat5 的安装

这个最简单了,官网上下载 filebeat-5.0.0-rc1-linux-x86_64.tar.gz 解压安装;

vi /usr/local/src/filebeat5/filebeat.yml
编辑内容如下:
################### Filebeat Configuration Example #########################
############################# Filebeat ######################################
filebeat.prospectors:
# Each - is a prospector. Below are the prospector specific configurations
- input_type: log
 paths: ["/usr/local/src/logs/${appname}-${cellname}-${nodename}/sys-.log"]
 encoding: utf-8
 exclude_files: ['.
\d{4}-\d{2}-\d{2}..log']
 document_type: syslog
 fields:
   mlogapp: ${appname}
 fields_under_root: true
 scan_frequency: 1s
 ignore_older: 30m
# must set ignore_older to be greater than close_inactive.
 close_inactive: 5m
 close_removed: true
 clean_removed: true
 multiline:
   pattern: ^[[[:digit:]]{4}-[[:digit:]]{2}-[[:digit:]]{2}[[:blank:]][[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2}.[[:digit:]]{3}]
   negate: true
   match: after
   max_lines: 500
output.kafka:
 # initial brokers for reading cluster metadata
 hosts: ["192.168.0.1:9092", "192.168.0.2:9092", "192.168.0.3:9092"]
 # message topic selection + partitioning
 topic: '%{[type]}'
 partition.round_robin:
   reachable_only: false
 required_acks: 1
 compression: gzip
 max_message_bytes: 1000000
############################# Logging #########################################
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
 path: /usr/local/logs/filebeat
 name: filebeat.log
 keepfiles: 7
.
.
.
----------------------
*
启动Filebeat5:**
export appname="uss-web" && export cellname="cell01" && export nodename="node01" \
&& cd /usr/local/src/filebeat/ \
&& nohup ./filebeat -e > /usr/local/src/logs/filebeat/nohup.out 2>&1 &

关于Filebeat5+Kafka+ELK Docker是怎么搭建日志系统就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

推荐阅读:
  1. 使用python怎么对kafka进行操作
  2. 使用pykafka怎么接收Kafka消息队列

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

kafka

上一篇:Scala的环境怎么搭建

下一篇:Kafka日志清理策略有哪些

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》