Logstash 教程
1.简介
Logstash是一个数据同步工具,在ELK(Elasticsearch + Logstash + Kibana)技术栈中解决数据同步问题。日常项目中数据主要存储在MYSQL、日志文件中,通过Logstash可以将MYSQL、日志文件、redis等多种数据源的数据同步到ES,这样就可以通过ES搜索数据。
MYSQL同步数据到Elasticsearch,主要有下面几种策略:
双写策略,更新MYSQL数据的同时通过ES API直接写入数据到ES (同步方式)
通过Logstash同步数据到ES (异步方式)
通过订阅MYSQL Binlog,将数据同步到ES (异步方式)
这里主要介绍Logstash如何同步数据。
2.安装
2.1.环境依赖
依赖Java 8 或者 Java 11环境,可以是更高的版本。
2.2.安装方式
2.2.1. centos
更新key
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
创建文件 /etc/yum.repos.d/logstash.repo 内容如下
[logstash-7.x]name=Elastic repository for 7.x packagesbaseurl=https://artifacts.elastic.co/packages/7.x/yumgpgcheck=1gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearchenabled=1autorefresh=1type=rpm-md
安装Logstash
sudo yum install logstash
2.2.2. ubuntu
按顺序执行下面命令
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -sudo apt-get install apt-transport-httpsecho "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.listsudo apt-get update && sudo apt-get install logstash
2.2.3. 通过压缩包安装
通过下面地址下载最新版本的压缩包(linux/mac系统下载tar.gz, windows下载zip)
https://www.elastic.co/cn/downloads/logstash
将压缩包解压到自定义目录即可。
linux系统例子:
tar -zxvf logstash-7.7.1.tar.gz
3.测试安装
下面验证logstash安装是否成功
# 切换到安装目录cd logstash-7.7.1# 执行命令bin/logstash -e 'input { stdin { } } output { stdout {} }'
等一会,logstash启动后在控制台输入tizi365.com 按回车,可以看到类似下面的输出
tizi365.com{ "@timestamp" => 2020-06-09T15:45:38.147Z, "message" => "tizi365.com", "@version" => "1", "host" => "jogindembp"}
添加config.reload.automatic命令参数,自动加载配置,不需要重新启动logstash
bin/logstash -f tizi.conf --config.reload.automatic
4.配置文件
可以将Logstash的配置都写入一个配置文件中,下面是配置文件的格式,主要有三部分组成
# 输入插件配置, 主要配置需要同步的数据源,例如:MYSQLinput {}# 过滤器插件配置, 主要用于对输入的数据进行过滤,格式化操作,filter是可选的。filter { }# 输出插件配置,主要配置同步数据的目的地,例如同步到ESoutput {}
提示:logstash的input、filter、output都是由各种插件组成。
例子:
创建一个tizi.conf配置文件,内容如下:
input { stdin {}}output { stdout { codec => rubydebug }}
说明:
这个配置文件的意思是,从控制台标准输入(stdin)接收输入,然后直接将结果在控制台标准输出(stdout)打印出来。
通过配置文件启动logstash
bin/logstash -f tizi.conf
5.同步nginx日志到ES
下面是将Nginx的访问日志同步到ES中的配置
配置文件名:tizi.conf
input { # 实时监控日志文件的内容,类似tail -f 命令的作用 file { # nginx日志文件路径 path => [ "/data/nginx/logs/nginx_access.log" ] start_position => "beginning" ignore_older => 0 }}# 配置过滤器对日志文件进行格式化filter { # 使用grok插件对日志内容进行格式化,提取日志内容,方便转换成json格式 # %COMBINEDAPACHELOG 是grok插件内置的apache日志内容处理模板,其实就是一些表达式,用来格式日志文本内容,也可以格式化Nginx日志 grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }}# 配置输出目的地,这里配置同步到ES中output { elasticsearch { # es服务器地址 hosts => ["127.0.0.1:9200"] # 目标索引 index => "nginx-access" }}
启动logstash
bin/logstash -f tizi.conf
Logstash 工作原理
Logstash同步数据,主要有三个核心环节:inputs → filters → outputs,流程如下图。
inputs模块负责收集数据,filters模块可以对收集到的数据进行格式化、过滤、简单的数据处理,outputs模块负责将数据同步到目的地,Logstash的处理流程,就像管道一样,数据从管道的一端,流向另外一端。
提示:inputs/filters/outputs是通过插件机制扩展各种能力。
inputs
inputs可以收集多种数据源的数据,下面是常见的数据源:
file - 扫描磁盘中的文件数据,例如: 扫描日志文件。
mysql - 扫描Mysql的表数据
redis
Filebeat - 轻量级的文件数据采集器,可以取代file的能力。
消息队列kafka、rabbitmq等 - 支持从各种消息队列读取数据。
filters
filters是一个可选模块,可以在数据同步到目的地之前,对数据进行一些格式化、过滤、简单的数据处理操作。
常用的filters功能:
grok - 功能强大文本处理插件,主要用于格式化文本内容。
drop - 丢弃一些数据
outputs
Logstatsh的最后一个处理节点,outputs负责将数据同步到目的地。
下面是常见的目的地:
elasticsearch
file - 也可以将数据同步到一个文件中
Codecs
codecs就是编码器,负责对数据进行序列号处理,主要就是json和文本两种编码器。
Logstash - 同步MYSQL数据到Elasticsearch
在实际项目场景中,业务数据主流的存储方案还是MYSQL,但是MYSQL处理海量数据的搜索能力较差,目前MYSQL搭配ES,为业务提供强大的数据搜索能力是业界主流的方案,因此需要解决如何将MYSQL中的数据导入到ES中,下面介绍通过Logstash准实时的将MYSQL数据导入到ES中。
1.jdbc插件介绍
Logstash通过jdbc input插件实现定时同步MYSQL数据,了解JAVA的同学应该对jdbc不陌生,就是访问数据库的API标准,我们常见的数据库都可以使用jdbc接口进行访问。
使用jdbc访问数据库,通常都需要安装对应数据库的jdbc驱动,例如:MYSQL的jdbc驱动,到MYSQL官网下载对应的jar包就可以。
MYSQL jdbc驱动下载地址:
https://mvnrepository.com/artifact/mysql/mysql-connector-java
找到MYSQL对应的版本下载JAR包即可,例如下面下载8.0.15版本。
2.简单的同步例子
关键配置有两点:
配置input jdbc输入插件
配置output elasticsearch输出插件
完整的配置如下
input { # 配置JDBC数据源 jdbc { # mysql jdbc驱动路径 jdbc_driver_library => "/Users/tizi365/.m2/repository/mysql/mysql-connector-java/8.0.18/mysql-connector-java-8.0.18.jar" # mysql jdbc驱动类 jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # MYSQL连接地址,格式: jdbc:mysql://服务器地址:端口/数据库名 jdbc_connection_string => "jdbc:mysql://localhost:3306/wordpress" # •MYSQL 账号 jdbc_user => "root" # MYSQL 密码 jdbc_password => "123456" # 定时任务配置,下面表示每分钟执行一次SQL # 具体语法请参考下一个章节内容 schedule => "* * * * *" # 定时执行的SQL语句,Logstash会根据schedule配置,定时执行这里的SQL语句 # 将SQL语句查询的结果,传给output插件 statement => "SELECT * FROM `wp_posts`" }}output { stdout { # 配置将数据导入到ES中 elasticsearch { # 索引名,logstash会将数据导入到这个索引中 index => "wp_posts" # ES服务器地址,支持多个地址 hosts => ["127.0.0.1:9200","127.0.0.2:9200"] # 设置ES文档的唯一Id值为SQL语句返回的id # 建议将document_id设置为MYSQL表的主键 document_id => "%{id}" } }}
3.定时任务配置
jdbc schedule的配置规则,类似linux的crontab的写法,具体语法规则如下:
语法格式,总共由5个字段组成,含义如下:
* * * * * 分 时 天 月 星期
各个字段取值范围:
分 - 0-59
时 - 0-23
天 - 1-31
月 - 1-12
星期 - 0-7
特殊字符含义:
星号() :代表所有值,例如:第一个字段是星号(),则代表每分钟。
逗号(,):指定一个数值范围,例如:1,2,3,4
横杠(-):另外一种表示一个整数范围的方法,例如:1-4 表示1,2,3,4
斜线(/):可以用斜线指定时间的间隔频率,例如:*/5,如果用在分钟字段,表示每5分钟执行一次。
例子:
# 每分钟执行一次
* * * * *
# 每10分钟执行一次
*/10 * * * *
# 每小时执行一次
* */1 * * *
# 每天0点执行一次
0 0 * * *
# 每天凌晨2点1分执行一次
1 2 * * *
4.增量同步数据
前面的例子同步数据的SQL如下:
input {
# 配置JDBC数据源
jdbc {
# 忽略其他配置
statement => "SELECT * FROM `wp_posts`"
}
}
同步数据的SQL语句,直接扫描全表的数据,如果数据量比较小,问题不大,如果数据量比较大,会直接卡死,logstash OOM挂了,因此需要实现增量同步,每次仅同步新增的数据。
Logstash提供了sql_last_value字段值,帮助我们实现增量同步;增量同步的核心思路就是,logstash每次执行SQL的时候,会将SQL查询结果的最后一条记录的某个值保存到sql_last_value字段中,下一次执行SQL的时候,以sql_last_value值作为参考,从这个值往后查询新数据。
例子:
input {
jdbc {
# 注意where条件id > :sql_last_value
# 每次执行SQL的时候,id大于sql_last_value的值
statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"
# 允许sql_last_value的值来自查询结果的某个字段值。
use_column_value => true
# sql_last_value的值来自查询结果中的最后一个id值
tracking_column => "id"
# ... 忽略其他配置
}
}
说明:
sql_last_value的默认值是0或者1970-01-01,具体是什么值跟数据类型有关,上面的例子,定时任务执行SQL如下
# 第一次执行,sql_last_value=0SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 0# 第二次执行,sql_last_value=100,假设上面的SQL最后的id值是100SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 100# 第三次执行,sql_last_value=200,,假设上面的SQL最后的id值是200SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 200
提示:
上面的例子,使用id作为增量同步数据的依据,不一定适合所有的业务场景,例如:同步文章数据,文章更新了,但是文章的id没有更新,这个时候使用id作为增量同步的依据,会导致更新的文章没有同步到ES,这种场景适合使用更新时间作为增量同步的依据,用法一样,sql_last_value换一个字段值即可。
5.分页
前面的章节在实现增量同步的时候,也存在一个问题,如果增量同步的数据太多的时候,logstash也会卡死,尤其是首次增量同步,例如:一个MYSQL表的数据有100万,首次增量同步数据,会扫描全表的数据。
logstash jdbc插件执行分页查询,避免一次查询太多数据,配置如下:
input { jdbc { # 激活分页处理 jdbc_paging_enabled => true # 分页大小,每次查询1000条数据 jdbc_page_size => 1000 # sql语句 statement => "SELECT * FROM my_table" # ... 忽略其他配置 }}
6.大表同步
在实际业务场景中,有些数据表的数据会有几百万,甚至上亿的数据,那么在使用logstash同步这些大表数据的时候,结合前面两个章节的增量同步和分页处理就可以解决,不过需要注意深度分页的性能问题。
例如:
# 每次查询1000条数据,但是翻页从第500万条数据偏移开始
SELECT * FROM my_table limit 5000000, 1000
这条SQL会非常慢,可以借助索引覆盖优化性能。
例子:
SELECT * FROM my_table WHERE id in (SELECT id FROM my_table limit 5000000, 1000)
因为id是主键,在主键索引中已经包含id的值,不需要回表扫描磁盘的数据,所以性能比较好,上面的SQL首先借助索引覆盖将id值查询出来,然后根据id查询具体的数据。
Filebeat 教程
logstash虽然也支持从磁盘文件中收集数据,但是logstash自己本身还是比较重,对资源的消耗也比较大,尤其是在容器化环境,每个容器都部署logstash也太浪费资源,因此出现了轻量级的日志文件数据收集方案Filebeat,Filebeat将收集到的文件数据传给Logstatsh处理即可。
Filebeat部署架构
可以在每一台服务器或者每一个容器中安装Filebeat,Filebeat负责收集日志数据,然后将日志数据交给Logstash处理,Logstash在将数据导入ES。
安装Filebeat
下载安装包,然后解压即可。
官网下载地址:
https://www.elastic.co/cn/downloads/beats/filebeat
下面以7.7.1版本为例
mac
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.7.1-darwin-x86_64.tar.gz
tar xzvf filebeat-7.7.1-darwin-x86_64.tar.gz
linux
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.7.1-linux-x86_64.tar.gz
tar xzvf filebeat-7.7.1-linux-x86_64.tar.gz
Filebeat配置
Filebeat的配置结构类似Logstash,也需要配置input和output,分别配置输入和输出,Filebeat使用yaml格式编写配置文件。
默认配置文件路径:
${安装目录}/filebeat.yml
/etc/filebeat/filebeat.yml
/usr/share/filebeat/filebeat.yml
因为我们使用的是tar安装包安装,所以选择${安装目录}/filebeat.yml 路径。
配置例子:
# 配置采集数据源
filebeat.inputs:
- type: log
paths:
- /var/log/messages
- /var/log/*.log
# 配置输出目标,这里将数据投递给logstash
output.logstash:
# logstash地址
hosts: ["127.0.0.1:5044"]
说明:
type为log类型,表示收集日志文件数据,paths是一个文件路径数组,这里扫描/var/log/messages文件和/var/log/目录下所有以log为扩展名的日志文件。
Logstash beat配置
配置Logstash的input,让Logstash可以接收Filebeat投递过来的数据。
input { # 配置接收Filebeat数据源,监听端口为5044 # Filebeat的output.logstash地址保持跟这里一致 beats { port => 5044 }}output { # 将数据导入到ES中 elasticsearch { hosts => ["http://localhost:9200"] index => "tizi365" }}
启动Filebeat
进入filebeat安装目录
./filebeat -c filebeat.yml
如果配置PATH,直接启动即可。
input插件
Logstash Beats插件
Beats input插件让Logstash可以接收来自Elastic Beats framework发送过来的数据,Elastic Beats framework用的比较多的就是Filebeat.
例子
input { # 在5044端口监听来自beats框架的数据 beats { port => 5044 }}output { elasticsearch { hosts => ["http://localhost:9200"] index => "%{[@metadata][beat]}-%{[@metadata][version]}" }}
接收来自beats的数据,并且将数据导入到ES中。
Beats Input插件参数
参数名类型默认值说明
host
string
0.0.0.0
监听地址
port
number
无
监听端口
Logstash File input插件
Logstash的file input插件可以实现从磁盘文件中采集数据,通常用于收集日志文件数据,file input插件一行行的从文件中读取数据,然后交给logstash。
提示: file input插件的作用跟linux命令tail -f 的作用类似,可以实时收集文件的最新数据。
例子
input {
# 扫描指定文件日志数据
file {
# 指定需要扫描的日志文件,支持多个文件,也支持星号(*)通配符
# 含义:扫描/var/log/messages文件和/var/log/目录下的所有以log为扩展名的日志文件。
path => [ "/var/log/messages", "/var/log/*.log" ]
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "tizi365"
}
}
file Input插件参数
参数名类型默认值说明
path
array
无
需要扫描的文件路径,数组格式:[ "/var/log/messages", "/var/log/tizi.log" ]
delimiter
string
\n
指定文件换行符
exclude
string
无
指定需要排除的文件,例如排除压缩包:*.gz , 这个参数通常在path参数包含通配符的时候,一起配合使用
Logstash Exec input插件
Exec input插件可以定时的执行一个命令,然后采集命令输出的结果,通过exec插件,我们可以轻松的采集linux系统状态,例如:定时的采集linux服务的内存使用情况。
例子:
input { # 通过exec插件,定时的通过命令 exec { # 需要执行的命令 command => "free -m" # 30秒执行一次 interval => 30 }}output { elasticsearch { hosts => ["http://localhost:9200"] index => "tizi365" }}
说明:
30秒执行一次free -m命令,命令输出的结果,会被Logstash同步到ES中。
exec Input插件参数
参数名类型默认值说明
command
string
无
设置需要执行的命令
interval
number
无
单位是秒,多长时间执行一次命令
schedule
string
无
使用类型linux crontab的语法,设置定时任务,例如:/10 * * * 代表每10分钟跑一次,interval和schedule参数二选一即可
Logstash jdbc input插件
jdbc插件用于解决Logstash采集数据库数据问题,基本上所有的关系数据库都支持jdbc接口,例如: MYSQL、Oracle等。
jdbc插件通过定时任务,定时的执行SQL语句,从数据库中读取数据,定时任务语法类似linux的crontab的写法。
例子
input {
# 配置jdbc数据源
jdbc {
# 指定jdbc驱动路径
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
# jdbc驱动类
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 数据库连接配置
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
# 数据库账号
jdbc_user => "mysql"
# 数据库密码
jdbc_password => "123456"
# SQL绑定的参数
parameters => { "favorite_artist" => "Beethoven" }
# 定时任务配置
schedule => "* * * * *"
# SQL语句
statement => "SELECT * from songs where artist = :favorite_artist"
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "tizi365"
}
}
详细的例子可以参考:同步MYSQL数据到Elasticsearch
jdbc Input插件参数
参数名类型默认值说明
jdbc_driver_library
string
指定jdbc驱动路径, 不同数据库jdbc驱动不一样
jdbc_driver_class
string
jdbc驱动类,新版的MYSQL驱动类为:com.mysql.cj.jdbc.Driver
jdbc_connection_string
string
数据库连接配置, 格式: jdbc:数据库类型://地址:端口/数据库,例子:jdbc:mysql://localhost:3306/mydb
jdbc_user
string
数据库账号
jdbc_password
string
数据库密码
schedule
string
定时任务配置,语法可以参考linux的cron
statement
string
需要执行的SQL语句
parameters
hash
SQL绑定参数,例子:{ "target_id" => "321" }
use_column_value
boolean
false
当设置为true时,使用tracking_column定义的列作为:sql_last_value的值。当设置为false时,:sql_last_value等于上次执行查询的时间。
tracking_column
string
定义使用SQL查询结果中的哪一个字段值作为sql_last_value的值
jdbc_paging_enabled
boolean
false
激活分页处理
jdbc_page_size
number
100000
分页大小
Logstash kafka input插件
kafka input插件 支持Logstash从kafka消息队列中的topic读取数据。
例子
input {
# 配置kafka数据源
kafka {
# kafka服务器地址,多个地址使用逗号分隔
bootstrap_servers => "localhost:9092"
# 订阅的主题,支持订阅多个主题
topics => ["logstash", "tizi365"]
# 消费者线程数
consumer_threads => 5
# 消费组Id
group_id => "logstash"
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "tizi365"
}
}
kafka Input插件参数
参数名类型默认值说明
bootstrap_servers
string
localhost:9092
kafka服务器地址,多个地址使用逗号分隔
topics
array
["logstash"]
订阅的主题,支持订阅多个主题
consumer_threads
number
1
消费者线程数
group_id
string
logstash
消费组Id
fetch_min_bytes
number
一次最少从服务器读取多少字节数据
fetch_max_bytes
number
一次最多从服务器读取多少字节数据
Logstash RabbitMQ input插件
RabbitMQ input插件,支持Logstash通过RabbitMQ消息队列读取数据。
例子:
input { # 配置rabbitmq数据源 rabbitmq { # rabbitmq服务器地址 host => "localhost" # 端口 port => 5672 # RabbitMQ 账号 user => "guest" # RabbitMQ 密码 password => "guest" # 队列名 queue => "tizi365" }}output { elasticsearch { hosts => ["http://localhost:9200"] index => "tizi365" }}
RabbitMQ Input插件参数
参数名类型默认值说明
host
string
rabbitmq服务器地址
port
number
5672
端口
user
string
guest
RabbitMQ 账号
password
string
guest
RabbitMQ 密码
queue
string
队列名
auto_delete
boolean
false
最后一个消费组退出后是否删除消息
prefetch_count
number
256
预加载多少条消息到本地
Logstash redis input插件
redis input插件支持Logstash从redis中读取数据,目前仅支持从redis的list和channels两种数据结构中读取数据。
例子
input {
# 配置redis数据源
redis {
# redis服务器地址
host => "127.0.0.1"
# 端口
port => 6379
# redis 密码, 没有设置密码可以不填
password => "123456"
# 从哪个key读取数据
key => "tizi365_list"
# 设置Key的redis的数据类型
data_type => "list"
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "tizi365"
}
}
redis Input插件参数
参数名类型默认值说明
host
string
127.0.0.1
redis服务器地址
port
number
6379
redis服务器端口号
password
string
redis服务密码
key
string
配置logstash从哪个key读取数据
data_type
string
设置Key的redis的数据类型,支持list, channel
db
number
0
redis数据库
threads
number
1
并发线程数
timeout
number
5
redis连接超时时间,单位秒
ssl
boolean
false
是否打开ssl支持
batch_count
number
125
一次批量从redis加载多少条数据
output插件
Logstash Elasticsearch output插件
通过Elasticsearch output插件Logstash可以将采集到的数据导入到Elasticsearh中。
例子:
input {
# 扫描指定文件日志数据
file {
path => [ "/var/log/messages" ]
}
}
output {
# 将数据导入到ES中
elasticsearch {
# ES服务地址
hosts => ["http://localhost:9200"]
# 索引名
index => "tizi365"
}
}
Elasticsearch output插件参数
参数名类型默认值说明
hosts
uri
[//127.0.0.1]
ES服务地址
index
string
logstash-%{+yyyy.MM.dd}
索引名
document_id
string
设置document的id值,通常使用logstash采集数据的某个字段值作为id值,例如:%{id}
user
string
账号
password
string
密码
routing
string
设置ES的路由参数
Logstash Stdout output插件
在调试Logstash调试的时候,可以将Logstash收集到的数据在命令窗口直接打印出来,通过Stdout output插件可以实现将数据打印到标准输出。
简单例子:
input {
# 扫描指定文件日志数据
file {
path => [ "/var/log/messages" ]
}
}
output {
# 将数据直接打印出来
stdout {}
}
指定输出格式:
output {
# 以Json格式将数据直接打印出来
stdout { codec => json }
}
以rubydebug的格式打印数据:
output {
# 以Json格式将数据直接打印出来
stdout { codec => rubydebug }
}
filter插件
Logstash grok filter插件
通过grok filter插件我们可以对文本内容进行格式化处理,提取文本中的内容,并将其转换成json格式,在处理日志内容的时候非常有用。
例子:
例如日志内容如下:
55.3.244.1 GET /index.html 15824 0.043
这条日志内容包含了ip、http请求方法、请求路径、响应内容大小、响应时间,这条日志是一行字符串,我们可以通过grok将其格式化为:client、method、request、bytes、duration这几个字段,然后在保存到elasticsearch中。
logstash配置:
input { # 扫描指定文件日志数据 file { path => [ "/var/log/http.log" ] }}# 配置过滤器插件,对Input收集到的数据进行格式化处理filter { # 通过grok插件,格式化文本内容 grok { # grok参数,这里决定如何对每一行日志进行参数提取 # message 字段的内容就是格式化日志的表达式 match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } }}output { # 将数据导入到ES中 elasticsearch { # ES服务地址 hosts => ["http://localhost:9200"] # 索引名 index => "tizi365" }}
通过grok提取的结果如下:
client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
grok模式语法
grok的提取字符串内容的语法其实就是在正则表达式基础之上进行封装,Logstash grok内置了120种默认表达式,解决很多日常需求,不需要重头编写复杂的正则表达式。
grok表达式语法:
%{模式名:自定义字段名}
说明:
模式名 - 指的就是预先定义好的正则表达式的别名,例如:IP 可以匹配ip内容。
自定义字段名 - 通过模式匹配到内容后,将内容保存到这个自定义的字段中
例子:
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
这是上面例子的grok表达式,下面是对表达式的解读:
%{IP:client} - 匹配IP内容,结果保存到client字段
%{WORD:method} - 匹配非空字符串内容,结果保存到method字段
%{URIPATHPARAM:request} - 匹配url路径,结果保存到request字段
%{NUMBER:bytes} - 匹配数字,结果保存到bytes字段
grok filter插件参数
参数名类型默认值说明
match
hash
{}
定义grok的表达式,格式: message => "表达式"
patterns_dir
array
[]
自定义模式配置文件的路径,支持多个路径,例子:["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]
grok内置模式
常用模式
表达式标识名称详情匹配例子
USERNAME 或 USER
用户名
由数字、大小写及特殊字符(._-)组成的字符串
1234、Bob、Alex.Wong
EMAILLOCALPART
用户名
首位由大小写字母组成,其他位由数字、大小写及特殊字符(_.+-=:)组成的字符串。注意,国内的QQ纯数字邮箱账号是无法匹配的,需要修改正则
windcoder、windcoder_com、abc-123
EMAILADDRESS
电子邮件
windcoder@abc.com、windcoder_com@gmail.com、abc-123@163.com
HTTPDUSER
Apache服务器的用户
可以是EMAILADDRESS或USERNAME
INT
整数
包括0和正负整数
0、-123、43987
BASE10NUM 或 NUMBER
十进制数字
包括整数和小数
0、18、5.23
BASE16NUM
十六进制数字
整数
0x0045fa2d、-0x3F8709
WORD
字符串
包括数字和大小写字母
String、3529345、ILoveYou
NOTSPACE
不带任何空格的字符串
SPACE
空格字符串
QUOTEDSTRING 或 QS
带引号的字符串
"This is an apple"、'What is your name?'
UUID
标准UUID
550E8400-E29B-11D4-A716-446655440000
MAC
MAC地址
可以是Cisco设备里的MAC地址,也可以是通用或者Windows系统的MAC地址
IP
IP地址
IPv4或IPv6地址
127.0.0.1、FE80:0000:0000:0000:AAAA:0000:00C2:0002
HOSTNAME
IP或者主机名称
HOSTPORT
主机名(IP)+端口
127.0.0.1:3306、api.windcoder.com:8000
PATH
路径
Unix系统或者Windows系统里的路径格式
/usr/local/nginx/sbin/nginx、c:\windows\system32\clr.exe
URIPROTO
URI协议
http、ftp
URIHOST
URI主机
windcoder.com、10.0.0.1:22
URIPATH
URI路径
//windcoder.com/abc/、/api.php
URIPARAM
URI里的GET参数
?a=1&b=2&c=3
URIPATHPARAM
URI路径+GET参数
/windcoder.com/abc/api.php?a=1&b=2&c=3
URI
完整的URI
https://windcoder.com/abc/api.php?a=1&b=2&c=3
LOGLEVEL
Log表达式
Log表达式
Alert、alert、ALERT、Error
日期时间模式
表达式标识名称匹配例子
MONTH
月份名称
Jan、January
MONTHNUM
月份数字
03、9、12
MONTHDAY
日期数字
03、9、31
DAY
星期几名称
Mon、Monday
YEAR
年份数字
HOUR
小时数字
MINUTE
分钟数字
SECOND
秒数字
TIME
时间
00:01:23
DATE_US
美国时间
10-01-1892、10/01/1892/
DATE_EU
欧洲日期格式
01-10-1892、01/10/1882、01.10.1892
ISO8601_TIMEZONE
ISO8601时间格式
+10:23、-1023
TIMESTAMP_ISO8601
ISO8601时间戳格式
2016-07-03T00:34:06+08:00
DATE
日期
美国日期%{DATE_US}或者欧洲日期%{DATE_EU} |
DATESTAMP
完整日期+时间
07-03-2016 00:34:06
HTTPDATE
http默认日期格式
03/Jul/2016:00:36:53 +0800
Grok自带的模式,具体的规则可以参考下面链接
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
自定义模式
如果grok内置的模式无法满足需求,也可以自定义模式。
模式定义语法:
NAME PATTERN
说明:
NAME - 模式名
PATTERN - 表达式,包括正则表达式和logstash变量。
例子:
步骤1:
配置文件路径:/opt/logstash/tizi_patterns ,文件内容如下
TIZI_NUMBER \d+
提示:自定义模式配置文件路径,可以根据项目情况自定义即可
步骤2:
在logstash配置文件中引用自定义表达式
filter {
grok {
# 指定自定义模式路径
patterns_dir => ["/opt/logstash/tizi_patterns"]
# 使用自定义模式
match => { "message" => "%{TIZI_NUMBER:tizi_data}" }
}
}
调试grok模式
Kibana支持在线调试grok,如下截图:
Logstash java_uuid filter插件
如果我们想给logstash收集到的每一条数据增加一个唯一id,可以通过java_uuid和uuid两个filter插件实现,他们的区别只是底层实现不同,效果类似。
java_uuid
filter { # java版的uuid生成插件 java_uuid { # 生成的唯一id,保存到target指定的字段 target => "uuid" # 如果target指定的字段已经存在,是否覆盖 overwrite => true } }
uuid
filter { # 定义uuid插件 uuid { # 生成的唯一id,保存到target指定的字段 target => "uuid" # 如果target指定的字段已经存在,是否覆盖 overwrite => true } }
Logstash json filter插件
通常情况,Logstash收集到的数据都会转成json格式,但是默认logstash只是对收集到的格式化数据转成json,如果收到的数据仅仅是一个字符串是不会转换成Json.
例如:
{
"id":20,
"domain": "https://www.tizi365.com",
"data": "{\"type\":1, \"msg\":\"message ok\"}"
}
data字段的内容是一个json字符串,不是格式化的Json格式,如果数据导入到Elasticsearch,data字段也是一个字符串,不是一个Json对象;json filter插件可以解决这种问题。
例子:
filter {
# 定义json插件
json {
# 指定需要转换成json格式的字段
source => "data"
# 指定转换成json的数据,保存到那个字段,如果字段存在会覆盖
target => "data"
# 如果遇到错误的json,是否跳过json filter过滤器
skip_on_invalid_json => true
}
}
json filter格式化数据后,输出如下:
{ "id":20, "domain": "https://www.tizi365.com", "data": { "type":1, "msg":"message ok" }}
Logstash kv filter插件
如果logstash收集到的日志格式是key=value键值对,可以通过kv filter插件对其进行格式化。
例子:
日志内容:
ip=1.2.3.4 error=REFUSED
logstash配置
input { # 扫描指定文件日志数据 file { path => [ "/var/log/http.log" ] }}filter { # 使用kv filter格式化键值对日志内容 kv { }}output { # 将数据直接打印出来 stdout {}}
logstash输出的内容如下:
{ "ip": "1.2.3.4", "error": "REFUSED"}
kv filter插件参数
参数名类型默认值说明
prefix
string
指定key的前缀,例如:arg_
field_split
string
" "
指定两个kv值直接的分隔符,默认是空格,例:field_split => "&" , 通过&分隔键值对
default_keys
hash
设置key的默认值,例:default_keys => [ "from", "logstash@example.com", "to", "default@dev.null" ]
Logstash drop filter插件
drop filter插件主要用于删除logstash收集到的数据,通常配合条件语句一起使用。
提示:logstash是一条一条数据发给filter处理,所以drop filter也是一条数据,一条数据的删除。
例子:
input { # 扫描指定文件日志数据 file { path => [ "/var/log/http.log" ] }}filter { # 如果loglevel字段值等于debug,则删除整条消息 if [loglevel] == "debug" { # 通过drop过滤器删除消息 drop { } }}output { # 将数据直接打印出来 stdout {}}