My Avatar

Shadow

I love bleak day, like something will happen

Logstash学习

2016年05月19日 星期四, 发表于 北京

如果你对本文有任何的建议或者疑问, 可以在 这里给我提 Issues, 谢谢! :)

Logstash是一个开放的数据收集引擎,它可以将不同源的数据进行统一并且格式化到你自己定制的目的地。

它最初被设计是用来收集日志的,然而现在它的能力已经被大大扩展,任何类型的event都能通过一组input、filter、output插件的影响来被处理和转移,另外通过codec能更加简化这个过程。

``

Logstash的能力

Logstash收集的数据类型

Logs 和 Metrics

Web数据

数据存储和流

从你已经拥有的数据中发现更多的价值

传感器和物联网

对传感器物联网数据的收集

轻松数据丰富

Logstash在接收数据的过程中,对数据进行清洗和转移,来获得实时的对数据的理解。

选择你的后端仓库

将你的数据路由到最需要它的地方。Logstash支持很多下游分析、操作工具来存储、分析、操作你的数据

安装Logstash

首先确保你安装了java

然后在Logstash installation file中找到匹配你的环境的安装文件

下载->解压->更改配置文件->运行

就是这么简单,如下图:

当然你也可以选择通过Package Repositories安装,具体参考https://www.elastic.co/guide/en/logstash/current/installing-logstash.html

最基本的Logstash样例

因为我是在windows上测试,所以,以下结果仅供参考

为了验证安装完成,我们输入以下指令:

1
2
cd logstash/bin
logstash.bat -e 'input { stdin {} } output { stdout {}}'

-e表示你直接通过命令行指定配置。这个指令,从标准输入中读入数据,然后显示到标准输出,我们的显示结果如下:

我们输入了hello world,而logstash自动添加了时间戳和host信息,然后将消息输出。

配置一个更高级的Logstash流水线

一个Logstash流水线通常会有一到多个input、filter和output插件。

Logstash利用配置文件定义你的Logstash流水线。当你启动一个Logstash实例,用 -f <path/to/file> 选项来指定你所使用的配置文件。

一个Logstash流水线,要求有两个基本的元素,input 和 output,一个可选的元素,filter。input插件从一个源消费数据,filter插件按指定的方式修改数据,output插件将数据写到目的地。

解析Apache Logs到Elasticsearch

接下来的例子将Apache web日志作为input,解析该日志,并创建特定的字段,将解析后的日志写到Elasticsearch集群

设置input

首先,修改配置文件 first-pipeline.conf,使用file input插件

1
2
3
4
5
6
7
input {
	file {
    	path => "D:\DevTools\logstash-all-plugins-2.3.1\logstash-2.3.1\logs\test.1.log"
    	start_position => beginning 
    	ignore_older => 0 
	}
}

start_position => beginning 指定从文件头开始读取文件内容 ignore_older => 0 指定不忽略一天以上未修改的文件

具体含义参考file input插件

设置filter

grok filter插件是几个默认插件之一

grok根据log的pattern来获取你感兴趣的数据,类似正则匹配

假设我们有一条Apache web server的日志如下:

1
2
3
83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png
HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel
Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

那么我们就可以用grok模式 %{COMBINEDAPACHELOG}来将这条日志自动解析成特定的结构化数据,具体可以参考grok

编辑first-pipeline.conf文件来加入如下配置:

1
2
3
4
5
filter {
	grok {
   		match => { "message" => "%{COMBINEDAPACHELOG}"}
	}
}

那么在经过filter处理完后,上面那条日志应该是以如下JSON格式呈现:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"clientip" : "83.149.9.216",
"ident" : ,
"auth" : ,
"timestamp" : "04/Jan/2015:05:13:42 +0000",
"verb" : "GET",
"request" : "/presentations/logstash-monitorama-2013/images/kibana-search.png",
"httpversion" : "HTTP/1.1",
"response" : "200",
"bytes" : "203023",
"referrer" : "http://semicomplete.com/presentations/logstash-monitorama-2013/",
"agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
}

设置output

最后将数据转存到Elasticsearch,我们只需要编辑first-pipeline.conf,添加如下配置即可:

1
2
3
4
output {
	elasticsearch {
	}
}

如上配置,Logstash会使用http协议去连接本地的Elasticsearch,当然如果你需要指定远程的Elasticsearch实例,可以添加hosts配置,具体参见Elasticsearch output

用Geoip Filter丰富数据

geoip插件能用来通过ip地址推导出地理位置信息,并将地理位置信息加到日志中。

我们只要在first-pipeline.conf中添加如下配置:

1
2
3
geoip {
	source => "clientip"
}

geoip插件要求数据是已经按照不同的字段定义好。所以我们要确保geoip小节是写在grok小节后的。

同时需要制定ip地址的字段名,在我们这个例子中,经过grop解析的ip字段名是clientip(这是%{COMBINEDAPACHELOG}模式已经预先设置好的)

开始测试我们的流水线

经过上面的步骤,我们的first-pipeline.conf变成如下这个样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
input {
	file {
		path => "D:/DevTools/logstash-all-plugins-2.3.1/logstash-2.3.1/logs/test.1.log"
		start_position => beginning
	}
}
filter {
	grok {
		match => { "message" => "%{COMBINEDAPACHELOG}"}
	}
	geoip {
		source => "clientip"
	}
}
output {
	elasticsearch {}
	stdout {}
}

接着,为了验证配置是否正确,我们可以执行:

1
logstash.bat -f ../conf/first-pipeline.conf --configtest

–configtest选项是用来分析你的config文件,并且报告所有的错误的。 如果配置文件没错,则会返回一个ok

接着我们再执行:

1
logstash.bat -f ../conf/first-pipeline.conf

[注意]

使用file作为input插件时,如果file中只有一行数据,那么Logstash不会有任何输出,不知道这是为什么,只要在后面加一行即可,即使只是回车,另外如果发现配置都没有问题就是没有输出,你可以在input文件中手动添加新的内容,这样就能将新的内容输出出来了

执行完上述命令后,我们可以通过在浏览器中调用:

1
http://localhost:9200/logstash-2016.05.19/_search?q=response=200

来查看elasticsearch中的结果:

{“_index”:”logstash-2016.05.19”,”_type”:”logs”,”_id”:”AVTHnpSDjwEBWaog2sYG”,”_score”:0.13928263,”_source”:{“message”:”83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"\r”,”@version”:”1”,”@timestamp”:”2016-05-19T06:05:38.552Z”,”path”:”D:/DevTools/logstash-all-plugins-2.3.1/logstash-2.3.1/logs/test.1.log”,”host”:”uf201202262”,”clientip”:”83.149.9.216”,”ident”:”-“,”auth”:”-“,”timestamp”:”04/Jan/2015:05:13:42 +0000”,”verb”:”GET”,”request”:”/presentations/logstash-monitorama-2013/images/kibana-search.png”,”httpversion”:”1.1”,”response”:”200”,”bytes”:”203023”,”referrer”:”"http://semicomplete.com/presentations/logstash-monitorama-2013/"”,”agent”:”"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"”,”geoip”:{“ip”:”83.149.9.216”,”country_code2”:”RU”,”country_code3”:”RUS”,”country_name”:”Russian Federation”,”continent_code”:”EU”,”region_name”:”48”,”city_name”:”Moscow”,”latitude”:55.75219999999999,”longitude”:37.6156,”timezone”:”Europe/Moscow”,”real_region_name”:”Moscow City”,”location”:[37.6156,55.75219999999999]}}}

可以看到,确实日志中的各个字段都被解析出来,而且也有geoip信息。

Logstash处理流

Logstash事件处理流有三个阶段:inputs -> filters -> outputs inputs产生事件,filters修改事件,outputs转移事件

常用的各阶段插件类型如下:

Logstash配置

配置文件结构

值类型

每个插件都要求其中的每一项配置都有一个确定的类型,例如boolean或者hash,如下这些值类型是支持的

Array

array可以是一个单独的字符串,也可以是多个字符串,如果你对一项配置指定了多次赋值,那么它会被加到array中

例如:

1
2
path => [ "/var/log/messages", "/var/log/*.log" ]
path => "/data/mysql/mysql.log"

这里的path就是array类型的配置

Boolean

一个boolean类型的值要么是true要么是false,是不需要引号引起来的

例如:

1
ssl_enable => true
Bytes

一个bytes类型,就是一个字符串字段代表了有效单位的字节数

例如:

1
2
3
4
my_bytes => "1113"   # 1113 bytes
my_bytes => "10MiB"  # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes
Codec

codec就是Logstash的codec插件的名字,它能被用到inputs和outputs中,input codec用来对进入到input之前的数据进行decode,而output codecs用来对即将离开output的数据进行encode

例如:

1
codec => "json"
Hash

hash是一个key value对的集合

例如:

1
2
3
4
5
match => {
  "field1" => "value1"
  "field2" => "value2"
  ...
}
Number

Numbers必须是有效的数字(float或者integer)

例如:

1
port => 33
Password

password是一个字符串,它不会被打印或输出到日志

例如:

1
my_password => "password"
Path

path是一个字符串代表操作系统的路径

例如:

1
2
my_path => "/tmp/logstash"
my_path => "D:/DevTools/test"
String

String是字符序列,它必须用引号引起来,双引号和单引号都行,但是 其中的原义引号必须用反斜线来进行转义。

例如:

1
2
name => "Hello world"
 	name => 'It\'s a beautiful day'
Comments

注释,跟perl、ruby、python中的注释一样,可以不用从行首开始注释

例如:

1
2
3
4
5
# this is a comment

input { # comments can appear at the end of a line, too
  	# ...
}

事件相关配置

所谓事件(events)是指在Logstash中被传输的数据

所有的时间都有属性,例如,一条apache访问日志会包含status code(200,404),request path(“/”,”index.html”),HTTP verb(GET,POST),client IP address等等,Logstash将这些属性称为fields(字段)

Logstash中的一些配置,要求某些字段是存在的。例如我们之前在geoip中的配置的source属性,就是指定的input获得clientip字段。

但是input阶段,我们还没有获得事件,因此是不可能有任何字段存在的,所以在input中绝对不会存在字段引用sprintf formatconditionals,这些只会在filter和output中出现

字段引用

访问某个字段的语法为: [fieldname]

如果你是访问一个外层字段,那么你连[]都可以省略

但是如果要引用一个嵌套的字段,你需要制定完整的路径:[top-level][nested field]

例如:一个事件如下:

1
2
3
4
5
6
7
8
9
10
11
12
{
  "agent": "Mozilla/5.0 (compatible; MSIE 9.0)",
  "ip": "192.168.24.44",
  "request": "/index.html"
  "response": {
	"status": 200,
	"bytes": 52353
  },
  "ua": {
	"os": "Windows 7"
  }
}

它有五个外层字段(agent、ip、request、response、ua),三个嵌套字段(status、bytes、os)

为了引用os字段,你需要指定[ua][os]

sprintf format

我们还可以在Logstash称之为sprintf format的形式中使用字段引用。

这种形式使得你可以在一个字符串中引用字段的值。

例如:

1
2
3
4
5
output {
  statsd {
	increment => "apache.%{[response][status]}"
  }
}

increment就是一个字符串的拼接,后缀是response的状态码

当然你也可以用+FORMAT语法来格式化时间

例如:

1
2
3
4
5
output {
  file {
	path => "/var/log/%{type}.%{+yyyy.MM.dd.HH}"
  }
}

Conditionals

有些时候,你只是想filter或者output一些满足特定条件的事件,那么这时你可以用到conditional

Logstash中的conditionals和编程语言中表现的基本一致,它支持if、else if和else语法,而且可以嵌套

条件语法如下:

1
2
3
4
5
6
7
if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

那么expression是什么呢?他可以是比较、布尔逻辑等等

你可以使用如下这些比较操作符:

布尔操作符为:

同时支持的一元操作符为:

例如

1
2
3
4
5
filter {
  if [action] == "login" {
	mutate { remove_field => "secret" }
  }
}

上面的代码表示,如果事件中的action字段值等于login,则去掉secret字段

再比如:

1
2
3
4
5
output {
  if "_grokparsefailure" not in [tags] {
	elasticsearch { ... }
  }
}

你可以用not in来将哪些grok处理成功的事件转存到elasticsearch中,这样做就可以达到我只取我需要的那部分数据的作用了。

你可以通过 if [foo] 来检测foo字段是否存在在事件中,但是你没有办法区别它到底是不存在,还是说它本身的值是false或空

expression if [foo] 返回false,当:

@metadata字段

在Logstash1.5之后,有一个特殊的字段@metadata

@metadata的内容不会被output出来,因此我们可以用它来做一些工作

首先来看示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
input { stdin { } }

filter {
  mutate { add_field => { "show" => "This data will be in the output" } }
  mutate { add_field => { "[@metadata][test]" => "Hello" } }
  mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}

output {
  if [@metadata][test] == "Hello" {
	stdout { codec => rubydebug }
  }
}

这样处理之后的结果如下:

1
2
3
4
5
6
7
8
9
10
$ bin/logstash -f ../test.conf
Logstash startup completed
asdf
{
   "message" => "asdf",
   "@version" => "1",
   "@timestamp" => "2015-03-18T23:09:29.595Z",
   "host" => "example.com",
   "show" => "This data will be in the output"
}

我们输入的asdf变成了message字段,但是@metadata字段的内容都没有输出

但是需要注意的是,rubydebug codec允许你将@metadata的数据输出出来

1
stdout { codec => rubydebug { metadata => true } }

这样之后,output就长这样了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ bin/logstash -f ../test.conf
Logstash startup completed
asdf
{
   	"message" => "asdf",
  	"@version" => "1",
	"@timestamp" => "2015-03-18T23:10:19.859Z",
  	"host" => "example.com",
 	"show" => "This data will be in the output",
 	"@metadata" => {
   		"test" => "Hello",
		"no_show" => "This data will not be in the output"
	}
}

注意:只有rubydebug codec能允许你展示@metadata的内容

一个典型的对@metadata的应用是,去掉多余的timestamp字段,例如一个日志中有timestamp,我们能截取到这个字段,但是Logstash默认会自己添加一个@timestamp字段,这样就会出现重复,如果没有@metadata,我们需要自己去删除不必要的字段,但是有了@metadata,我们可以这么做

1
2
3
4
5
6
7
8
9
10
input { stdin { } }

filter {
  grok { match => [ "message", "%{HTTPDATE:[@metadata][timestamp]}" ] }
  date { match => [ "[@metadata][timestamp]", "dd/MMM/yyyy:HH:mm:ss Z" ] }
}

output {
  stdout { codec => rubydebug }
}

在上面的配置中,我们将时间解析到@metadata的timestamp字段中,这样,最后输出的时候就不会有这个字段,而只有自带的@timestamp字段了

配置中使用环境变量

这个特性还处于实验阶段,为了开启该功能,你需要在启动Logstash时添加 –allow-env参数

配置举例

参见 https://www.elastic.co/guide/en/logstash/current/config-examples.html

重新加载配置文件

从Logstash2.3开始,你可以设置logstash去自动检测并且重载配置文件

只要在启动logstash时添加 –auto-reload(或者 -r)参数

例如:

1
bin/logstash –f apache.config --auto-reload

–auto-reload参数在你用-e参数时是不可用的

默认情况下,Logstash每3秒检查一次配置文件。可以通过–reload-interval 来指定

如果之前logstash并没有开启自动重载配置的功能,那么你只能kill掉原进程,再重新启动

命令行参数

Logstash有如下这些参数,你可以使用–help参数来展示这些信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
-f, --config CONFIGFILE
 Load the Logstash config from a specific file, directory, or a wildcard. If
 given a directory or wildcard, config files will be read from the directory in
 alphabetical order.

-e CONFIGSTRING
 Use the given string as the configuration data. Same syntax as the config file.
 If not input is specified, 'stdin { type => stdin }' is default. If no output
 is specified, 'stdout { codec => rubydebug }}' is default.

-w, --filterworkers COUNT
 Sets the number of pipeline workers (threads) to run for filter and output
 processing (default: number of cores).
 If you find that events are backing up, or that the CPU is not saturated, consider increasing
 this number to better utilize machine processing power.
 NOTE: --filterworkers is deprecated. Please use --pipeline-workers or -w

-b, --pipeline-batch-size SIZE
 This parameter defines the maximum number of events an individual worker thread will collect
 before attempting to execute its filters and outputs. Default is 125 events.
 Larger batch sizes are generally more efficient, but come at the cost of increased memory
 overhead. You may have to increase the JVM heap size by setting the `LS_HEAP_SIZE`
 variable to effectively use the option.

-u, --pipeline-batch-delay DELAY_IN_MS
 When creating pipeline event batches, how long to wait while polling for the next event.
 Default is 5ms.

-l, --log FILE
 Log to a given path. Default is to log to stdout

--verbose
 Increase verbosity to the first level (info), less verbose.

--debug
 Increase verbosity to the last level (trace), more verbose.

--debug-config
 Print the compiled config ruby code out as a debug log (you must also have --debug enabled).
 WARNING: This will include any 'password' options passed to plugin configs as plaintext, and may result
 in plaintext passwords appearing in your logs!

-V, --version
  Display the version of Logstash.

-p, --pluginpath
  A path of where to find plugins. This flag can be given multiple times to include
  multiple paths. Plugins are expected to be in a specific directory hierarchy:
  'PATH/logstash/TYPE/NAME.rb' where TYPE is 'inputs' 'filters', 'outputs' or 'codecs'
  and NAME is the name of the plugin.

-t, --configtest
  Checks configuration and then exit. Note that grok patterns are not checked for
  correctness with this flag.
  Logstash can read multiple config files from a directory. If you combine this
  flag with `--debug`, Logstash will log the combined config file, annotating the
  individual config blocks with the source file it came from.

-r, --[no-]auto-reload
  Monitor configuration changes and reload the configuration whenever it is changed.

--allow-env
  EXPERIMENTAL: Enable environment variable templating within configuration parameters.

--reload-interval RELOAD_INTERVAL
  Specifies how often Logstash checks the config files for changes. The default is every 3 seconds.

-h, --help
  Print help

处理多行事件

在一些特殊的情况下,会产生一些多行的事件。为了正确处理这些多行事件,logstash需要知道哪些行是属于单个事件的一部分

Logstash使用multiline codec来将多行merge到单个input中

有两个重要的配置项来确定处理规则

举例

将java的栈信息整合到一个单一的事件中

java栈信息是由多行组成,如下:

1
2
3
4
Exception in thread "main" java.lang.NullPointerException
	at com.example.myproject.Book.getTitle(Book.java:16)
	at com.example.myproject.Author.getBookTitles(Author.java:25)
	at com.example.myproject.Bootstrap.main(Bootstrap.java:14)

为了将这些行整合成一个单一的事件,可以使用如下配置:

1
2
3
4
5
6
7
8
input {
  stdin {
	codec => multiline {
  		pattern => "^\s"
  		what => "previous"
	}
  }
}

这个配置就告诉我们,将以空格开头的行,附加到之前的行的后面去

部署和扩展Logstash

参考

https://www.elastic.co/guide/en/logstash/current/deploying-and-scaling.html

Flume vs Logstash

之前用过Flume对NC的日志进行收集,不过并没有细究Flume与Logstash的具体区别,以及各自的优劣,下面就简单谈一谈自己对这两者在各方面差异的理解:

数据清洗

Logstash比较强大之处在于有各种各样的filter插件,这些插件将非结构化的数据,转化为结构化的数据,而且还可以从日志中推导出其他信息,删除和添加字段

Flume其实也有这部分的功能,Flume里面叫Interceptors,它支持对event的body进行过滤、匹配,Logstash中的grok filter所做的事通过Regex Extractor Interceptor就能达到相同的目的

但是Logstash的一个特点就是其插件生态系统,有大量官方或开发者提供的灵活的插件可以使用,而Flume官方提供的Interceptors才8个,当然你可以自己去实现一些特定的Interceptor

而且,对于Multiline这样的日志信息,通过Logstash的codec可以很好地处理,而对于Flume就需要费一番功夫了

数据监控与分析

Logstash本就是Elasticsearch家族的产品,自然和Elasticsearch还有Kibana有很好的结合,通过ELK框架,能快速搭建一套完整的日志收集、分析、监控系统。但同时你所能做的分析功能也被局限了,例如我们要用日志做更复杂的数据挖掘,那么这套框架可能没办法提供解决方案。但是简单的监控是完全可行的

Flume是Apache旗下的产品,官方说如果你有一些文本日志想要存到HDFS中,那么Flume是完全适合的。Flume是被设计用来接收和消费在一个相对稳定环境下持续且规律地生成的数据的。Flume本身专注于对数据的收集,如果需要对数据再进行监控和分析,还得再配合其他工具,如利用storm进行实时的监控。

可靠性

我感觉Flume对可靠性的保证做的更全面。

Flume通过可靠的channel类型保证数据不会丢失,也就是那些会将数据持久化到磁盘的channel,如果用memory channel那么数据还是会丢失。

另外,Flume中的event都是保存在channel中,channel中的数据只有被传递到下一个agent的channel或者最终的repository中了才会被删除,它用事务性的方法来保证了event传输的可靠性。

而Logstash并没有提到这方面的保证,在一个logstash实例中,数据应该都是保存在内存,如果实例down掉了,那么整个数据都丢失了。

Logstash在扩展性那一节中提到,利用水平节点扩展和MQ来提供高可用,如下图:

但是这只能保证在单个logstash实例fail之后,系统能正常运行,并不能将其丢失的数据恢复

总结下来,如果你只是想要对日志数据进行各项指标的监控,并且对少量数据丢失不敏感的话,那么推荐logstash

但是如果,只是单纯地收集日志到分布式存储,且不能忍受日志丢失的情况的话,还是推荐用Flume吧