ELK日志分析集群
第一章 日志收集1.1.1 filebeat作用。
第一章 日志收集
1.1 filebeat
1.1.1 filebeat作用
- filebeat是一款部署在服务器上的日志收集工具, 用来实时收集指定服务的日志, 并对日志进行指定输出
注意: - filebeat的输出目标同一时间只能有一个
- filebeat只有第一次收集日志时, 才会从文件头开始读取
- filebeat收集的日志格式为json, 每一行日志都是一个事件
1.1.2 安装filebeat - 下载地址: https://mirrors.aliyun.com/elasticstack/7.x/yum/7.17.29/?spm=a2c6h.25603864.0.0.7975505007UAnD
- 安装filebeat
tar -zxvf ./filebeat-7.13.2-linux-x86_64.tar.gz -C /usr/local
mv /usr/local/filebeat-7.13.2 /usr/local/filebeat - 配置filebeat的systemctl启动
vim /usr/lib/systemd/system/filebeat.service
[Unit]
Description=Filebeat sends log files to Logstash or directly to Elasticsearch.
Wants=network-online.target
After=network-online.target
[Service]
User=root
Group=root
Environment=“BEAT_CONFIG_OPTS=-c /usr/local/filebeat/filebeat.yml”
ExecStart=/usr/local/filebeat/filebeat $BEAT_CONFIG_OPTS
Restart=always
[Install]
WantedBy=multi-user.target
systemctl daemon-reload
注意:
- filebeat两种启动方式: 前台启动, 后台启动
- 前台启动: 可以在控制台直接查看日志内容
- 后台启动: nohup与systemctl
- nohup启动, 日志结果可以在nohup.out文件中查看
- systemctl启动, 无法查看输出日志
- filebeat的简单使用
配置filebeat读取指定文件中的日志
vim /usr/local/filebeat/filebeat.yml
inputs日志来源
filebeat.inputs:
- type: log
#开启filebeat的读取日志功能
enable: true
#日志来源
paths:- /var/log/httpd/access_log
#配置调用filebeat的模块来收集日志
filebeat.config.modules:
#调用filebeat的所有模块来收集日志
path: ${path.config}/modules.d/*.yml
#filebeat的配置文件更新时, 该程序不自动加载更新后的配置
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
- /var/log/httpd/access_log
将filebeat读取到的日志输出到控制台
output.console:
pretty: true
filebeat对收集的日志进行处理
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- 启动filebeat, 控制台观察日志的输出
cd /usr/local/filebeat
/usr/local/filebeat/filebeat
或
/usr/local/filebeat/filebeat -c /usr/local/filebeat/filebeat.yml
1.1.3 filebeat的日志收集模块
- 查看filebeat支持的收集模块
/usr/local/filebeat/filebeat modules list
#支持nginx, mysql, apache, redis等
注意:
enabled表示启用的模块
disabled表示未启用的模块 - 启用apache模块
/usr/local/filebeat/filebeat modules enable apache
#或
vim /usr/local/filebeat/filebeat/modules.d/apache.yml
access:将false改为true
enabled: truevar.path: [“/usr/httpd/access_log1”,“/usr/httpd/access_log2”]
error:
enabled: true
3. 执行测试
/usr/local/filebeat/filebeat
systemctl stop httpd
systemctl start httpd
http://192.168.255.132
注意:
- 若服务日志的存放路径不是默认路径, 在模块对应的配置文件中, 使用var.paths来指定即可
- var.paths指定的路径与默认配置不冲突, 若默认配置与var.paths中都存在日志, 那么filebeat都会收集
- filebeat常用output: console(输出到控制台), kafka(输出到kafka), logstash(输出到logstash), elasticsearsh(输出到elasticsearch)
- output只能指定一个输出方式
1.1.4 filebeat的重读日志 - 由于filebeat只会在第一次读日志时, 会从头开始读, 若需要再次重读日志, 删除/usr/local/filebeat/filebeat/data/*即可
1.1.5 filebeat对日志的过滤与增强 - 过滤含有core:notice的行
vim /usr/local/filebeat/filebeat.yml
processors:
#删除info事件
- drop_event:
when:
regexp:
message: “core:notice”
#运行
/usr/local/filebeat/filebeat
- 输出事件中添加指定内容
vim /usr/local/filebeat/filebeat.yml
processors:
#事件中添加json段落
- add_fields:
#输出时添加json段落test
target: test
#test的json中包含字段id, name
fields:
name: zs
id: 123456
#运行
/usr/local/filebeat/filebeat
- 删除事件中的字段
vim /usr/local/filebeat/filebeat.yml
processors:
- drop_fields
#删除事件中host的json段落, 删除test段落中的name字段
fields: [“host”,“test.name”]
#若删除的字不存在, 忽略他
ignore_missing: true
#运行
/usr/local/filebeat/filebeat
1.2 logstash
1.2.1 lostash作用
- logstash是一款部署在服务器上的日志收集工具, 用来实时收集指定服务的日志, 并对日志进行指定输出, 常用于对filebeat收集的日志进行过滤
注意: - logstash的三大组件: input, filter, output
1.2.2 安装logstash
#解压并重命名
tar -zxvf ./logstash-7.13.2.tar.gz -C /usr/local
mv /usr/local/logstash-7.13.2 /usr/local/logstash
1.2.3 简单测试logstash
#使用logstash的标准输入输出, 并启动logstash
-e ''等同于 -e input { stdin { type => stdin } } output { stdout { codec => rubydebug } }
表示logstash从键盘接收输入, 使用rubydebug格式输入
/usr/local/logstash/bin/logstash -e ‘’
启动后, 使用键入内容进行测试
1.2.4 配置logstash的管道输入输出
vim /usr/local/logstash/config/first-pipeline.conf
input {
#使用标准输入
stdin {}
}
output {
#使用标准输出
stdout {}
}
#运行测试, -f用于指定logstash启动时加载的管道文件
/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/first-pipeline.conf
#控制台中输入内容测试管道文件是否能被正常使用
注意:
- logstash支持多输入, 多输出以及多种过滤器
1.2.5 Grok过滤器
vim /usr/local/logstash/config/two-pipeline.conf
input {
#采用文件形式的输入
file {
#文件路径
path => [“/var/log/httpd/access_log”, /var/log/httpd/error_log]
#从文件第一行开始读
start_position => “beginning”
}
}
#使用logstash的filter对日志内容进行过滤
filter {
#使用grok插件, 对httpd的日志进行格式化
gork {
#match表示匹配字段, 对message字段, 使用combinedapachelog模式, 对apache的访问与错误日志进行格式化
match => { “message” => “%{COMBINEDAPACHELOG}” }
}
mutate {
#对结构化的字段重命名
rename => {
“timestamp” => “access_time”
}
}
mutate {
#输出时不显示message与ident字段
remove_field => [“message”,“ident”]
}
}
output {
#过滤后执行标准输出
stdout {}
}
#测试
/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/two-pipeline.conf
注意: - grok的combinedapachelog模式, 是对apache服务产生的日志进行格式化
1.2.6 filebeat输入logstash接收 - 配置logstash
vim /usr/local/logstash/config/filebeat.conf
input {
#beats插件用于接收其他程序的数据, 运行在5044端口
beats {
port => 5044
}
}
filter {
grok {
match => { “message” => “%{COMBINEDAPACHELOG}” }
}
}
output {
stdout {}
} - 配置filebeat的apache模块
vim /usr/local/filebeat/modules.d/apache.yml
filebeat开启对apache的访问与错误日志的收集
- module: apache
access:
enabled: true
error:
enabled: true
- 配置filebeat的输出
vim /usr/local/filebeat/filebeat.yml
#output模块中修改
output.logstash:
#filebeat收集到的内容, 输出到logstash主机的5044的beats端口
hosts: [“192.168.255.132:5044”] - 删除filebeat的缓存
#由于filebeat对度过的内容不会从头开始读, 删除缓存, 重新读apache的日志文件
rm -rf /usr/local/filebeat/data - 运行filebeat与logstash
/usr/local/filebeat/filebeat
/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/filebeat.conf
第二章 日志存储
2.1 ElasticSearch
2.1.1 elasticsearch作用 - 用于存储logstash与filebeat收集的日志, 并对存储的日志进行搜索与分析
注意: - elasticsearch支持的数据类型: 结构化数据, 非结构化数据, 数据字典, 地理空间数据
- elasticsearch的最小数据单元是文档
- 文档是指json化后的一条数据
- elasticsearch会对每一个文档添加索引, 方便检索
2.1.2 es集群 - es集群是将数据进行分布式存储, 具有高可用, 易扩展的特点
2.1.2.1 es集群节点 - 主节点: 负责管理集群的状态与分片的分配, 不会存储与查询数据
- 数据节点: 负责数据主分片与副本分片的存储, 写入, 查询等工作
2.1.2.2 主分片与副本分片 - 主分片: 将数据均匀的拆分, 存储到不同数据节点中
- 副本分片: 主分片的备份
注意: - 主分片与其副本分片不能存储在同一数据节点, 避免数据丢失
2.1.3 es集群部署 - 环境
filebeat: 192.168.255.130
kibana: 192.168.255.131
logstash: 192.168.255.132
es:
192.168.255.133
192.168.255.134
192.168.255.135
kafka:
192.168.255.141
192.168.255.142
192.168.255.144 - 配置服务器
#关闭防火墙以及selinux
systemctl disable firewalld
sed -ri ‘s/SELINUX=enforcing/SELINUX=disabled/g’ /etc/sysconfig/selinux
setenforce 0
#配置域名解析
cat >> /etc/hosts << EOF
192.168.255.130 filebeat
192.168.255.132 logstash
192.168.255.133 es1
192.168.255.134 es2
192.168.255.135 es3
EOF - 安装并配置filebeat
tar -zxvf ./filebeat-7.13.2-linux-x86_64.tar.gz -C /usr/local
mv /usr/local/filebeat-7.13.2-linux-x86_64 /usr/local/filebeat
#开启apache模块
mv /usr/local/filebeat/modules.d/apache.yml.disabled /usr/local/filebeat/modules.d/apache.yml
vim /usr/local/filebeat/modules.d/apache.yml
- module: apache
access:
enabled: true
error:
enabled: true
#配置filebeat的输出
vim /usr/local/filebeat/filebeat.yml
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
output.logstash:
hosts: [192.168.255.132:5044]
- 安装并配置logstash
#安装logstash
tar -zxvf logstash-7.13.2-linux-x86_64.tar.gz -C /usr/local
mv /usr/local/logstash-7.13.2 /usr/local/logstash
#配置logstash的输入输出, 以及过滤器
vim /usr/local/logstash/config/pipe.conf
#输入监听本机5044端口
input {
beats {
port => 5044
}
}
#过滤器
filter {
#采用logstash的格式化输出
grok{
match => { “message” => “%{COMBINEDAPACHELOG}” }
}
#过滤掉message字段
mutate {
remove_field => [“message”]
}
}
#输出到es集群
output {
elasticsearch {
hosts => [“es1:9200”, “es2:9200”, “es3:9200”]
}
} - 安装elasticsearch
yum -y install elasticsearch-7.13.2-x86_64.rpm
systemctl enable elasticsearch - 配置elasticsearch
#133配置节点1
vim /etc/elasticsearch/elasticsearch.yml
#集群名称
cluster.name: es
#节点名称
node.name: es1
#是否是数据节点
node.data: true
#本机任意ip的9200端口, 都可接收logstash的信息
network.host: 0.0.0.0
#es的工作端口
http.port: 9200
#配置集群节点
discovery.seed_hosts:
- es1
- es2
- es3
#集群中主节点的选举范围
cluster.initial_master_nodes: [“es1”,“es2”,“es3”]
#134配置节点2
vim /etc/elasticsearch/elasticsearch.yml
#集群名称
cluster.name: es
#节点名称
node.name: es2
#是否是数据节点
node.data: true
#本机任意ip的9200端口, 都可接收logstash的信息
network.host: 0.0.0.0
#es的工作端口
http.port: 9200
#配置集群节点
discovery.seed_hosts:
- es1
- es2
- es3
#集群中主节点的选举范围
cluster.initial_master_nodes: [“es1”,“es2”,“es3”]
#135配置节3
vim /etc/elasticsearch/elasticsearch.yml
#集群名称
cluster.name: es
#节点名称
node.name: es3
#是否是数据节点
node.data: true
#本机任意ip的9200端口, 都可接收logstash的信息
network.host: 0.0.0.0
#es的工作端口
http.port: 9200
#配置集群节点
discovery.seed_hosts:
- es1
- es2
- es3
#集群中主节点的选举范围
cluster.initial_master_nodes: [“es1”,“es2”,“es3”]
#启动服务
systemctl start elasticsearch
#互相检查集群主机健康状态,绿色表示正常;黄色表示单节点部署,不具有备份能力;红色表示数据不可用
curl -X GET “localhost:9200/_cat/health?v”
检查集群节点的运行状态
curl -X GET “localhost:9200/_cat/nodes?v”
注意:
- es的9200端口对外提供被访问功能, 用于接收数据
- es的9300端口是集群中主机与主机之间的通信, 用于选举主节点
- 主节点用*表示, 数据节点用-表示
- 开启logstash与filebeat
/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/pipe.conf
/usr/local/filebeat/filebeat -c /usr/local/filebeat/filebeat.yml - 验证es集群是否创建索引
curl -X GET “localhost:9200/_cat/indices” - logstash配置指定索引
vim /usr/local/logstash/config/pipe.conf
output {
#httpd的访问日志
if [event][dataset] == “apache.access” {
elasticsearch {
hosts => [“es1:9200”, “es2:9200”, “es3:9200”]
#向es集群传输时创建http的访问日志索引
index => “%{[host][hostname]}-httpd-access-%{+YYYY.MM.dd}”
}
}
#http的错误日志
else if [event][dataset] == “apache.error” {
elasticsearch {
hosts => [“es1:9200”, “es2:9200”, “es3:9200”]
#向es集群传输时创建http的访问日志索引
index => “%{[host][hostname]}-httpd-error-%{+YYYY.MM.dd}”
}
}
} - 启动logstash
/usr/local/logstash/bin/logstash -f /usr/local/logstash/config/pipe.conf - es集群查看索引
http://192.168.255.130
systemctl stop httpd
curl -X GET “localhost:9200/_cat/indices”
第三章 日志管理
3.1 kibana
3.1.1 作用 - 用于对es集群中存储的日志进行可视化, 同时对日志进行分析和管理
3.2 安装kibana
tar -zxvf ./kibana-7.13.2-linux-x86_64.tar.gz -C /usr/local
mv /usr/local/kibana-7.13.2 /usr/local/kibana
3.3 将kibana部署在es集群上 - 配置域名解析与kibana
#延续之间es集群, 部署kibana
cat >> /etc/hosts << EOF
192.168.255.130 filebeat
192.168.255.131 kibana
192.168.255.132 logstash
192.168.255.133 es1
192.168.255.134 es2
192.168.255.135 es3
EOF
#配置kibana
vim /usr/local/kibana/config/kibana.yml
server.port: 5601
server.host: “0.0.0.0”
#连接es集群的地址与端口
elasticsearch.hosts: [http://es1:9200",“http://es2:9200”,“http://es3:9200”]
#配置kibana的日志存放路径
logging.dest: /var/log/kibana/kibana.log
#设置操作界面为中文
il8n.locale: “zh-CN”
2. 创建kibana的管理用户
useradd kibana
passwd kibana
mkdir /run/kibana /var/log/kibana
chown -R kibana.kibana /run/kibana /var/log/kibana /usr/local/kibana
3. 启动kibana
su - kibana
#将kibana运行在后台
nohup /usr/local/kibana/bin/kibana &
4. 测试访问
http://kinaba:5601
5. 创建索引
[图片]
[图片]
[图片]
[图片]
[图片]
[图片]
6. 查看数据
[图片]
[图片]
第四章 日志缓冲
4.1 kafka
4.1.1 作用
- 用于数据缓冲队列, 临时存放数据, 避免高并发导致集群宕机, 具有削峰填谷的能力
注意:
kafka的运行依赖zookeeper
4.1.2 特点 - 高吞吐量: 每秒可以处理十万级的消息
- 可扩展性: 支持热扩展
- 可靠性: kafka的数据会被持久化到磁盘, 并且支持数据的备份
- 容错性: 允许集群中的节点失败
- 高并发: 支持数千用户的同时读写
4.1.3 组件 - 话题(topic): 逻辑概念, kafka将消息进行分类, 每一个类型就是一个topic
- 生产者(producer): 消息的发送者, 将消息发送给kafka
- 消费者(consumer): 消息的接收者, 读取kafka中的消息
- 服务代理(broker): 处理生产者与消费者的请求, 一个集群有多个broker
- partition(区): topic的物理分片, 将一个topic进行多个分片存储,
- replication: 区的副本, 防止消息丢失, 每个分片可以有多个副本
- leader: 处理所有生产者的写入以及消费者的读取
- follower: 同步leader中的数据, 不处理请求, leader宕后, follower会自动作为leader
- zookeeper: 对数据进行存储
4.1.4 安装zookeeper
#安装jdk, 支持zookeeper的运行
yum -y install java-1.8.0-openjdk
#配置域名解析
cat >> /etc/hosts << EOF
192.168.255.141 kafka1
192.168.255.142 kafka2
192.168.255.144 kafka2
EOF
#安装kafka
tar -zxvf kafka_2.12-2.8.0.tgz -C /usr/local
mv /usr/local/kafka_2.12-2.8.0.tgz /usr/local/kafka
4.1.5 配置kafka - 配置zookeeper
vim /usr/local/kafka/config/zookeeper.properties
#指定数据的持久化路径
dataDir=/opt/data/zookeeper/data
#指定日志存放路径
dataLogDir=/opt/data/zookeeper/logs
#客户端与zookeepr的连接端口
clientPort=2181
#zk服务器之间或客户端与zk服务器之间的心跳间隔2秒
tickTime=2000
#follower与leader同步数据的初始化连接时间
initLimit=20
#follower无法在指定时间内在leader上获得数据, 那么follower将会作为leader
syncLimit=10
#配置kafka集群, 2888是主从数据交换的端口, 3888从选举主的端口
server.1=192.168.255.141:2888:3888
server.2=192.168.255.142:2888:3888
server.3=192.168.255.143:2888:3888
#创建日志与数据存放目录
mkdir -p /opt/data/zookeeper/{logs,data}
#192.168.255.141:1, 192.168.255.142:2, 192.168.255.143:3
echo 1 > /opt/data/zookeeper/data/myid
2. 配置kafka
vim /usr/local/kafka/config/server.properties
#集群中broker的id, 不能相同
broker.id=1
listeners=PLAINTEXT://192.168.255.141:9092 #改
#broker处理消息的线程数
num.network.threads=3
#broker对数据持久化的IO线程数
num.io.threads=8
#消息发送缓冲区
socket.send.buffer.bytes=102400
#消息接收缓冲区
socket.receive.buffer.bytes=102400
#请求的最大值
socket.request.max.bytes=104857600
#日志路径
log.dirs=/opt/data/kafka/logs
#分片数量
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.255.141:2181,192.168.255.142:2181,192.168.255.143:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
3. 启动zookeeper
nohup bin/zookeeper‐server‐start.sh config/zookeeper.properties &
4. 启动kafka
nohup bin/kafka‐server‐start.sh config/server.properties &
5. 141创建topic
bin/kafka‐topics.sh ‐‐create ‐‐zookeeper localhost:2181 ‐‐replication‐factor 1 ‐‐partitions 1 ‐‐topic testtopic
6. 142查询topic
bin/kafka‐topics.sh ‐‐zookeeper 192.168.255.142:2181 ‐‐list
7. 配置logstash与kafka
vim /usr/local/logstash/config/pipe.conf
input {
kafka {
type => “apache_log”
codec => “json”
topics => [“apache”]
decorate_events => true
bootstrap_servers => “192.168.255.141:9092, 192.168.255.142:9092, 192.168.255.143:9092”
}
}
8. 配置filebeat与kafka
vim /usr/local/filebeat/filebeat.yml
output.kafka:
hosts: [“kafka11:9092”, “kafka2:9092”, “kafka3:9092”]
topic: ‘apache’
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
9. 启动使用kibana验证
更多推荐
所有评论(0)