aggregate可以将将属于一个task的多个events的信息聚合在一起,并最终聚合到最后一个events中。使用时要注意一定要将worker数量设置成1(在logstash安装目录下的config目录下,logstash.yml文件,默认的worker数量和cpu的内核数量一致),否则处理顺序错乱会导致未知的结果。
1. 例一:输入有开始和结束标志的
输入日志如下
1 2 3 4
| INFO - 12345 - TASK_START - start INFO - 12345 - SQL - sqlQuery1 - 12 INFO - 12345 - SQL - sqlQuery2 - 34 INFO - 12345 - TASK_END - end
|
配置文件如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ] } if [logger] == "TASK_START" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] = 0" map_action => "create" } } if [logger] == "SQL" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] += event.get('duration')" map_action => "update" } } if [logger] == "TASK_END" { aggregate { task_id => "%{taskid}" code => "event.set('sql_duration', map['sql_duration'])" map_action => "update" end_of_task => true timeout => 120 } } }
|
最后一个event输出如下:
1 2 3 4 5 6 7 8 9 10 11 12
| { "host" => "host-10-0-251-156", "logger" => "TASK_END", "taskid" => "12345", "label" => "end", "loglevel" => "INFO", "message" => " INFO - 12345 - TASK_END - end", "@timestamp" => 2018-12-14T01:00:04.414Z, "path" => "/root/logstash-6.5.0/config/test.txt", "@version" => "1", "sql_duration" => 46 }
|
sql_duration字段用于记录sql请求用时的总和。
2. 例二:输入油结束标志没有开始标志
输入日志如下
1 2 3
| INFO - 12345 - SQL - sqlQuery1 - 12 INFO - 12345 - SQL - sqlQuery2 - 34 INFO - 12345 - TASK_END - end
|
配置文件如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ] } if [logger] == "SQL" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event.get('duration')" } } if [logger] == "TASK_END" { aggregate { task_id => "%{taskid}" code => "event.set('sql_duration', map['sql_duration'])" end_of_task => true timeout => 120 } } }
|
- 输出结果跟例一一样。
- “||= “是ruby的操作符,例子中当map[‘sql_duration’]没有舒适化值的时候,将被初始化为0。
3. 例三:没有开始和结束标志
典型的应用场景是跟踪用户行为。通过用户id跟踪用户行为事件,一旦用户停止了交互,这个事件也就相应结束了。然而却没有一个特定的标记来表示用户交互的结束。这种场景我们可以使用push_map_as_event_on_timeout参数,设置timeout,当超时发生时,将map作为一个新的事件被输出。另外我们可以设置timecode用于timeout事件结束时执行。
输入日志如下
1 2 3
| INFO - 12345 - Clicked One INFO - 12345 - Clicked Two INFO - 12345 - Clicked Three
|
聚合用户的点击量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ] } aggregate { task_id => "%{user_id}" code => "map['clicks'] ||= 0; map['clicks'] += 1;" push_map_as_event_on_timeout => true timeout_task_id_field => "user_id" timeout => 600 # 10 minutes timeout timeout_tags => ['_aggregatetimeout'] timeout_code => "event.set('several_clicks', event.get('clicks') > 1)" } }
|
十分钟之后输出结果,最后事件的部分为:
1 2 3 4 5 6 7 8 9 10
| { "several_clicks" => true, "@version" => "1", "@timestamp" => 2018-12-14T05:54:09.212Z, "clicks" => 3, "user_id" => "12345", "tags" => [ [0] "_aggregatetimeout" ] }
|
4. 例四:没有开始和结束标志,任务连续不断
和例三一样,事件没有开始和结束的标志,并且任务连续不断。这就表明你不能靠设置timeout来更新map。
典型的例子是通过jdbc的插件获取数据进行聚合。例如有这样的sql查询语句:
1
| SELECT country_name, town_name FROM town
|
通过jdbc插件获得3个事件:
1 2 3
| { "country_name": "France", "town_name": "Paris" } { "country_name": "France", "town_name": "Marseille" } { "country_name": "USA", "town_name": "New-York" }
|
我们的目的是往es里存入两个数据:
1 2
| { "country_name": "France", "towns": [ {"town_name": "Paris"}, {"town_name": "Marseille"} ] } { "country_name": "USA", "towns": [ {"town_name": "New-York"} ] }
|
可以设置push_previous_map_as_event参数:
1 2 3 4 5 6 7 8 9 10 11 12 13
| filter { aggregate { task_id => "%{country_name}" code => " map['country_name'] = event.get('country_name') map['towns'] ||= [] map['towns'] << {'town_name' => event.get('town_name')} event.cancel() " push_previous_map_as_event => true timeout => 5 } }
|
- 每当检测出一个新的country_name的时候,就会将当前的聚合map作为一个新的事件被推出,然后为下一个国家创建一个新的空map。
- 当5秒超时之后,最后一个聚合map会被作为一个新的事件推出。
- event.cancel()会将起始的没有被聚合的事件丢弃。
5. 例五:没有开始和结束标志并且尽可能快的推送事件
这个例子也没有起始和结束标志,事件以不确定的间隔时间持续输入,并且希望用户最后交互后立即将事件推出而不用等超时时间,这要求聚合事件推送到输出的时间尽可能是实时的。典型的应场景是通过id跟踪用户行为的时候,一旦用户停止了交互,这个事件也就相应结束了。然而却没有一个特定的标记来表示用户交互的结束。如果对于同一个用户,如果没有事件输入时长达到了指定的inactivity_timeout时间,则认为交互结束。
从第一个事件开始,如果同一个用户交互时间长于我们设置的timeout时间,聚合的map被推出的同时会被删除。
和例三不同的是,当用户停止交互到达设定的inactivity_timeout时长,事件就会被推出而不是要等timeout的时间。
输入日志文件
1 2 3
| INFO - 12345 - Clicked One INFO - 12345 - Clicked Two INFO - 12345 - Clicked Three
|
记录用户点击数量的logstash文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:msg_text}" ] } aggregate { task_id => "%{user_id}" code => "map['clicks'] ||= 0; map['clicks'] += 1;" push_map_as_event_on_timeout => true timeout_task_id_field => "user_id" timeout => 3600 # 1 hour timeout, user activity will be considered finished one hour after the first event, even if events keep comming inactivity_timeout => 300 # 5 minutes timeout, user activity will be considered finished if no new events arrive 5 minutes after the last event timeout_tags => ['_aggregatetimeout'] timeout_code => "event.set('several_clicks', event.get('clicks') > 1)" } }
|
当超过五分钟没有输入或者从第一个事件开始到达了一个小时(超时时间),都会提交事件
1 2 3 4 5 6 7 8
| { "user_id": "12345", "clicks": 3, "several_clicks": true, "tags": [ "_aggregatetimeout" ] }
|
Tips:
- inactivity_timeout,是从某个task_id的最后一个输入开始计时,直到超过设定的时间开始执行timeout_code的内容并输出。如果有多个task_id交叉输入,则将会分别计时。比如task_id=1输入一次,然后task_id=2输入两次,然后task_id=1再输入一次。task_id=2最后一次输入在task_id=1之前,则将先输出task_id=2通过timeout_code计算后的内容。
- timeout上一条需要发生在同一个task_id第一次输入到最后一次输入的时间不超过timeout设置的时间的情况下,如果超过了timeout设置的时间,则会立即执行timeout_code的内容。例如timeout设置5分钟,而task_id=1持续输入(输入间隔不超过inactivity_timeout设置的时间)超过五分钟,则在五分钟的时候这个事件将会被推入输出里。