Logstash Filter Plugins
As data travels from source to store, Logstash filters parse each event, identify named fields to build structure, and transform the events into a common format for more powerful analysis and business value.

Logstash can transform and parse data dynamically, regardless of its format or complexity.
Common Filter plugins:

- Grok: parse unstructured data into structured data
- GeoIP: look up the geographic location (coordinates, country, city) of an IP address
- useragent: extract the operating system and device type from a request

Together they simplify overall processing, independent of data source, format, or schema.
Official documentation:

https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
https://www.elastic.co/guide/en/logstash/7.6/filter-plugins.html

Grok Plugin
About Grok

Grok is a filter plugin that helps you describe the structure of your log formats. More than 200 built-in grok patterns abstract concepts such as IPv6 addresses, UNIX paths, and month names.

To match log lines against a format, production environments often need to parse unstructured data into a structured JSON format.
For example, take the following line:

2016-09-19T18:19:00 [8.8.8.8:prd] DEBUG this is an example log message

The Grok plugin is based on regular expressions; its built-in pattern aliases can represent and match the log line above like this:

%{TIMESTAMP_ISO8601:timestamp} \[%{IPV4:ip}:%{WORD:environment}\] %{LOGLEVEL:log_level} %{GREEDYDATA:message}

which finally converts it into the following format:

{
  "timestamp": "2016-09-19T18:19:00",
  "ip": "8.8.8.8",
  "environment": "prd",
  "log_level": "DEBUG",
  "message": "this is an example log message"
}

References:
https://www.elastic.co/cn/blog/do-you-grok-grok
http://grokdebug.herokuapp.com/
http://grokdebug.herokuapp.com/discover?#

Example: Nginx access log
#cat /var/log/nginx/access.log
10.0.0.100 - - [03/Aug/2022:16:34:17 +0800] "GET / HTTP/1.1" 200 612 "-" "curl/7.68.0"

The line above can be matched with the built-in pattern %{COMBINEDAPACHELOG}.
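Under the hood, a grok alias such as %{IPV4:ip} is essentially a named regular-expression group. As a rough illustration (this is not how Logstash is implemented, and the regexes below are simplified stand-ins for the real grok patterns), the earlier example line can be parsed in plain Python:

```python
import re

line = "2016-09-19T18:19:00 [8.8.8.8:prd] DEBUG this is an example log message"

# Simplified stand-ins for %{TIMESTAMP_ISO8601}, %{IPV4}, %{WORD},
# %{LOGLEVEL} and %{GREEDYDATA}, each captured as a named group
pattern = re.compile(
    r"(?P<timestamp>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}) "
    r"\[(?P<ip>\d{1,3}(?:\.\d{1,3}){3}):(?P<environment>\w+)\] "
    r"(?P<log_level>[A-Z]+) (?P<message>.*)"
)

event = pattern.match(line).groupdict()
print(event["ip"], event["environment"], event["log_level"])  # 8.8.8.8 prd DEBUG
```

Grok's real value is that patterns like %{COMBINEDAPACHELOG} bundle dozens of such groups, already debugged, under one name.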
Example: use the Grok Debugger in Kibana to auto-generate grok pattern code for an Nginx log

58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] "GET /wp-content/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1" 200 330 "http://www.wangxiaochun.com/?p=117" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" "-"

The generated code that converts the line above to JSON:

%{COMBINEDAPACHELOG}

Example: use a grok pattern to format the Nginx log as JSON
[root@logstash ~]#vim /etc/logstash/conf.d/http_grok_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # format the nginx log as JSON
  grok {
    # parse the message field into the structured fields named by the pattern
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_stdout.conf -r
[root@logstash ~]#curl -XPOST -d'58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] "GET /wp-content/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1" 200 330 "http://www.wangxiaochun.com/?p=117" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" "-"' 10.0.0.180:6666

Example: convert the Nginx access log directly to JSON
[root@ubuntu2004 ~]#cat /etc/logstash/conf.d/nginx_grok_stdout.conf
input {
  file {
    path => "/var/log/nginx/access.log"
    type => "nginx-accesslog"
    start_position => "beginning"
    stat_interval => "3"
  }
}
filter {
  # format the nginx log as JSON
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Geoip Plugin
The geoip plugin looks up the region information for an IP address, such as latitude/longitude coordinates, country, and city name, making geographic analysis convenient.
Filebeat configuration example:

[root@kibana ~]#cat /etc/filebeat/logstash-filebeat.yml
filebeat.inputs:
- type: log
  enabled: true                  # enable this input
  paths:
    - /var/log/nginx/access.log  # log file to collect
  #json.keys_under_root: true    # default false: lines are treated as plain text and stored whole in the message field; set true to store them as JSON
  #json.overwrite_keys: true     # optional; when true, keys defined in the JSON log replace the default message field
  tags: ["nginx-access"]
output.logstash:
  hosts: ["10.0.0.180:5044"]     # address and port of the Logstash server

[root@kibana ~]#cat /var/log/nginx/access.log
58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] "GET /wp-content/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1" 200 330 "http://www.wangxiaochun.com/?p=117" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" "-"
Logstash configuration example:

[root@logstash ~]#vim /etc/logstash/conf.d/beats_geoip_stdout.conf
input {
  beats {
    port => 5044
    #codec => "json"
  }
}
filter {
  # format the nginx log as JSON
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # use the client IP extracted above as the source to look up region information
  geoip {
    #source => "clientip"          # field holding the source IP in 7.X
    source => "[source][address]"  # the field name changed in 8.X
    target => "geoip"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
Sample output:

[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/beats_geoip_stdout.conf -r
{
    "user_agent" => { "original" => "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" },
    "message" => "58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] \"GET /wp-content/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1\" 200 330 \"http://www.wangxiaochun.com/?p=117\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\" \"-\"",
    "geoip" => {
        "geo" => {
            "city_name" => "Shenzhen",
            "region_name" => "Guangdong",
            "continent_code" => "AS",
            "location" => { "lat" => 22.5559, "lon" => 114.0577 },
            "country_iso_code" => "CN",
            "region_iso_code" => "CN-GD",
            "country_name" => "China",
            "timezone" => "Asia/Shanghai"
        },
        "ip" => "58.250.250.21"
    },
    "input" => { "type" => "log" },
    "@timestamp" => 2025-01-03T08:14:38.824Z,
    "source" => { "address" => "58.250.250.21" },
    "@version" => "1",
    "url" => { "original" => "/wp-content/plugins/akismet/_inc/form.js?ver=4.1.3" },
    "timestamp" => "14/Jul/2020:15:07:27 +0800",
    "http" => {
        "request" => { "method" => "GET", "referrer" => "http://www.wangxiaochun.com/?p=117" },
        "version" => "1.1",
        "response" => { "body" => { "bytes" => 330 }, "status_code" => 200 }
    },
    "tags" => [
        [0] "nginx-access",
        [1] "beats_input_codec_plain_applied"
    ],
    "event" => { "original" => "58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] \"GET /wp-content/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1\" 200 330 \"http://www.wangxiaochun.com/?p=117\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\" \"-\"" },
    "host" => { "name" => "kibana" },
    "ecs" => { "version" => "8.0.0" },
    "log" => { "offset" => 623, "file" => { "path" => "/var/log/nginx/access.log" } },
    "agent" => {
        "name" => "kibana",
        "id" => "a3acb99e-b483-4367-a2df-535d8a39a0fa",
        "version" => "8.8.2",
        "ephemeral_id" => "5d8aad32-46e7-4500-8fa5-d18dd314f8d2",
        "type" => "filebeat"
    }
}

Date Plugin
The Date plugin parses a date string from a specified source field of the log and writes it to a new target field. It can then replace the @timestamp field (which by default holds the time the event was written to Logstash, not the time in the log itself) or another specified field.
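To get a feel for what a format string like dd/MMM/yyyy:HH:mm:ss Z matches, here is the equivalent parse in Python (an analogy only: Logstash itself uses Joda-Time-style patterns, not Python's strptime directives):

```python
from datetime import datetime, timezone

# "dd/MMM/yyyy:HH:mm:ss Z" roughly corresponds to "%d/%b/%Y:%H:%M:%S %z"
ts = datetime.strptime("14/Jul/2020:15:07:27 +0800", "%d/%b/%Y:%H:%M:%S %z")
print(ts.isoformat())                           # 2020-07-14T15:07:27+08:00
# @timestamp stores the equivalent UTC instant:
print(ts.astimezone(timezone.utc).isoformat())  # 2020-07-14T07:07:27+00:00
```

The UTC value is why the sample output further below shows @timestamp as 2020-07-14T07:07:27.000Z for a log stamped 15:07:27 +0800.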
match      # array; the source field name and the time format(s) to use
target     # string; the target field to write into; defaults to @timestamp
timezone   # string; the time zone

Official documentation:
https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html

Time zone format reference:
http://joda-time.sourceforge.net/timezones.html

Example: generate a new field access_time from the source field timestamp
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_date_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # format the nginx log as JSON
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # parse the date format of the source field timestamp, e.g. 14/Jul/2020:15:07:27 +0800
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    #target => "access_time"   # write the time into a new access_time field; the source field is kept
    target => "@timestamp"     # overwrite the existing @timestamp field
    timezone => "Asia/Shanghai"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Sample output:
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_date_stdout.conf -r
{
    "@timestamp" => 2020-07-14T07:07:27.000Z,
    "message" => "58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] \"GET /wp-content/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1\" 200 330 \"http://www.wangxiaochun.com/?p=117\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\" \"-\"",
    "url" => { "domain" => "10.0.0.180", "path" => "/", "original" => "/wp-content/plugins/akismet/_inc/form.js?ver=4.1.3", "port" => 6666 },
    "event" => { "original" => "58.250.250.21 - - [14/Jul/2020:15:07:27 +0800] \"GET /wp-content/plugins/akismet/_inc/form.js?ver=4.1.3 HTTP/1.1\" 200 330 \"http://www.wangxiaochun.com/?p=117\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36\" \"-\"" },
    "user_agent" => {
        "original" => [
            [0] "curl/7.81.0",
            [1] "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"
        ]
    },
    "host" => { "ip" => "10.0.0.180" },
    "http" => {
        "version" => [
            [0] "HTTP/1.1",
            [1] "1.1"
        ],
        "method" => "POST",
        "request" => { "body" => { "bytes" => 274 }, "method" => "GET", "referrer" => "http://www.wangxiaochun.com/?p=117", "mime_type" => "application/x-www-form-urlencoded" },
        "response" => { "body" => { "bytes" => 330 }, "status_code" => 200 }
    },
    "source" => { "address" => "58.250.250.21" },
    "timestamp" => "14/Jul/2020:15:07:27 +0800",
    "@version" => "1"
}

Example: convert a UNIX timestamp to a specified format
date {
  match => ["timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss"]
  target => "timestamp"
  timezone => "Asia/Shanghai"
}

Useragent Plugin
The useragent plugin parses the user-agent field of a request into browser, device, operating system, and related information for later analysis.
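Conceptually the plugin classifies the raw user-agent string into structured fields. A deliberately crude sketch of the idea in Python (illustration only; real user-agent parsers rely on large, maintained rule databases rather than a handful of substring checks):

```python
def classify_ua(ua: str) -> dict:
    """Very rough user-agent classification, for illustration only."""
    os_name = "Other"
    if "Windows NT" in ua:
        os_name = "Windows"
    elif "iPad" in ua or "iPhone" in ua:
        os_name = "iOS"
    elif "Android" in ua:
        os_name = "Android"
    device = "iPad" if "iPad" in ua else "Other"
    return {"os": {"name": os_name}, "device": {"name": device}}

ua = ("Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 "
      "(KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1")
print(classify_ua(ua))  # {'os': {'name': 'iOS'}, 'device': {'name': 'iPad'}}
```

The plugin produces the same kind of nested structure (os, device, browser name and version) under the field named by target.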
Example:
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_useragent_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # format the nginx log as JSON
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # parse a date such as 10/Dec/2020:10:40:10 +0800
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "@timestamp"     # overwrite the existing @timestamp field
    #target => "access_time"   # or write the time into a new access_time field, keeping the source field
    timezone => "Asia/Shanghai"
  }
  # extract and parse the agent field
  useragent {
    #source => "agent"                    # field to read from in 7.X
    source => "message"                   # field to read from in 8.X
    #source => "[user_agent][original]"   # also possible in 8.X
    target => "useragent"                 # name of the new map-type field holding os, device, etc.
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Sample output:
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_useragent_stdout.conf -r
{
    "user_agent" => {
        "original" => [
            [0] "curl/7.81.0",
            [1] "Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1"
        ]
    },
    "message" => "10.0.0.1 - - [03/Jan/2025:16:58:13 +0800] \"GET / HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1\"",
    "useragent" => {
        "name" => "Mobile Safari",
        "device" => { "name" => "iPad" },
        "version" => "16.6",
        "os" => { "name" => "iOS", "version" => "16.6", "full" => "iOS 16.6" }
    },
    "url" => { "domain" => "10.0.0.180", "path" => "/", "original" => "/", "port" => 6666 },
    "source" => { "address" => "10.0.0.1" },
    "http" => {
        "version" => [
            [0] "HTTP/1.1",
            [1] "1.1"
        ],
        "method" => "POST",
        "response" => { "status_code" => 304, "body" => { "bytes" => 0 } },
        "request" => { "method" => "GET", "mime_type" => "application/x-www-form-urlencoded", "body" => { "bytes" => 197 } }
    },
    "@version" => "1",
    "@timestamp" => 2025-01-03T08:58:13.000Z,
    "event" => { "original" => "10.0.0.1 - - [03/Jan/2025:16:58:13 +0800] \"GET / HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1\"" },
    "host" => { "ip" => "10.0.0.180" },
    "timestamp" => "03/Jan/2025:16:58:13 +0800"
}

Mutate Plugin
Official documentation:

https://www.elastic.co/guide/en/logstash/master/plugins-filters-mutate.html
https://www.elastic.co/guide/en/logstash/7.6/plugins-filters-mutate.html

The Mutate plugin performs type conversion, deletion, replacement, update, and similar operations on fields, using functions such as:
remove_field  # delete fields
split         # split a string, similar to taking columns with awk
add_field     # add fields
convert       # type conversion; supported types: integer, integer_eu, float, float_eu, string, boolean
gsub          # string substitution
rename        # rename a field
lowercase     # convert a string to lowercase

remove_field: delete fields
Example:
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_mutate_remove_field_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # format the nginx log as JSON
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  # parse a date such as 10/Dec/2020:10:40:10 +0800
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "@timestamp"
    #target => "access_time"
    timezone => "Asia/Shanghai"
  }
  # mutate: delete the specified fields
  mutate {
    #remove_field => ["headers", "message", "agent"]   # 7.X
    remove_field => ["timestamp", "message", "http"]   # 8.X
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Sample output:
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_mutate_remove_field_stdout.conf -r
{
    "event" => { "original" => "10.0.0.1 - - [03/Jan/2025:16:58:13 +0800] \"GET / HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1\"" },
    "url" => { "domain" => "10.0.0.180", "path" => "/", "original" => "/", "port" => 6666 },
    "@timestamp" => 2025-01-03T08:58:13.000Z,
    "user_agent" => {
        "original" => [
            [0] "curl/7.81.0",
            [1] "Mozilla/5.0 (iPad; CPU OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1"
        ]
    },
    "host" => { "ip" => "10.0.0.180" },
    "source" => { "address" => "10.0.0.1" },
    "@version" => "1"
}
split: split strings

mutate's split cuts a string on a specified delimiter; the resulting pieces become the elements of a new list.
Sample data: 1000|提交订单|2020-01-08 09:10:21
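In Python terms (an analogy, not Logstash code), split turns the sample string above into a list:

```python
message = "1000|提交订单|2020-01-08 09:10:21"
parts = message.split("|")   # cut on the "|" delimiter, like mutate's split
print(parts)                 # ['1000', '提交订单', '2020-01-08 09:10:21']
```

The elements can then be addressed by index, which is exactly what the %{[message][0]}-style references in the add_field example below rely on.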
Example: use split to cut a string into columns
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_mutate_split_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # mutate: split operation
  mutate {
    # split the message field on "|" into multiple elements of a list still named message
    split => { "message" => "|" }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Sample output:
#start it up
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_mutate_split_stdout.conf
{
    "message" => [
        [0] "1000",
        [1] "提交订单",
        [2] "2020-01-08 09:10:21"
    ],
    "event" => { "original" => "1000|提交订单|2020-01-08 09:10:21" },
    "user_agent" => { "original" => "curl/7.81.0" },
    "url" => { "domain" => "10.0.0.180", "path" => "/", "port" => 6666 },
    "@version" => "1",
    "host" => { "ip" => "10.0.0.180" },
    "@timestamp" => 2025-01-03T09:14:03.422624536Z,
    "http" => {
        "version" => "HTTP/1.1",
        "method" => "POST",
        "request" => { "mime_type" => "application/x-www-form-urlencoded", "body" => { "bytes" => 37 } }
    }
}

[root@logstash ~]#curl -XPOST -d '1000|提交订单|2020-01-08 09:10:21' 10.0.0.180:6666/

add_field: add fields
add_field adds new fields derived from specified source fields; the source fields still exist after the new ones are added.
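The effect is like copying values out of an existing field into new top-level keys. A dictionary analogy in Python (illustration only; in Logstash the copies are expressed with %{[message][0]}-style sprintf references):

```python
event = {"message": ["1000", "提交订单", "2020-01-08 09:10:21"]}

# like add_field: new fields are created, the source field is kept
event["user_id"] = event["message"][0]
event["action"] = event["message"][1]
event["time"] = event["message"][2]

print(sorted(event))  # ['action', 'message', 'time', 'user_id']
```

Since the source field survives, configs usually pair add_field with remove_field to drop the now-redundant list, as the example below does.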
Example:
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_mutate_add_field_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # mutate: split operation
  mutate {
    # split on the delimiter
    split => { "message" => "|" }
    # add fields; e.g. the field user_id is built from element 0 of the message list
    add_field => {
      "user_id" => "%{[message][0]}"
      "action" => "%{[message][1]}"
      "time" => "%{[message][2]}"
    }
    # add a field to use as an index name
    #add_field => { "[@metadata][target_index]" => "app-%{+YYYY.MM.dd}" }
    # delete useless fields
    remove_field => ["headers", "message"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Sample output:
#start it up
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_mutate_add_field_stdout.conf
{
    "url" => { "domain" => "10.0.0.180", "path" => "/", "port" => 6666 },
    "user_id" => "1000",
    "@version" => "1",
    "http" => {
        "request" => { "body" => { "bytes" => 37 }, "mime_type" => "application/x-www-form-urlencoded" },
        "version" => "HTTP/1.1",
        "method" => "POST"
    },
    "user_agent" => { "original" => "curl/7.81.0" },
    "event" => { "original" => "1000|提交订单|2020-01-08 09:10:21" },
    "@timestamp" => 2025-01-03T09:21:45.406866933Z,
    "time" => "2020-01-08 09:10:21",
    "action" => "提交订单",
    "host" => { "ip" => "10.0.0.180" }
}

#submit a log entry with curl to see the output above
[root@ubuntu2004 ~]#curl -XPOST -d '1000|提交订单|2020-01-08 09:10:21' 10.0.0.180:6666/

convert: type conversion
mutate's convert changes a field's data type; integer, float, string, and other types are supported.
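Why this matters: everything extracted from a text log arrives as a string, which breaks numeric aggregations downstream. In Python terms (analogy only):

```python
event = {"user_id": "1000"}                # extracted from text, so it's a string
event["user_id"] = int(event["user_id"])   # like convert => { "user_id" => "integer" }
print(type(event["user_id"]).__name__, event["user_id"])  # int 1000
```

With the field stored as a number, Elasticsearch can sum, average, and range-filter it.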
Example:
[root@logstash ~]#cat /etc/logstash/conf.d/http_grok_mutate_convert_stdout.conf
input {
  http {
    port => 6666
  }
}
filter {
  # mutate: split operation
  mutate {
    # split on the delimiter
    split => { "message" => "|" }
    # add fields
    add_field => {
      "user_id" => "%{[message][0]}"
      "action" => "%{[message][1]}"
      "time" => "%{[message][2]}"
    }
    # delete useless fields
    remove_field => ["headers", "message"]
    # convert the types of the newly added fields
    convert => {
      "user_id" => "integer"
      "action" => "string"
      "time" => "string"
    }
    #convert => ["excute_time", "float"]   # this form also works
    #convert => ["time", "string"]
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
[root@logstash ~]#/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/http_grok_mutate_convert_stdout.conf -r

gsub: substitution
gsub performs string substitution based on regular expressions.
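Its behavior corresponds to a regex replace such as Python's re.sub (an analogy for intuition, not Logstash code):

```python
import re

message = "line1\nline2\nline3"
cleaned = re.sub(r"\n", " ", message)   # like gsub => ["message", "\n", " "]
print(cleaned)  # line1 line2 line3
```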
filter {
  mutate {
    # replace newlines in the message field with spaces
    gsub => ["message", "\n", " "]
  }
}

Conditionals
The filter block supports if conditional expressions.
Filebeat configuration example:
#vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  tags: ["access"]
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["error"]
output.logstash:
  hosts: ["10.0.0.104:5044","10.0.0.105:5044"]
  #loadbalance: true   # enable load balancing
  #worker: 2           # number of hosts * workers; enables multiple worker processes

Logstash configuration:
#vim /etc/logstash/conf.d/filebeat_logstash_es.conf
input {
  beats {
    port => 5044
  }
}
filter {
  if "access" in [tags][0] {
    mutate {
      add_field => { "target_index" => "access-%{+YYYY.MM.dd}" }
    }
  } else if "error" in [tags][0] {
    mutate {
      add_field => { "target_index" => "error-%{+YYYY.MM.dd}" }
    }
  } else if "system" in [tags][0] {
    mutate {
      add_field => { "target_index" => "system-%{+YYYY.MM.dd}" }
    }
  }
}
output {
  elasticsearch {
    hosts => ["10.0.0.181:9200","10.0.0.182:9200","10.0.0.183:9200"]  # usually the data node addresses
    index => "%{[target_index]}"     # use the value of the target_index field as the index name
    template_overwrite => true       # overwrite the index template
  }
}

Example:
#vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
    project: test-access
    env: test
output.logstash:
  hosts: ["10.0.0.104:5044","10.0.0.105:5044"]

#vim /etc/logstash/conf.d/filebeat_logstash_es.conf
input {
  beats {
    port => 5044
  }
  file {
    path => "/tmp/wang.log"
    type => "wanglog"        # custom type, usable in conditionals
    start_position => "beginning"
    stat_interval => "3"
  }
}
output {
  if [fields][env] == "test" {
    elasticsearch {
      hosts => ["10.0.0.101:9200","10.0.0.102:9200","10.0.0.103:9200"]
      index => "test-nginx-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "wanglog" {
    stdout {
      codec => rubydebug
    }
  }
}