Elastic Stack–ELFK架构

随笔1周前发布 易卯
18 0 0

 前言:本博客仅作记录学习使用,部分图片出自网络,如有侵犯您的权益,请联系删除

 学习B站博主教程笔记: 

最新版适合自学的ElasticStack全套视频(Elk零基础入门到精通教程)Linux运维必备—ElasticSearch+Logstash+Kibana精讲_哔哩哔哩_bilibiliElastic Stack--ELFK架构https://www.bilibili.com/video/BV1VMW3e6Ezk/?spm_id_from=333.1007.tianma.1-1-1.click&vd_source=e539f90574cdb0bc2bc30a8b5cb3fc00

Elastic Stack--ELFK架构

一、部署logstash环境




 # 官方下载搜索地址


 https://www.elastic.co/cn/downloads/past-releases#elasticsearch


 ​


 # 使用wget在master节点进行下载安装


 wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.3-x86_64.rpm


 yum -y localinstall logstash-7.17.3-x86_64.rpm

创建软链接:




 [root@k8s-master ~]# systemctl cat logstash


 ...


 ExecStart=/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"


 ...


 ​


 # 创建一个软链接方便操作


 ln -sv /usr/share/logstash/bin/logstash /usr/local/bin


 logstash -h     # 检查


 ​


 # 创建一个用于存放配置文件的目录


 [root@k8s-master ~]# mkdir config-logstash

二、input插件

1、收集标准输入




 #(1)编写配置文件


 # vim config-logstash/01--sdtin-to-stdout.yml


 input { stdin {} }


 output { stdout {} }


 ​


 #(2)检查配置文件语法


 logstash -tf config-logstash/01-stdin-to-stdout.conf


 ​


 #(3)启动logstash实例


 logstash -f config-logstash/01-stdin-to-stdout.conf

2、文件输入插件




 input {


   file {


      #指定文件路径


      path => ["/tmp/test/*.txt"]


      #指定文件的读取位置,仅在".sincedb*"文件中没有记录的情况下生效


      #默认从末尾开始读取


      start_position => "beginning"      # /"end"


   }


 }


 ​


 output { stdout {} }

选择 Logstash 开始初始读取文件的位置:在开始时或在最后。默认行为将文件视为实时流,因此从最后开始

3、tcp输入插件,实现日志聚合

Elastic Stack--ELFK架构




 input { tcp { port => 8888 }}


 output { stdout {} }

4、基于http案例




 input {


   http { port => 8888 }


   http { port => 9999 }


 }


 ​


 output { stdout {} }

5、input插件基于redis案例




 input {


   redis {


     data_type => "list"         # 指定的redis的键的类型


     db => 5                     # 指定数据库的编号,默认值是0号数据库


     host => "192.168.1.10"      # 指定数据库的IP地址,默认值是localhost


     port => 6379                # 指定数据库的端口号,默认值为6379


     password => "cluster"       # 指定redis的认证密码


     key => "xxxx"               # 指定从redis的哪个key取数据


   }


   http { port => 8888 }}


 ​


 output { stdout {} }

6、input插件基于beats案例

Elastic Stack--ELFK架构




 # filebeat配置:


 filebeat.inputs:


 - type: tcp


   host: "0.0.0.0:9000"


 ​


 output.logstash:


   hosts: ["192.168.1.10:5044"]


 ​


 # logstash配置:


 input { beats { port => 5044 } }


 output { stdout {} }

三、output插件

1、logstash发送数据到redis环境

(1)编写配置文件




 input { tcp { port => 9999 } }


 ​


 ouput {


   stdout {}


   redis {


     host => "192.168.1.10"      # 指定redis的主机地址


     port => "6379"              # 指定redis的端口号


     db => 10                    # 指定redis的数据库编号


     password => "cluster"       # 指定redis的密码


     data_type => "list"         # 指定写入数据的key类型


     key => "cluster-linux-logstash" # 指定写入的key名称


   }


 }

(2)启动测试

 logstash -f config/tcp-to-redis.conf



 # 发送测试数据


 echo 111111 | nc 192.168.1.10 9999


 # 在redis中查看数据


 192.168.1.10:6379[10]>KEYS *


 1) "cluster-linux-logstash"


 192.168.1.10:6379[10]>type cluster-linux-logstash


 list

2、logstash输出插件file案例

Elastic Stack--ELFK架构

(1)编写配置文件




 input { tcp { port => 9999 } }


 ​


 output {


   stdout{}


   file { 


     path => "/tmp/cluster-linux-logstash.log"       # 指定磁盘的落地位置


   }


 }

(2)测试运行




 logstash -f /config-logstash/tcp-to-file.conf


 ​


 # 同样的对9999端口发送测试数据


 echo 老男孩教育 | nc 192.168.1.10 9999


 cat /tmp/cluster-linux-logstash.log


 ...

3、logstash的输出插件到ES案例

(1)配置文件编写




 input { tcp { port => 9999 } }


 ​


 output {


   stdout{}


   elasticsearch {


     hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]


     index => "cluster-linux-logstash-%{+yyyy.MM.dd}"


   }


 }

(2)启动测试

 logstash -rf config-logstash/tcp-to-es.conf

四、logstash综合案例

1、根据架构图收集日志

Elastic Stack--ELFK架构




 # many-to-es.conf


 input {


   tcp {


     type => "tcp"


     port => 6666


   }


   beats {


     type => "beat"


     port => 7777


   }


   redis {


     type => "redis"


     data_type => "list" 


     db => 5


     host => "192.168.1.10"  


     port => "6379"                      


     password => "cluster"   


     key => "cluster-linux-filebeat"


   }  


 }


 ​


 output {


   # stdout{}    # 生产环境注释,输出到屏幕(调试使用)


   if [type] == "tcp" {


     elasticsearch {


       hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]


       index => "cluster-linux-tcp-%{+yyyy.MM.dd}"


     }


   }else if [type] == "beat" {


     elasticsearch {


       hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]


       index => "cluster-linux-beat-%{+yyyy.MM.dd}"


     }


   }else if [type] == "redis" {


     elasticsearch {


       hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]


       index => "cluster-linux-redis-%{+yyyy.MM.dd}"


     }


   }else {


     elasticsearch {


       hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]


       index => "cluster-linux-other-%{+yyyy.MM.dd}"


     }


   }


 }


 ​


 # 收集9999端口服务的日志信息:tcp-to-logstash.yml


 filebeat.inputs:


 - type: tcp


   host: "0.0.0.0:9999"


 output.logstash:


   host: ["192.168.1.10:7777"]


   


 # 收集redis服务端口日志信息:tcp-to-redis.yml


 filebeat.inputs:


 - type: tcp


   host: "0.0.0.0:8888"


 output.redis:


   hosts: ["192.168.1.10:6379"]


   password: "cluster"


   db: 5


   key: "cluster-linux-filebeat"


   timeout: 3

启动测试:




 filebeat -e -c tcp-to-logstash.yml


 filebeat -e -c tcp-to-redis.yml --path.data /tmp/filebeat/


 logstash -rf many-to-es.conf

2、收集Nginx与Tomcat日志

Elastic Stack--ELFK架构

(1)收集Nginx日志到redis步骤:




 #1、nginx-to-redis.yml


 filebeat.inputs:


 - type: log


   paths: 


     - /var/log/nginx/access.log*


   json.keys_under_root: true


   


 output.redis:


   hosts: ["192.168.1.10:6379"]


   password: "cluster"


   db: 8


   key: "cluster-linux-filebeat"


   timeout: 3


   


 #2、运行测试


 [root@k8s-node1 config]# rm -rf /var/lib/filebeat/*


 [root@k8s-node1 config]# filebeat -e -c /etc/filebeat/config/32-nginx-to-redis.yml 


 [root@k8s-master ~]# redis-cli -a cluster -n 8


 127.0.0.1:6379[8]> keys *


 1) "cluster-linux-filebeat"


 127.0.0.1:6379[8]> lrange cluster-linux-filebeat 0 -1


 1) "{"@timestamp":"202...    # 发现数据已经送达redis

(2)logstash收集日志到ES




 # many-to-es.conf


 input {


   beats {


     port => 8888


   }


   redis {


     data_type => "list"


     db => 8


     host => "192.168.1.10"


     port => 6379


     password => "cluster"


     key => "cluster-linux-filebeat"


   }


 }


 ​


 output {


   stdout{}


   elasticsearch {


     hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]


     index => "cluster-linux-logstash-%{+yyyy.MM.dd}"


   }


 }


 ​


 #运行测试


 [root@k8s-master ~]# logstash -f config-logstash/11-many-to-es.conf

Elastic Stack--ELFK架构

(3)收集Tomcat日志到logstash:




 #1、tomcat-to-logstash.yml


 filebeat.inputs:


 - type: log


   paths:


     - /root/software/apache-tomcat-10.1.28/logs/*.txt


   json.keys_under_root: true


 ​


 output.logstash:


   hosts: ["192.168.1.10:8888"]


   


 #2、运行测试


 [root@k8s-node1 config]# filebeat -e -c /etc/filebeat/config/33-tomcat-to-logstash.yml --path.data /tmp/filebeat/

五、filter插件

1、gork

1.1、基本的gork使用

Grok将非结构化日志数据解析为结构化和可查询的好方法;非常适合syslog日志、apache和其他网络服务器日志、Mysql日志,以及通常为人类而非计算机消耗而编写的任何日志格式。内置120种匹配模式,当然也可自定义匹配模式;

(1)还原nginx日志格式,使用原生日志

 vim /etc/ngin/nginx.conf

(2)编写filebeat收集Nginx原生日志文件输出到logstash的8888端口文件:




 # nginx-to-logstash.yml文件内容如下:


 filebeat.inputs:


 - type: log


   paths:


     - /var/log/nginx/access.log


 ​


 output.logstash:


   hosts: ["192.168.1.10:8888"]

(3)编写logstash收集8888端口收到的文件并发送的ES集群中文件:

使用grok过滤器对原生日志进行拆分;




 # beat-to-es.conf文件内容如下:


 input {


   beats {


     port => 8888


   }


 }


 filter {


   grok {


     match => {


       # "message" => "%{COMBINEDAPACHELOG}"   # 官方GitHub已经废弃,建议使用下面的匹配模式


       "message" => "%{HTTPD_COMMONLOG}"


     }


   }


 }


 output {


   stdout {}


   elasticsearch {


     hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]


     index => "cluster-linux-logstash-%{-yyyy.MM.dd}"


   }


 }

(4)运行测试




 filebeat -e -c /etc/config/nginx-to-logstash.yml


 logstash -rf config-logstash/beat-to-es.conf

Elastic Stack--ELFK架构

1.2、gork自定义的正则案例(知识储备)

有时候logstash没有我们需要的模式;我们自定义方法为:

首先使用 Oniguruma syntax 进行命名捕获,可让我们匹配一段文本并将其保存为字段:




 (?<field_name>这里的模式)


 ​


 # 例如,后缀日志有queue id一个10或11个字符的十六进制值,我们可以


 (?<queue_id>[0 - 9A - F]{10,11})

或者创建自定义模式文件:

创建一个目录,包含一个名为patterns的文件extra

在该文件中,将需要的模式写为模式名称、一个空格,然后是该模式的正则表达式。例如上述可以写成:




 # ./patterns/posifix的内容:


 POSITION_QUEUEID [0 - 9A - F]{10,11}

然后使用patterns_dir这个插件中的设置告诉logstash你的自定义模式目录在哪。完整示例:




 Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>


 ​


 filter {


   grok {


     patterns_dir => ["./patterns"]


     match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }


   }


 }

这会匹配并生成出一下字段:




 timestamp: Jan  1 06:25:43


 logsource: mailserver14


 program: postfix/cleanup


 pid: 21403


 queue_id: BEF25A72965


 syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

1.3、remove_field与add_field子弹

如果此筛选器成功,将此事件中删除任意字段。 字段名称可以是动态的,并使用 %{field} 包含事件的一部分 例:




 filter {


   grok {


     # 移除指定的字段


     remove_field => [ "需要移除的字段1", "需要移除的字段2" ]


     # 添加指定的字段


     add_filed => { 


        "oldboyedu-clientip" => "clientip ---> %{clientip}"


        "school" => "北京大学"   


     }


   }


 }

1.4、add_tag与remove_tag字段

添加与移除tag标签:




 filter {


   grok {


     # 添加tag


     add_tag => [ "linux","zookeeper","kafka","elk" ]


     # 移除tag


     remove_tag => [ "zookeeper","kafka" ]


   }


 }

1.5、id字段

ID为插件配置添加一个唯一的。若有两个或多个相同类型的插件时建议使用。




 filter {


   grok {


     ...


     id => "nginx"


   }


 }

2、date插件修改写入ES的时间




 filter {


   grok {...}


   date {


     # 两种写法均可


     match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]


     match => ["timestamp","dd/MMM/yyyy:HH:mm:ss +0800"]


     # 可选项;


     timezone => "Asia/Shanghai"


     # 将匹配到的时间字段解析后存储到目标字段,若不指定,则默认字段为"@timestamp"字段


     target => "cluster-linux-access-time"


   }


 }

3、geoip分析源地址的地理位置

geoip字段对IP地址进行分析




 filter {


   grok {..}


   date {..}


   geoip {


       # 指定基于哪个IP进行分析


       source => "clientip"


       # 可设置只展示指定字段,若不设置,表示显示所有的查询字段


       fields => ["city_name","country_name","ip"]


       # 指定geoip的输出字段,对多个IP地址进行分析


       target => "cluster-linux-geoip"


   }


 }

4、useragent分析客户端的设备类型




 filter {


   date {..}


   geoip {..}


   useragent {


      source => "http_user_agent"            # 指定客户端的设备相关信息的字段


      target => "cluster-linux-useragent"    # 将分析的数据存储在一个指定的字段中,若不指定,则默认存储在target字段中


   }


 }

5、mutate组件数据准备

(1)python脚本准备数据




 cat > generate_log.py <<EOF


 #!/usr/bin/env/ python


 import datetime


 import random


 import logging


 import time


 import sys


 ​


 LOG_FORMAT = "%(levelname)s %(asctime)s [com.oldboyedu.%(module)s] - %(message)s "


 DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


 ​


 # 配置root的logging.Logger实例的基本配置


 logging.basicConfig(level=logging.INFO,format=LOG_FORMAT,detefmt=DATE_FORMAT,filename=sys.argv[1],filemode='a',)


 actions = ["浏览页面","评论商品","加入收藏","加入购物车","提交订单","使用优惠券","领取优惠券","搜索","查看订单","付款","清空购物车"]


 ​


 while True:


     time.sleep(random.randint(1,5))


     user_id = random.randint(1,10000)


     price = round(random.uniform(15000,30000),2)


     action = random.choice(actions)


     svip = random.choice([0,1])


     logging.info("DAU|{0}|{1}|{2}".format(user_id,action,svip,price))


 EOF


 # 运行、生成数据


 nohup python generate_log.py /tmp/app.log &>/dev/null &

(2)编写filebeat文件收集日志,并发送到logstash服务器的8888端口,以便接收:




 filebeat.inputs:


 - type: log


   enabled: true


   paths:


     - /tmp/app.log*


 output.logstash:


   hosts: ["192.168.1.10:8888"]

(3)在logstash服务器编写文件收集日志并发送到ES




 input {


   beats {


     port => 8888


   }


 }


 filter {


   mutate {


     add_field => { "school" => "B站大学老男孩IT教育" }


     remove_field => [ "@timestamp","agent","host","@version","ecs","tags","input","log"] }


   # 对"message"字段内容使用"|"进行切分"


   mutate { split => { "message" => "|" }}


   mutate {


     # 添加字段,其中引用到了变量


     add_field => {


        "user_id" => "%{[message][1]}"


        "action" => "%{[message][2]}"


        "svip" => "%{[message][3]}"


        "price" => "%{[message][4]}"


     }


   }


   mutate {


     # 将指定字段转换成相应的数据类型


     convert => {


        "user_id" => "integer"


        "svip" => "boolean"


        "price" => "float"


     }


   }


   mutate { strip => ["svip"] }      # 去除空格


   mutate { copy => { "price" => "cluster-linux-price" } }   #拷贝字段


   mutate { rename => { "message" => "cluster-ssvip" }}      #重命名


   mutate { replace => { "message" => "%{message}:My new message"}}  #替换


   mutate { uppercase => [ "message" ]}  # 首字母大写


 }


 ​


 output {


   stdout {}


   elasticsearch {


     hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]


   }


 }

6、多if分支判断

(1)架构图如下:

Elastic Stack--ELFK架构

(2)通过filebeat收集nginx的access.log日志:




 filebeat.inputs:


 - type: log


   paths:


     - /var/log/nginx/access.log*


   json.keys_under_root: true


 output.logstash:


   hosts: ["192.168.1.10:8888"]

(2)logstash收集端口日志文件如下:




 input {


   beats {


     type => "cluster-beats"


     port => 8888


   }


   tcp {


     type => "cluster-tcp"


     port => 9999


   }


   tcp {


     type => "cluster-tcp-new"


     port => 7777


   }


   tcp {


     type => "cluster-http"


     port => 6666


   }


   tcp {


     type => "cluster-file"


     path => "/tmp/apps.log"


   }


 }


 filter {


   mutate {


     add_field => {


       "school" => "B站大学老男孩IT"


     }


   }


   if [type] == ["cluster-beats","cluster-tcp-new","cluster-http"] {


     mutate {


       remove_field => ["agent","host","@version","ecs","tags","input","log"]


     }


     geoip {


       source => "clientip"


       target => "cluster-linux-geoip"


     }


     useragent {


       source => "http_user_agent"


       target => "cluster-linux-useragent"


     } else if [type] == "cluster-file" {


       mutate {


         add_field => {


           "class" => "cluster-linux80"


           "address" => "xxx"


           "hobby" => ["LOL","王者荣耀"]


         }


         remove_field => ["host","@version","school"]


       }


     }  


   } else {


     mutate {


       remove_field => ["port","@version","host"]


     }


     mutate { 


       split => { "message" => "|" }}


       add_field => {


         "user_id" => "%{[message][1]}"


         "action" => "%{[message][2]}"


         "svip" => "%{[message][3]}"


         "price" => "%{[message][4]}"


       }


       remove_field => ["message"]


       strip => ["svip"]


     }


     mutate {


       # 将指定字段转换成相应的数据类型


       convert => {


          "user_id" => "integer"


          "svip" => "boolean"


          "price" => "float"


       }


     }


   }


 } 


 output {


   stdout {}


   if [type] == "cluster-beats" {


     elasticsearch {


       hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]


       index => "cluster-linux-logstash-beats-%{+YYYY.MM.dd}"


     }  


   } else {


     elasticsearch {


       hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]


       index => "cluster-linux-logstash-tcp-%{+YYYY.MM.dd}"      


     }


   }


 ​


 }

(3)通过filebeat收集app.log的日志:




filebeat.inputs:


- type: log


  enabled: true


  paths:


    - /tmp/app.log*


output.logstash:


  hosts: ["192.168.1.10:9999"]

六、Kibana的样例数添加

© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
暂无评论...