Heka Decoders
Decoders parse the contents of the inputs to extract data from the text format and map them onto a Heka message schema. List of all available decoders: https://hekad.readthedocs.org/en/v0.10.0/config/decoders/index.html
On the controller, the following decoders are configured:
- decoder-collectd.toml
- decoder-http-check.toml
- decoder-keystone_7_0.toml
- decoder-keystone_wsgi.toml
- decoder-mysql.toml
- decoder-notification.toml
- decoder-openstack.toml
- decoder-ovs.toml
- decoder-pacemaker.toml
- decoder-rabbitmq.toml
- decoder-swift.toml
- decoder-system.toml
All custom decoders are of type SandboxDecoder.
SandboxDecoder simple example
Sandbox documentation: https://hekad.readthedocs.org/en/v0.10.0/sandbox/index.html
The sandbox decoder is a complex part of Heka, so for better understanding it helps to create a simple input and a simple decoder.
The idea is to:
- use the same input source as was used in collectd (read data from file "/var/log/collectd_in_data")
- use lua-based decoder to decode input data
Input
The best way to poll a file is Heka's FilePollingInput. Configuration is simple: this input plugin reads the whole file every ticker_interval seconds.
FilePollingInput can be used to get data from the /proc file system, e.g. from /proc/loadavg. Another example is an application that periodically rewrites its 'stats' files.
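To make the /proc/loadavg case more concrete, here is a minimal sketch of how a Lua decoder could split such a payload into named fields. The helper name parse_loadavg and the field names are hypothetical and not part of Heka; the sketch assumes the usual five-column layout of /proc/loadavg.

```lua
-- Hypothetical helper (not part of Heka): split one line of /proc/loadavg
-- into named numeric fields. Returns nil if the line has another layout.
local function parse_loadavg(line)
    local l1, l5, l15, running, total, last_pid =
        line:match("^(%d+%.%d+) (%d+%.%d+) (%d+%.%d+) (%d+)/(%d+) (%d+)")
    if not l1 then
        return nil
    end
    return {
        load_1m = tonumber(l1),
        load_5m = tonumber(l5),
        load_15m = tonumber(l15),
        running_entities = tonumber(running),
        total_entities = tonumber(total),
        last_pid = tonumber(last_pid),
    }
end

-- Example input in the format produced by /proc/loadavg.
local fields = parse_loadavg("0.20 0.18 0.12 1/80 11206\n")
print(fields.load_1m, fields.load_5m, fields.load_15m)
```

Inside a real decoder, the returned table could be assigned to msg.Fields before calling inject_message.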
E.g. we have the following configuration:
[test_input]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/var/log/collectd_in_data"
decoder = "test_decoder"
Decoder
In general, decoders receive data from input plugins and convert it into Heka's internal message format.
[test_decoder]
type = "SandboxDecoder"
filename = "/usr/share/lma_collector/decoders/test.lua"
<syntaxhighlight lang="lua"> at mm.lua
-- :Timestamp: 2016-01-28 18:39:23.473999872 +0000 UTC
-- :Type: heka.sandbox.metric
-- :Hostname: node-6
-- :Pid: 0
-- :Uuid: bc7d6ecd-d00c-4b1a-ad38-a593f092e0fc
-- :Logger: http_metrics_filter
-- :Payload:
-- :EnvVersion:
-- :Severity: 6
-- :Fields:
--    | name:"type" type:string value:"gauge"
--    | name:"source" type:string value:"heat-api-cloudwatch"
--    | name:"deployment_id" type:string value:"3"
--    | name:"openstack_roles" type:string value:"primary-controller"
--    | name:"deployment_mode" type:string value:"ha_compact"
--    | name:"openstack_release" type:string value:"2015.1.0-7.0"
--    | name:"tag_fields" type:string value:["http_method","http_status"]
--    | name:"openstack_region" type:string value:"RegionOne"
--    | name:"name" type:string value:"openstack_heat_http_responses"
--    | name:"hostname" type:string value:"node-6"
--    | name:"value" type:double value:0.000433 representation:"s"
--    | name:"http_method" type:string value:"OPTIONS"
--    | name:"environment_label" type:string value:"test2"
--    | name:"http_status" type:string value:"300"

require "string"
-- require "io"

-- Message template reused for every decoded input.
local msg = {
    Type = "test.mm.decoder",
    Payload = nil,
    Fields = {}
}

total = 0        -- global scope: preserved between restarts (requires preserve_data = true in the plugin config)
local count = 0  -- local scope so this will not be preserved

-- Called by Heka for each chunk of data received from the input.
function process_message()
    total = total + 1
    count = count + 1
    -- file = io.open("/var/log/heka-a.log", "a")
    local data = read_message("Payload")
    msg.Fields.mmData = data
    msg.Fields.mmCount = count
    msg.Fields.mmTotal = total
    inject_message(msg)
    -- io.close(file)
    return 0
end

function timer_event(ns)
    -- local msg = {
    --     Type = "test.mm.decoder.timer_event",
    --     Payload = nil,
    --     Fields = {}
    -- }
    -- count = 0
    -- inject_message(msg)
    -- inject_payload("txt", "",
    --     string.format("%d messages in the last minute; total=%d", count, total))
end

-- function process_message()
-- local data = read_message("Payload")
-- msg.Fields = grammar:match(data)
--
-- if not msg.Fields then
-- return -1
-- end
--
-- if payload_keep then
-- msg.Payload = data
-- end
--
-- msg.Fields.FilePath = read_message("Fields[FilePath]")
-- inject_message(msg)
-- return 0
-- end
</syntaxhighlight>
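To experiment with the decoder logic without a running hekad, one option is to stub the sandbox functions in plain Lua. The sketch below is only an approximation: the read_message and inject_message stubs are stand-ins written for this example and mimic just the calls used by the decoder above, and the two payload strings are made-up test data.

```lua
-- Stand-ins for the functions Heka provides inside the sandbox.
-- Assumption: they only mimic the calls used by the decoder above.
local current_payload = nil
local injected = {}

function read_message(field)
    if field == "Payload" then
        return current_payload
    end
    return nil
end

function inject_message(m)
    -- Copy the fields, because the decoder reuses one msg table for every call.
    local copy = { Type = m.Type, Fields = {} }
    for k, v in pairs(m.Fields) do
        copy.Fields[k] = v
    end
    injected[#injected + 1] = copy
end

-- Same logic as the decoder above.
local msg = { Type = "test.mm.decoder", Payload = nil, Fields = {} }
total = 0
local count = 0

function process_message()
    total = total + 1
    count = count + 1
    msg.Fields.mmData = read_message("Payload")
    msg.Fields.mmCount = count
    msg.Fields.mmTotal = total
    inject_message(msg)
    return 0
end

-- Feed two made-up payloads through the decoder and show the results.
for _, line in ipairs({ "first line", "second line" }) do
    current_payload = line
    assert(process_message() == 0)
end

for i, m in ipairs(injected) do
    print(i, m.Type, m.Fields.mmData, m.Fields.mmCount, m.Fields.mmTotal)
end
```

Running this with a standalone Lua interpreter shows how mmCount and mmTotal grow with each processed payload; it does not exercise Heka's own message routing or data preservation.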