aggregate可以将将属于一个task的多个events的信息聚合在一起,并最终聚合到最后一个events中。使用时要注意一定要将worker数量设置成1(在logstash安装目录下的config目录下,logstash.yml文件,默认的worker数量和cpu的内核数量一致),否则处理顺序错乱会导致未知的结果。

1. 例一:输入有开始和结束标志的

输入日志如下

1
2
3
4
INFO - 12345 - TASK_START - start
INFO - 12345 - SQL - sqlQuery1 - 12
INFO - 12345 - SQL - sqlQuery2 - 34
INFO - 12345 - TASK_END - end

配置文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
filter {
grok {
match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ]
}
if [logger] == "TASK_START" {
aggregate {
task_id => "%{taskid}"
code => "map['sql_duration'] = 0"
map_action => "create"
}
}
if [logger] == "SQL" {
aggregate {
task_id => "%{taskid}"
code => "map['sql_duration'] += event.get('duration')"
map_action => "update"
}
}
if [logger] == "TASK_END" {
aggregate {
task_id => "%{taskid}"
code => "event.set('sql_duration', map['sql_duration'])"
map_action => "update"
end_of_task => true
timeout => 120
}
}
}

最后一个event输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
{
"host" => "host-10-0-251-156",
"logger" => "TASK_END",
"taskid" => "12345",
"label" => "end",
"loglevel" => "INFO",
"message" => " INFO - 12345 - TASK_END - end",
"@timestamp" => 2018-12-14T01:00:04.414Z,
"path" => "/root/logstash-6.5.0/config/test.txt",
"@version" => "1",
"sql_duration" => 46
}

sql_duration字段用于记录sql请求用时的总和。

2. 例二:输入油结束标志没有开始标志

输入日志如下

1
2
3
INFO - 12345 - SQL - sqlQuery1 - 12
INFO - 12345 - SQL - sqlQuery2 - 34
INFO - 12345 - TASK_END - end

配置文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
filter {
grok {
match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ]
}
if [logger] == "SQL" {
aggregate {
task_id => "%{taskid}"
code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event.get('duration')"
}
}
if [logger] == "TASK_END" {
aggregate {
task_id => "%{taskid}"
code => "event.set('sql_duration', map['sql_duration'])"
end_of_task => true
timeout => 120
}
}
}
  • 输出结果跟例一一样。
  • “||= “是ruby的操作符,例子中当map[‘sql_duration’]没有舒适化值的时候,将被初始化为0。

3. 例三:没有开始和结束标志

典型的应用场景是跟踪用户行为。通过用户id跟踪用户行为事件,一旦用户停止了交互,这个事件也就相应结束了。然而却没有一个特定的标记来表示用户交互的结束。这种场景我们可以使用push_map_as_event_on_timeout参数,设置timeout,当超时发生时,将map作为一个新的事件被输出。另外我们可以设置timecode用于timeout事件结束时执行。

输入日志如下

1
2
3
INFO - 12345 - Clicked One
INFO - 12345 - Clicked Two
INFO - 12345 - Clicked Three

聚合用户的点击量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
filter {
grok {
match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ]
}
aggregate {
task_id => "%{user_id}"
code => "map['clicks'] ||= 0; map['clicks'] += 1;"
push_map_as_event_on_timeout => true
timeout_task_id_field => "user_id"
timeout => 600 # 10 minutes timeout
timeout_tags => ['_aggregatetimeout']
timeout_code => "event.set('several_clicks', event.get('clicks') > 1)"
}
}

十分钟之后输出结果,最后事件的部分为:

1
2
3
4
5
6
7
8
9
10
{
"several_clicks" => true,
"@version" => "1",
"@timestamp" => 2018-12-14T05:54:09.212Z,
"clicks" => 3,
"user_id" => "12345",
"tags" => [
[0] "_aggregatetimeout"
]
}

4. 例四:没有开始和结束标志,任务连续不断

和例三一样,事件没有开始和结束的标志,并且任务连续不断。这就表明你不能靠设置timeout来更新map。

典型的例子是通过jdbc的插件获取数据进行聚合。例如有这样的sql查询语句:

1
SELECT country_name, town_name FROM town

通过jdbc插件获得3个事件:

1
2
3
{ "country_name": "France", "town_name": "Paris" }
{ "country_name": "France", "town_name": "Marseille" }
{ "country_name": "USA", "town_name": "New-York" }

我们的目的是往es里存入两个数据:

1
2
{ "country_name": "France", "towns": [ {"town_name": "Paris"}, {"town_name": "Marseille"} ] }
{ "country_name": "USA", "towns": [ {"town_name": "New-York"} ] }

可以设置push_previous_map_as_event参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
filter {
aggregate {
task_id => "%{country_name}"
code => "
map['country_name'] = event.get('country_name')
map['towns'] ||= []
map['towns'] << {'town_name' => event.get('town_name')}
event.cancel()
"
push_previous_map_as_event => true
timeout => 5
}
}
  • 每当检测出一个新的country_name的时候,就会将当前的聚合map作为一个新的事件被推出,然后为下一个国家创建一个新的空map。
  • 当5秒超时之后,最后一个聚合map会被作为一个新的事件推出。
  • event.cancel()会将起始的没有被聚合的事件丢弃。

5. 例五:没有开始和结束标志并且尽可能快的推送事件

这个例子也没有起始和结束标志,事件以不确定的间隔时间持续输入,并且希望用户最后交互后立即将事件推出而不用等超时时间,这要求聚合事件推送到输出的时间尽可能是实时的。典型的应场景是通过id跟踪用户行为的时候,一旦用户停止了交互,这个事件也就相应结束了。然而却没有一个特定的标记来表示用户交互的结束。如果对于同一个用户,如果没有事件输入时长达到了指定的inactivity_timeout时间,则认为交互结束。

从第一个事件开始,如果同一个用户交互时间长于我们设置的timeout时间,聚合的map被推出的同时会被删除。

和例三不同的是,当用户停止交互到达设定的inactivity_timeout时长,事件就会被推出而不是要等timeout的时间。

输入日志文件

1
2
3
INFO - 12345 - Clicked One
INFO - 12345 - Clicked Two
INFO - 12345 - Clicked Three

记录用户点击数量的logstash文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
filter {
grok {
match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ]
}
aggregate {
task_id => "%{user_id}"
code => "map['clicks'] ||= 0; map['clicks'] += 1;"
push_map_as_event_on_timeout => true
timeout_task_id_field => "user_id"
timeout => 3600 # 1 hour timeout, user activity will be considered finished one hour after the first event, even if events keep comming
inactivity_timeout => 300 # 5 minutes timeout, user activity will be considered finished if no new events arrive 5 minutes after the last event
timeout_tags => ['_aggregatetimeout']
timeout_code => "event.set('several_clicks', event.get('clicks') > 1)"
}
}

当超过五分钟没有输入或者从第一个事件开始到达了一个小时(超时时间),都会提交事件

1
2
3
4
5
6
7
8
{
"user_id": "12345",
"clicks": 3,
"several_clicks": true,
"tags": [
"_aggregatetimeout"
]
}

Tips:

  • inactivity_timeout,是从某个task_id的最后一个输入开始计时,直到超过设定的时间开始执行timeout_code的内容并输出。如果有多个task_id交叉输入,则将会分别计时。比如task_id=1输入一次,然后task_id=2输入两次,然后task_id=1再输入一次。task_id=2最后一次输入在task_id=1之前,则将先输出task_id=2通过timeout_code计算后的内容。
  • timeout上一条需要发生在同一个task_id第一次输入到最后一次输入的时间不超过timeout设置的时间的情况下,如果超过了timeout设置的时间,则会立即执行timeout_code的内容。例如timeout设置5分钟,而task_id=1持续输入(输入间隔不超过inactivity_timeout设置的时间)超过五分钟,则在五分钟的时候这个事件将会被推入输出里。