Ycc365下载-亚洲365bet投注-帕尼尼球星卡FIFA365

Logstash 教程

Logstash 教程 1.简介 Logstash是一个数据同步工具,在ELK(Elasticsearch + Logstash + Kibana)技术栈中解决数据同步问题。日常项目中数据主要存储在MYSQL

Logstash 教程

Logstash 教程

1.简介

Logstash是一个数据同步工具,在ELK(Elasticsearch + Logstash + Kibana)技术栈中解决数据同步问题。日常项目中数据主要存储在MYSQL、日志文件中,通过Logstash可以将MYSQL、日志文件、redis等多种数据源的数据同步到ES,这样就可以通过ES搜索数据。

MYSQL同步数据到Elasticsearch,主要有下面几种策略:

双写策略,更新MYSQL数据的同时通过ES API直接写入数据到ES (同步方式)

通过Logstash同步数据到ES (异步方式)

通过订阅MYSQL Binlog,将数据同步到ES (异步方式)

这里主要介绍Logstash如何同步数据。

2.安装

2.1.环境依赖

依赖Java 8 或者 Java 11环境,可以是更高的版本。

2.2.安装方式

2.2.1. centos

更新key

sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

创建文件 /etc/yum.repos.d/logstash.repo 内容如下

[logstash-7.x]name=Elastic repository for 7.x packagesbaseurl=https://artifacts.elastic.co/packages/7.x/yumgpgcheck=1gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearchenabled=1autorefresh=1type=rpm-md

安装Logstash

sudo yum install logstash

2.2.2. ubuntu

按顺序执行下面命令

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -​sudo apt-get install apt-transport-https​echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list​sudo apt-get update && sudo apt-get install logstash

2.2.3. 通过压缩包安装

通过下面地址下载最新版本的压缩包(linux/mac系统下载tar.gz, windows下载zip)

https://www.elastic.co/cn/downloads/logstash

将压缩包解压到自定义目录即可。

linux系统例子:

tar -zxvf logstash-7.7.1.tar.gz

3.测试安装

下面验证logstash安装是否成功

# 切换到安装目录cd logstash-7.7.1# 执行命令bin/logstash -e 'input { stdin { } } output { stdout {} }'

等一会,logstash启动后在控制台输入tizi365.com 按回车,可以看到类似下面的输出

tizi365.com{ "@timestamp" => 2020-06-09T15:45:38.147Z, "message" => "tizi365.com", "@version" => "1", "host" => "jogindembp"}

添加config.reload.automatic命令参数,自动加载配置,不需要重新启动logstash

bin/logstash -f tizi.conf --config.reload.automatic

4.配置文件

可以将Logstash的配置都写入一个配置文件中,下面是配置文件的格式,主要有三部分组成

# 输入插件配置, 主要配置需要同步的数据源,例如:MYSQLinput {}# 过滤器插件配置, 主要用于对输入的数据进行过滤,格式化操作,filter是可选的。filter { }# 输出插件配置,主要配置同步数据的目的地,例如同步到ESoutput {}

提示:logstash的input、filter、output都是由各种插件组成。

例子:

创建一个tizi.conf配置文件,内容如下:

input { stdin {}}output { stdout { codec => rubydebug }}

说明:

这个配置文件的意思是,从控制台标准输入(stdin)接收输入,然后直接将结果在控制台标准输出(stdout)打印出来。

通过配置文件启动logstash

bin/logstash -f tizi.conf

5.同步nginx日志到ES

下面是将Nginx的访问日志同步到ES中的配置

配置文件名:tizi.conf

input { # 实时监控日志文件的内容,类似tail -f 命令的作用 file { # nginx日志文件路径 path => [ "/data/nginx/logs/nginx_access.log" ] start_position => "beginning" ignore_older => 0 }}# 配置过滤器对日志文件进行格式化filter { # 使用grok插件对日志内容进行格式化,提取日志内容,方便转换成json格式 # %COMBINEDAPACHELOG 是grok插件内置的apache日志内容处理模板,其实就是一些表达式,用来格式日志文本内容,也可以格式化Nginx日志 grok { match => { "message" => "%{COMBINEDAPACHELOG}" } }}# 配置输出目的地,这里配置同步到ES中output { elasticsearch { # es服务器地址 hosts => ["127.0.0.1:9200"] # 目标索引 index => "nginx-access" }}

启动logstash

bin/logstash -f tizi.conf

Logstash 工作原理

Logstash同步数据,主要有三个核心环节:inputs → filters → outputs,流程如下图。

inputs模块负责收集数据,filters模块可以对收集到的数据进行格式化、过滤、简单的数据处理,outputs模块负责将数据同步到目的地,Logstash的处理流程,就像管道一样,数据从管道的一端,流向另外一端。

提示:inputs/filters/outputs是通过插件机制扩展各种能力。

inputs

inputs可以收集多种数据源的数据,下面是常见的数据源:

file - 扫描磁盘中的文件数据,例如: 扫描日志文件。

mysql - 扫描Mysql的表数据

redis

Filebeat - 轻量级的文件数据采集器,可以取代file的能力。

消息队列kafka、rabbitmq等 - 支持从各种消息队列读取数据。

filters

filters是一个可选模块,可以在数据同步到目的地之前,对数据进行一些格式化、过滤、简单的数据处理操作。

常用的filters功能:

grok - 功能强大文本处理插件,主要用于格式化文本内容。

drop - 丢弃一些数据

outputs

Logstatsh的最后一个处理节点,outputs负责将数据同步到目的地。

下面是常见的目的地:

elasticsearch

file - 也可以将数据同步到一个文件中

Codecs

codecs就是编码器,负责对数据进行序列号处理,主要就是json和文本两种编码器。

Logstash - 同步MYSQL数据到Elasticsearch

在实际项目场景中,业务数据主流的存储方案还是MYSQL,但是MYSQL处理海量数据的搜索能力较差,目前MYSQL搭配ES,为业务提供强大的数据搜索能力是业界主流的方案,因此需要解决如何将MYSQL中的数据导入到ES中,下面介绍通过Logstash准实时的将MYSQL数据导入到ES中。

1.jdbc插件介绍

Logstash通过jdbc input插件实现定时同步MYSQL数据,了解JAVA的同学应该对jdbc不陌生,就是访问数据库的API标准,我们常见的数据库都可以使用jdbc接口进行访问。

使用jdbc访问数据库,通常都需要安装对应数据库的jdbc驱动,例如:MYSQL的jdbc驱动,到MYSQL官网下载对应的jar包就可以。

MYSQL jdbc驱动下载地址:

https://mvnrepository.com/artifact/mysql/mysql-connector-java

找到MYSQL对应的版本下载JAR包即可,例如下面下载8.0.15版本。

2.简单的同步例子

关键配置有两点:

配置input jdbc输入插件

配置output elasticsearch输出插件

完整的配置如下

input { # 配置JDBC数据源 jdbc { # mysql jdbc驱动路径 jdbc_driver_library => "/Users/tizi365/.m2/repository/mysql/mysql-connector-java/8.0.18/mysql-connector-java-8.0.18.jar" # mysql jdbc驱动类 jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # MYSQL连接地址,格式: jdbc:mysql://服务器地址:端口/数据库名 jdbc_connection_string => "jdbc:mysql://localhost:3306/wordpress" # •MYSQL 账号 jdbc_user => "root" # MYSQL 密码 jdbc_password => "123456" # 定时任务配置,下面表示每分钟执行一次SQL # 具体语法请参考下一个章节内容 schedule => "* * * * *" # 定时执行的SQL语句,Logstash会根据schedule配置,定时执行这里的SQL语句 # 将SQL语句查询的结果,传给output插件 statement => "SELECT * FROM `wp_posts`" }}​output { stdout { # 配置将数据导入到ES中 elasticsearch { # 索引名,logstash会将数据导入到这个索引中 index => "wp_posts" # ES服务器地址,支持多个地址 hosts => ["127.0.0.1:9200","127.0.0.2:9200"] # 设置ES文档的唯一Id值为SQL语句返回的id # 建议将document_id设置为MYSQL表的主键 document_id => "%{id}" } }}

3.定时任务配置

jdbc schedule的配置规则,类似linux的crontab的写法,具体语法规则如下:

语法格式,总共由5个字段组成,含义如下:

* * * * * ​ 分 时 天 月 星期

各个字段取值范围:

分 - 0-59

时 - 0-23

天 - 1-31

月 - 1-12

星期 - 0-7

特殊字符含义:

星号() :代表所有值,例如:第一个字段是星号(),则代表每分钟。

逗号(,):指定一个数值范围,例如:1,2,3,4

横杠(-):另外一种表示一个整数范围的方法,例如:1-4 表示1,2,3,4

斜线(/):可以用斜线指定时间的间隔频率,例如:*/5,如果用在分钟字段,表示每5分钟执行一次。

例子:

# 每分钟执行一次

* * * * *

# 每10分钟执行一次

*/10 * * * *

# 每小时执行一次

* */1 * * *

# 每天0点执行一次

0 0 * * *

# 每天凌晨2点1分执行一次

1 2 * * *

4.增量同步数据

前面的例子同步数据的SQL如下:

input {

# 配置JDBC数据源

jdbc {

# 忽略其他配置

statement => "SELECT * FROM `wp_posts`"

}

}

同步数据的SQL语句,直接扫描全表的数据,如果数据量比较小,问题不大,如果数据量比较大,会直接卡死,logstash OOM挂了,因此需要实现增量同步,每次仅同步新增的数据。

Logstash提供了sql_last_value字段值,帮助我们实现增量同步;增量同步的核心思路就是,logstash每次执行SQL的时候,会将SQL查询结果的最后一条记录的某个值保存到sql_last_value字段中,下一次执行SQL的时候,以sql_last_value值作为参考,从这个值往后查询新数据。

例子:

input {

jdbc {

# 注意where条件id > :sql_last_value

# 每次执行SQL的时候,id大于sql_last_value的值

statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > :sql_last_value"

# 允许sql_last_value的值来自查询结果的某个字段值。

use_column_value => true

# sql_last_value的值来自查询结果中的最后一个id值

tracking_column => "id"

# ... 忽略其他配置

}

}

说明:

sql_last_value的默认值是0或者1970-01-01,具体是什么值跟数据类型有关,上面的例子,定时任务执行SQL如下

# 第一次执行,sql_last_value=0SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 0​# 第二次执行,sql_last_value=100,假设上面的SQL最后的id值是100SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 100​# 第三次执行,sql_last_value=200,,假设上面的SQL最后的id值是200SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE id > 200

提示:

上面的例子,使用id作为增量同步数据的依据,不一定适合所有的业务场景,例如:同步文章数据,文章更新了,但是文章的id没有更新,这个时候使用id作为增量同步的依据,会导致更新的文章没有同步到ES,这种场景适合使用更新时间作为增量同步的依据,用法一样,sql_last_value换一个字段值即可。

5.分页

前面的章节在实现增量同步的时候,也存在一个问题,如果增量同步的数据太多的时候,logstash也会卡死,尤其是首次增量同步,例如:一个MYSQL表的数据有100万,首次增量同步数据,会扫描全表的数据。

logstash jdbc插件执行分页查询,避免一次查询太多数据,配置如下:

input { jdbc { # 激活分页处理 jdbc_paging_enabled => true # 分页大小,每次查询1000条数据 jdbc_page_size => 1000 # sql语句 statement => "SELECT * FROM my_table" # ... 忽略其他配置 }}

6.大表同步

在实际业务场景中,有些数据表的数据会有几百万,甚至上亿的数据,那么在使用logstash同步这些大表数据的时候,结合前面两个章节的增量同步和分页处理就可以解决,不过需要注意深度分页的性能问题。

例如:

# 每次查询1000条数据,但是翻页从第500万条数据偏移开始

SELECT * FROM my_table limit 5000000, 1000

这条SQL会非常慢,可以借助索引覆盖优化性能。

例子:

SELECT * FROM my_table WHERE id in (SELECT id FROM my_table limit 5000000, 1000)

因为id是主键,在主键索引中已经包含id的值,不需要回表扫描磁盘的数据,所以性能比较好,上面的SQL首先借助索引覆盖将id值查询出来,然后根据id查询具体的数据。

Filebeat 教程

logstash虽然也支持从磁盘文件中收集数据,但是logstash自己本身还是比较重,对资源的消耗也比较大,尤其是在容器化环境,每个容器都部署logstash也太浪费资源,因此出现了轻量级的日志文件数据收集方案Filebeat,Filebeat将收集到的文件数据传给Logstatsh处理即可。

Filebeat部署架构

可以在每一台服务器或者每一个容器中安装Filebeat,Filebeat负责收集日志数据,然后将日志数据交给Logstash处理,Logstash在将数据导入ES。

安装Filebeat

下载安装包,然后解压即可。

官网下载地址:

https://www.elastic.co/cn/downloads/beats/filebeat

下面以7.7.1版本为例

mac

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.7.1-darwin-x86_64.tar.gz

tar xzvf filebeat-7.7.1-darwin-x86_64.tar.gz

linux

curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.7.1-linux-x86_64.tar.gz

tar xzvf filebeat-7.7.1-linux-x86_64.tar.gz

Filebeat配置

Filebeat的配置结构类似Logstash,也需要配置input和output,分别配置输入和输出,Filebeat使用yaml格式编写配置文件。

默认配置文件路径:

${安装目录}/filebeat.yml

/etc/filebeat/filebeat.yml

/usr/share/filebeat/filebeat.yml

因为我们使用的是tar安装包安装,所以选择${安装目录}/filebeat.yml 路径。

配置例子:

# 配置采集数据源

filebeat.inputs:

- type: log

paths:

- /var/log/messages

- /var/log/*.log

# 配置输出目标,这里将数据投递给logstash

output.logstash:

# logstash地址

hosts: ["127.0.0.1:5044"]

说明:

type为log类型,表示收集日志文件数据,paths是一个文件路径数组,这里扫描/var/log/messages文件和/var/log/目录下所有以log为扩展名的日志文件。

Logstash beat配置

配置Logstash的input,让Logstash可以接收Filebeat投递过来的数据。

input { # 配置接收Filebeat数据源,监听端口为5044 # Filebeat的output.logstash地址保持跟这里一致 beats { port => 5044 }}​output { # 将数据导入到ES中 elasticsearch { hosts => ["http://localhost:9200"] index => "tizi365" }}

启动Filebeat

进入filebeat安装目录

./filebeat -c filebeat.yml

如果配置PATH,直接启动即可。

input插件

Logstash Beats插件

Beats input插件让Logstash可以接收来自Elastic Beats framework发送过来的数据,Elastic Beats framework用的比较多的就是Filebeat.

例子

input { # 在5044端口监听来自beats框架的数据 beats { port => 5044 }}​output { elasticsearch { hosts => ["http://localhost:9200"] index => "%{[@metadata][beat]}-%{[@metadata][version]}" }}

接收来自beats的数据,并且将数据导入到ES中。

Beats Input插件参数

参数名类型默认值说明

host

string

0.0.0.0

监听地址

port

number

监听端口

Logstash File input插件

Logstash的file input插件可以实现从磁盘文件中采集数据,通常用于收集日志文件数据,file input插件一行行的从文件中读取数据,然后交给logstash。

提示: file input插件的作用跟linux命令tail -f 的作用类似,可以实时收集文件的最新数据。

例子

input {

# 扫描指定文件日志数据

file {

# 指定需要扫描的日志文件,支持多个文件,也支持星号(*)通配符

# 含义:扫描/var/log/messages文件和/var/log/目录下的所有以log为扩展名的日志文件。

path => [ "/var/log/messages", "/var/log/*.log" ]

}

}

output {

elasticsearch {

hosts => ["http://localhost:9200"]

index => "tizi365"

}

}

file Input插件参数

参数名类型默认值说明

path

array

需要扫描的文件路径,数组格式:[ "/var/log/messages", "/var/log/tizi.log" ]

delimiter

string

\n

指定文件换行符

exclude

string

指定需要排除的文件,例如排除压缩包:*.gz , 这个参数通常在path参数包含通配符的时候,一起配合使用

Logstash Exec input插件

Exec input插件可以定时的执行一个命令,然后采集命令输出的结果,通过exec插件,我们可以轻松的采集linux系统状态,例如:定时的采集linux服务的内存使用情况。

例子:

input { # 通过exec插件,定时的通过命令 exec { # 需要执行的命令 command => "free -m" # 30秒执行一次 interval => 30 }}​output { elasticsearch { hosts => ["http://localhost:9200"] index => "tizi365" }}

说明:

30秒执行一次free -m命令,命令输出的结果,会被Logstash同步到ES中。

exec Input插件参数

参数名类型默认值说明

command

string

设置需要执行的命令

interval

number

单位是秒,多长时间执行一次命令

schedule

string

使用类型linux crontab的语法,设置定时任务,例如:/10 * * * 代表每10分钟跑一次,interval和schedule参数二选一即可

Logstash jdbc input插件

jdbc插件用于解决Logstash采集数据库数据问题,基本上所有的关系数据库都支持jdbc接口,例如: MYSQL、Oracle等。

jdbc插件通过定时任务,定时的执行SQL语句,从数据库中读取数据,定时任务语法类似linux的crontab的写法。

例子

input {

# 配置jdbc数据源

jdbc {

# 指定jdbc驱动路径

jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"

# jdbc驱动类

jdbc_driver_class => "com.mysql.jdbc.Driver"

# 数据库连接配置

jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"

# 数据库账号

jdbc_user => "mysql"

# 数据库密码

jdbc_password => "123456"

# SQL绑定的参数

parameters => { "favorite_artist" => "Beethoven" }

# 定时任务配置

schedule => "* * * * *"

# SQL语句

statement => "SELECT * from songs where artist = :favorite_artist"

}

}

output {

elasticsearch {

hosts => ["http://localhost:9200"]

index => "tizi365"

}

}

详细的例子可以参考:同步MYSQL数据到Elasticsearch

jdbc Input插件参数

参数名类型默认值说明

jdbc_driver_library

string

指定jdbc驱动路径, 不同数据库jdbc驱动不一样

jdbc_driver_class

string

jdbc驱动类,新版的MYSQL驱动类为:com.mysql.cj.jdbc.Driver

jdbc_connection_string

string

数据库连接配置, 格式: jdbc:数据库类型://地址:端口/数据库,例子:jdbc:mysql://localhost:3306/mydb

jdbc_user

string

数据库账号

jdbc_password

string

数据库密码

schedule

string

定时任务配置,语法可以参考linux的cron

statement

string

需要执行的SQL语句

parameters

hash

SQL绑定参数,例子:{ "target_id" => "321" }

use_column_value

boolean

false

当设置为true时,使用tracking_column定义的列作为:sql_last_value的值。当设置为false时,:sql_last_value等于上次执行查询的时间。

tracking_column

string

定义使用SQL查询结果中的哪一个字段值作为sql_last_value的值

jdbc_paging_enabled

boolean

false

激活分页处理

jdbc_page_size

number

100000

分页大小

Logstash kafka input插件

kafka input插件 支持Logstash从kafka消息队列中的topic读取数据。

例子

input {

# 配置kafka数据源

kafka {

# kafka服务器地址,多个地址使用逗号分隔

bootstrap_servers => "localhost:9092"

# 订阅的主题,支持订阅多个主题

topics => ["logstash", "tizi365"]

# 消费者线程数

consumer_threads => 5

# 消费组Id

group_id => "logstash"

}

}

output {

elasticsearch {

hosts => ["http://localhost:9200"]

index => "tizi365"

}

}

kafka Input插件参数

参数名类型默认值说明

bootstrap_servers

string

localhost:9092

kafka服务器地址,多个地址使用逗号分隔

topics

array

["logstash"]

订阅的主题,支持订阅多个主题

consumer_threads

number

1

消费者线程数

group_id

string

logstash

消费组Id

fetch_min_bytes

number

一次最少从服务器读取多少字节数据

fetch_max_bytes

number

一次最多从服务器读取多少字节数据

Logstash RabbitMQ input插件

RabbitMQ input插件,支持Logstash通过RabbitMQ消息队列读取数据。

例子:

input { # 配置rabbitmq数据源 rabbitmq { # rabbitmq服务器地址 host => "localhost" # 端口 port => 5672 # RabbitMQ 账号 user => "guest" # RabbitMQ 密码 password => "guest" # 队列名 queue => "tizi365" }}​output { elasticsearch { hosts => ["http://localhost:9200"] index => "tizi365" }}

RabbitMQ Input插件参数

参数名类型默认值说明

host

string

rabbitmq服务器地址

port

number

5672

端口

user

string

guest

RabbitMQ 账号

password

string

guest

RabbitMQ 密码

queue

string

队列名

auto_delete

boolean

false

最后一个消费组退出后是否删除消息

prefetch_count

number

256

预加载多少条消息到本地

Logstash redis input插件

redis input插件支持Logstash从redis中读取数据,目前仅支持从redis的list和channels两种数据结构中读取数据。

例子

input {

# 配置redis数据源

redis {

# redis服务器地址

host => "127.0.0.1"

# 端口

port => 6379

# redis 密码, 没有设置密码可以不填

password => "123456"

# 从哪个key读取数据

key => "tizi365_list"

# 设置Key的redis的数据类型

data_type => "list"

}

}

output {

elasticsearch {

hosts => ["http://localhost:9200"]

index => "tizi365"

}

}

redis Input插件参数

参数名类型默认值说明

host

string

127.0.0.1

redis服务器地址

port

number

6379

redis服务器端口号

password

string

redis服务密码

key

string

配置logstash从哪个key读取数据

data_type

string

设置Key的redis的数据类型,支持list, channel

db

number

0

redis数据库

threads

number

1

并发线程数

timeout

number

5

redis连接超时时间,单位秒

ssl

boolean

false

是否打开ssl支持

batch_count

number

125

一次批量从redis加载多少条数据

output插件

Logstash Elasticsearch output插件

通过Elasticsearch output插件Logstash可以将采集到的数据导入到Elasticsearh中。

例子:

input {

# 扫描指定文件日志数据

file {

path => [ "/var/log/messages" ]

}

}

output {

# 将数据导入到ES中

elasticsearch {

# ES服务地址

hosts => ["http://localhost:9200"]

# 索引名

index => "tizi365"

}

}

Elasticsearch output插件参数

参数名类型默认值说明

hosts

uri

[//127.0.0.1]

ES服务地址

index

string

logstash-%{+yyyy.MM.dd}

索引名

document_id

string

设置document的id值,通常使用logstash采集数据的某个字段值作为id值,例如:%{id}

user

string

账号

password

string

密码

routing

string

设置ES的路由参数

Logstash Stdout output插件

在调试Logstash调试的时候,可以将Logstash收集到的数据在命令窗口直接打印出来,通过Stdout output插件可以实现将数据打印到标准输出。

简单例子:

input {

# 扫描指定文件日志数据

file {

path => [ "/var/log/messages" ]

}

}

output {

# 将数据直接打印出来

stdout {}

}

指定输出格式:

output {

# 以Json格式将数据直接打印出来

stdout { codec => json }

}

以rubydebug的格式打印数据:

output {

# 以Json格式将数据直接打印出来

stdout { codec => rubydebug }

}

filter插件

Logstash grok filter插件

通过grok filter插件我们可以对文本内容进行格式化处理,提取文本中的内容,并将其转换成json格式,在处理日志内容的时候非常有用。

例子:

例如日志内容如下:

55.3.244.1 GET /index.html 15824 0.043

这条日志内容包含了ip、http请求方法、请求路径、响应内容大小、响应时间,这条日志是一行字符串,我们可以通过grok将其格式化为:client、method、request、bytes、duration这几个字段,然后在保存到elasticsearch中。

logstash配置:

input { # 扫描指定文件日志数据 file { path => [ "/var/log/http.log" ] }}# 配置过滤器插件,对Input收集到的数据进行格式化处理filter { # 通过grok插件,格式化文本内容 grok { # grok参数,这里决定如何对每一行日志进行参数提取 # message 字段的内容就是格式化日志的表达式 match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } }}​output { # 将数据导入到ES中 elasticsearch { # ES服务地址 hosts => ["http://localhost:9200"] # 索引名 index => "tizi365" }}

通过grok提取的结果如下:

client: 55.3.244.1

method: GET

request: /index.html

bytes: 15824

duration: 0.043

grok模式语法

grok的提取字符串内容的语法其实就是在正则表达式基础之上进行封装,Logstash grok内置了120种默认表达式,解决很多日常需求,不需要重头编写复杂的正则表达式。

grok表达式语法:

%{模式名:自定义字段名}

说明:

模式名 - 指的就是预先定义好的正则表达式的别名,例如:IP 可以匹配ip内容。

自定义字段名 - 通过模式匹配到内容后,将内容保存到这个自定义的字段中

例子:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

这是上面例子的grok表达式,下面是对表达式的解读:

%{IP:client} - 匹配IP内容,结果保存到client字段

%{WORD:method} - 匹配非空字符串内容,结果保存到method字段

%{URIPATHPARAM:request} - 匹配url路径,结果保存到request字段

%{NUMBER:bytes} - 匹配数字,结果保存到bytes字段

grok filter插件参数

参数名类型默认值说明

match

hash

{}

定义grok的表达式,格式: message => "表达式"

patterns_dir

array

[]

自定义模式配置文件的路径,支持多个路径,例子:["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]

grok内置模式

常用模式

表达式标识名称详情匹配例子

USERNAME 或 USER

用户名

由数字、大小写及特殊字符(._-)组成的字符串

1234、Bob、Alex.Wong

EMAILLOCALPART

用户名

首位由大小写字母组成,其他位由数字、大小写及特殊字符(_.+-=:)组成的字符串。注意,国内的QQ纯数字邮箱账号是无法匹配的,需要修改正则

windcoder、windcoder_com、abc-123

EMAILADDRESS

电子邮件

windcoder@abc.com、windcoder_com@gmail.com、abc-123@163.com

HTTPDUSER

Apache服务器的用户

可以是EMAILADDRESS或USERNAME

INT

整数

包括0和正负整数

0、-123、43987

BASE10NUM 或 NUMBER

十进制数字

包括整数和小数

0、18、5.23

BASE16NUM

十六进制数字

整数

0x0045fa2d、-0x3F8709

WORD

字符串

包括数字和大小写字母

String、3529345、ILoveYou

NOTSPACE

不带任何空格的字符串

SPACE

空格字符串

QUOTEDSTRING 或 QS

带引号的字符串

"This is an apple"、'What is your name?'

UUID

标准UUID

550E8400-E29B-11D4-A716-446655440000

MAC

MAC地址

可以是Cisco设备里的MAC地址,也可以是通用或者Windows系统的MAC地址

IP

IP地址

IPv4或IPv6地址

127.0.0.1、FE80:0000:0000:0000:AAAA:0000:00C2:0002

HOSTNAME

IP或者主机名称

HOSTPORT

主机名(IP)+端口

127.0.0.1:3306、api.windcoder.com:8000

PATH

路径

Unix系统或者Windows系统里的路径格式

/usr/local/nginx/sbin/nginx、c:\windows\system32\clr.exe

URIPROTO

URI协议

http、ftp

URIHOST

URI主机

windcoder.com、10.0.0.1:22

URIPATH

URI路径

//windcoder.com/abc/、/api.php

URIPARAM

URI里的GET参数

?a=1&b=2&c=3

URIPATHPARAM

URI路径+GET参数

/windcoder.com/abc/api.php?a=1&b=2&c=3

URI

完整的URI

https://windcoder.com/abc/api.php?a=1&b=2&c=3

LOGLEVEL

Log表达式

Log表达式

Alert、alert、ALERT、Error

日期时间模式

表达式标识名称匹配例子

MONTH

月份名称

Jan、January

MONTHNUM

月份数字

03、9、12

MONTHDAY

日期数字

03、9、31

DAY

星期几名称

Mon、Monday

YEAR

年份数字

HOUR

小时数字

MINUTE

分钟数字

SECOND

秒数字

TIME

时间

00:01:23

DATE_US

美国时间

10-01-1892、10/01/1892/

DATE_EU

欧洲日期格式

01-10-1892、01/10/1882、01.10.1892

ISO8601_TIMEZONE

ISO8601时间格式

+10:23、-1023

TIMESTAMP_ISO8601

ISO8601时间戳格式

2016-07-03T00:34:06+08:00

DATE

日期

美国日期%{DATE_US}或者欧洲日期%{DATE_EU} |

DATESTAMP

完整日期+时间

07-03-2016 00:34:06

HTTPDATE

http默认日期格式

03/Jul/2016:00:36:53 +0800

Grok自带的模式,具体的规则可以参考下面链接

https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

自定义模式

如果grok内置的模式无法满足需求,也可以自定义模式。

模式定义语法:

NAME PATTERN

说明:

NAME - 模式名

PATTERN - 表达式,包括正则表达式和logstash变量。

例子:

步骤1:

配置文件路径:/opt/logstash/tizi_patterns ,文件内容如下

TIZI_NUMBER \d+

提示:自定义模式配置文件路径,可以根据项目情况自定义即可

步骤2:

在logstash配置文件中引用自定义表达式

filter {

grok {

# 指定自定义模式路径

patterns_dir => ["/opt/logstash/tizi_patterns"]

# 使用自定义模式

match => { "message" => "%{TIZI_NUMBER:tizi_data}" }

}

}

调试grok模式

Kibana支持在线调试grok,如下截图:

Logstash java_uuid filter插件

如果我们想给logstash收集到的每一条数据增加一个唯一id,可以通过java_uuid和uuid两个filter插件实现,他们的区别只是底层实现不同,效果类似。

java_uuid

filter { # java版的uuid生成插件 java_uuid { # 生成的唯一id,保存到target指定的字段 target => "uuid" # 如果target指定的字段已经存在,是否覆盖 overwrite => true } }

uuid

filter { # 定义uuid插件 uuid { # 生成的唯一id,保存到target指定的字段 target => "uuid" # 如果target指定的字段已经存在,是否覆盖 overwrite => true } }

Logstash json filter插件

通常情况,Logstash收集到的数据都会转成json格式,但是默认logstash只是对收集到的格式化数据转成json,如果收到的数据仅仅是一个字符串是不会转换成Json.

例如:

{

"id":20,

"domain": "https://www.tizi365.com",

"data": "{\"type\":1, \"msg\":\"message ok\"}"

}

data字段的内容是一个json字符串,不是格式化的Json格式,如果数据导入到Elasticsearch,data字段也是一个字符串,不是一个Json对象;json filter插件可以解决这种问题。

例子:

filter {

# 定义json插件

json {

# 指定需要转换成json格式的字段

source => "data"

# 指定转换成json的数据,保存到那个字段,如果字段存在会覆盖

target => "data"

# 如果遇到错误的json,是否跳过json filter过滤器

skip_on_invalid_json => true

}

}

json filter格式化数据后,输出如下:

{ "id":20, "domain": "https://www.tizi365.com", "data": { "type":1, "msg":"message ok" }}

Logstash kv filter插件

如果logstash收集到的日志格式是key=value键值对,可以通过kv filter插件对其进行格式化。

例子:

日志内容:

ip=1.2.3.4 error=REFUSED

logstash配置

input { # 扫描指定文件日志数据 file { path => [ "/var/log/http.log" ] }}​filter { # 使用kv filter格式化键值对日志内容 kv { }}​output { # 将数据直接打印出来 stdout {}}

logstash输出的内容如下:

{ "ip": "1.2.3.4", "error": "REFUSED"}

kv filter插件参数

参数名类型默认值说明

prefix

string

指定key的前缀,例如:arg_

field_split

string

" "

指定两个kv值直接的分隔符,默认是空格,例:field_split => "&" , 通过&分隔键值对

default_keys

hash

设置key的默认值,例:default_keys => [ "from", "logstash@example.com", "to", "default@dev.null" ]

Logstash drop filter插件

drop filter插件主要用于删除logstash收集到的数据,通常配合条件语句一起使用。

提示:logstash是一条一条数据发给filter处理,所以drop filter也是一条数据,一条数据的删除。

例子:

input { # 扫描指定文件日志数据 file { path => [ "/var/log/http.log" ] }}​filter { # 如果loglevel字段值等于debug,则删除整条消息 if [loglevel] == "debug" { # 通过drop过滤器删除消息 drop { } }}​output { # 将数据直接打印出来 stdout {}}

← 上一篇: 疥癣的解释
下一篇: 宝马mini车质量怎么样? →

相关推荐

windows网络共享网络(Internet连接共享)(ICS:Internet Connection Sharing)让某台主机共享笔记本网络(或笔记本手机热点)实现上网(网线直连笔记本)ics共享
世界杯转播直播间女主持人贺思浓声音叫声是真的吗 抖音回应视频出现喘息声是假的
1000千卡等于多少卡路里(食物)
1、穿越火线连杀判定的间隔
微信改名限制详解:你为什么改不了微信名?
圣诞节折扣淘宝怎么买的?圣诞节打折什么时候开始?
足球——世预赛:中国不敌沙特
一个静态网页我要弄多久
标准吉他有多少品
烟感报警器怎么关闭
街头霸王2下载 中文版软件截图
一座皖北县城的“百年梨事”:砀山如何将“甜蜜”进行到底