Heka Decoders: различия между версиями
Sirmax (обсуждение | вклад) |
Sirmax (обсуждение | вклад) |
||
Строка 39: | Строка 39: | ||
Decoder in general are getting data from input plugins and convert it into Heka internal format. |
Decoder in general are getting data from input plugins and convert it into Heka internal format. |
||
<PRE> |
<PRE> |
||
+ | [test_decoder] |
||
− | [mm_decoder] |
||
type = "SandboxDecoder" |
type = "SandboxDecoder" |
||
− | filename = "/usr/share/lma_collector/decoders/ |
+ | filename = "/usr/share/lma_collector/decoders/test.lua" |
− | </PRE> |
||
</PRE> |
</PRE> |
||
+ | |||
+ | <syntaxhighlight lang="lua"> |
||
+ | at mm.lua |
||
+ | |||
+ | |||
+ | |||
+ | -- :Timestamp: 2016-01-28 18:39:23.473999872 +0000 UTC |
||
+ | -- :Type: heka.sandbox.metric |
||
+ | -- :Hostname: node-6 |
||
+ | -- :Pid: 0 |
||
+ | -- :Uuid: bc7d6ecd-d00c-4b1a-ad38-a593f092e0fc |
||
+ | -- :Logger: http_metrics_filter |
||
+ | -- :Payload: |
||
+ | -- :EnvVersion: |
||
+ | -- :Severity: 6 |
||
+ | -- :Fields: |
||
+ | -- | name:"type" type:string value:"gauge" |
||
+ | -- | name:"source" type:string value:"heat-api-cloudwatch" |
||
+ | -- | name:"deployment_id" type:string value:"3" |
||
+ | -- | name:"openstack_roles" type:string value:"primary-controller" |
||
+ | -- | name:"deployment_mode" type:string value:"ha_compact" |
||
+ | -- | name:"openstack_release" type:string value:"2015.1.0-7.0" |
||
+ | -- | name:"tag_fields" type:string value:["http_method","http_status"] |
||
+ | -- | name:"openstack_region" type:string value:"RegionOne" |
||
+ | -- | name:"name" type:string value:"openstack_heat_http_responses" |
||
+ | -- | name:"hostname" type:string value:"node-6" |
||
+ | -- | name:"value" type:double value:0.000433 representation:"s" |
||
+ | -- | name:"http_method" type:string value:"OPTIONS" |
||
+ | -- | name:"environment_label" type:string value:"test2" |
||
+ | -- | name:"http_status" type:string value:"300" |
||
+ | |||
+ | require "string" |
||
+ | -- require "io" |
||
+ | |||
+ | |||
+ | local msg = { |
||
+ | Type = "test.mm.decoder", |
||
+ | Payload = nil, |
||
+ | Fields = {} |
||
+ | } |
||
+ | |||
+ | |||
+ | total = 0 -- preserved between restarts since it is in global scope |
||
+ | local count = 0 -- local scope so this will not be preserved |
||
+ | |||
+ | function process_message () |
||
+ | total= total + 1 |
||
+ | count = count + 1 |
||
+ | |||
+ | -- file = io.open ("/var/log/heka-a.log", a) |
||
+ | |||
+ | |||
+ | local data = read_message("Payload") |
||
+ | msg.Fields.mmData = data |
||
+ | msg.Fields.mmCount = count |
||
+ | msg.Fields.mmTotal = total |
||
+ | inject_message(msg) |
||
+ | |||
+ | -- io.close(file) |
||
+ | |||
+ | return 0 |
||
+ | end |
||
+ | |||
+ | function timer_event(ns) |
||
+ | -- local msg = { |
||
+ | -- Type = "test.mm.decoder.timer_event", |
||
+ | -- Payload = nil, |
||
+ | -- Fields = {} |
||
+ | --} |
||
+ | -- count = 0 |
||
+ | -- inject_message(msg) |
||
+ | -- inject_payload("txt", "", |
||
+ | -- string.format("%d messages in the last minute; total=%d", count, total)) |
||
+ | end |
||
+ | |||
+ | |||
+ | -- function process_message() |
||
+ | -- local data = read_message("Payload") |
||
+ | -- msg.Fields = grammar:match(data) |
||
+ | -- |
||
+ | -- if not msg.Fields then |
||
+ | -- return -1 |
||
+ | -- end |
||
+ | -- |
||
+ | -- if payload_keep then |
||
+ | -- msg.Payload = data |
||
+ | -- end |
||
+ | -- |
||
+ | -- msg.Fields.FilePath = read_message("Fields[FilePath]") |
||
+ | -- inject_message(msg) |
||
+ | -- return 0 |
||
+ | </syntaxhighlight> |
Версия 18:05, 31 января 2016
Heka Decoders
Decoders parse the contents of the inputs to extract data from the text format and map them onto a Heka message schema. List of all available decoders: https://hekad.readthedocs.org/en/v0.10.0/config/decoders/index.html
On controller we have the follwing decoders configured:
- decoder-collectd.toml
- decoder-http-check.toml
- decoder-keystone_7_0.toml
- decoder-keystone_wsgi.toml
- decoder-mysql.toml
- decoder-notification.toml
- decoder-openstack.toml
- decoder-ovs.toml
- decoder-pacemaker.toml
- decoder-rabbitmq.toml
- decoder-swift.toml
- decoder-system.toml
All custom decoders decoders are SandboxDecoder
SandboxDecoder simple example
Sandbox documentation: https://hekad.readthedocs.org/en/v0.10.0/sandbox/index.html
Sandbox decoder is complex part of Heka, so for better understanding there it is possible to create simple input and simple decoder.
Idea is
- use the same input source as was used in collectd (read data from file "/var/log/collectd_in_data")
- use lua-based decoder to decode input data
Input
The best way to poll file is Heka. FilePollingInput. Configuration is pretty simple, this input plugin read data from file every ticker_interval.
FilePollingInput can be used to get data from /proc file system, e.g. from /proc/loadavg. One more example is application which rewrites 'stats' files.
E.g. we have the following configuration:
[test_input] type = "FilePollingInput" ticker_interval = 1 file_path = "/var/log/collectd_in_data" decoder = "test_decoder"
Decoder
Decoder in general are getting data from input plugins and convert it into Heka internal format.
[test_decoder] type = "SandboxDecoder" filename = "/usr/share/lma_collector/decoders/test.lua"
<syntaxhighlight lang="lua"> at mm.lua
-- :Timestamp: 2016-01-28 18:39:23.473999872 +0000 UTC -- :Type: heka.sandbox.metric -- :Hostname: node-6 -- :Pid: 0 -- :Uuid: bc7d6ecd-d00c-4b1a-ad38-a593f092e0fc -- :Logger: http_metrics_filter -- :Payload: -- :EnvVersion: -- :Severity: 6 -- :Fields: -- | name:"type" type:string value:"gauge" -- | name:"source" type:string value:"heat-api-cloudwatch" -- | name:"deployment_id" type:string value:"3" -- | name:"openstack_roles" type:string value:"primary-controller" -- | name:"deployment_mode" type:string value:"ha_compact" -- | name:"openstack_release" type:string value:"2015.1.0-7.0" -- | name:"tag_fields" type:string value:["http_method","http_status"] -- | name:"openstack_region" type:string value:"RegionOne" -- | name:"name" type:string value:"openstack_heat_http_responses" -- | name:"hostname" type:string value:"node-6" -- | name:"value" type:double value:0.000433 representation:"s" -- | name:"http_method" type:string value:"OPTIONS" -- | name:"environment_label" type:string value:"test2" -- | name:"http_status" type:string value:"300"
require "string" -- require "io"
local msg = {
Type = "test.mm.decoder", Payload = nil, Fields = {}
}
total = 0 -- preserved between restarts since it is in global scope
local count = 0 -- local scope so this will not be preserved
function process_message ()
total= total + 1 count = count + 1
-- file = io.open ("/var/log/heka-a.log", a)
local data = read_message("Payload") msg.Fields.mmData = data msg.Fields.mmCount = count msg.Fields.mmTotal = total inject_message(msg)
-- io.close(file)
return 0
end
function timer_event(ns) -- local msg = { -- Type = "test.mm.decoder.timer_event", -- Payload = nil, -- Fields = {} --} -- count = 0 -- inject_message(msg) -- inject_payload("txt", "", -- string.format("%d messages in the last minute; total=%d", count, total)) end
-- function process_message()
-- local data = read_message("Payload")
-- msg.Fields = grammar:match(data)
--
-- if not msg.Fields then
-- return -1
-- end
--
-- if payload_keep then
-- msg.Payload = data
-- end
--
-- msg.Fields.FilePath = read_message("Fields[FilePath]")
-- inject_message(msg)
-- return 0
</syntaxhighlight>