Heka Decoders

From noname.com.ua

Decoders parse the contents of the inputs, extract data from the text format, and map it onto the Heka message schema. A list of all available decoders: https://hekad.readthedocs.org/en/v0.10.0/config/decoders/index.html
On the controller, the following decoders are configured:

  • decoder-collectd.toml
  • decoder-http-check.toml
  • decoder-keystone_7_0.toml
  • decoder-keystone_wsgi.toml
  • decoder-mysql.toml
  • decoder-notification.toml
  • decoder-openstack.toml
  • decoder-ovs.toml
  • decoder-pacemaker.toml
  • decoder-rabbitmq.toml
  • decoder-swift.toml
  • decoder-system.toml

All custom decoders are of type SandboxDecoder.

SandboxDecoder simple example

Sandbox documentation: https://hekad.readthedocs.org/en/v0.10.0/sandbox/index.html
The sandbox decoder is a complex part of Heka, so to understand it better it helps to create a simple input and a simple decoder. The idea is to:

  • use the same input source that was used for collectd (read data from the file "/var/log/collectd_in_data")
  • use a Lua-based decoder to decode the input data

Input

The simplest way to poll a file in Heka is FilePollingInput. The configuration is simple: this input plugin reads the file every ticker_interval seconds. FilePollingInput can also be used to read data from the /proc file system, e.g. from /proc/loadavg. Another use case is an application that periodically rewrites its 'stats' files.
For example, with the following configuration:

[test_input]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/var/log/collectd_in_data"
decoder = "test_decoder"
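
To illustrate the /proc/loadavg case mentioned above, here is a minimal sketch of how the payload delivered by such an input could be split into named fields in plain Lua. The helper name parse_loadavg and the field names are assumptions made for this example and are not part of Heka; the snippet runs in a standalone Lua interpreter, outside the sandbox.

```lua
-- Sketch only: parse one line of /proc/loadavg into a table of fields.
-- parse_loadavg is a hypothetical helper, not a Heka API.
function parse_loadavg(payload)
    -- Expected shape: "0.20 0.18 0.12 1/80 11206"
    -- (1, 5 and 15 minute load, running/total tasks, last PID)
    local l1, l5, l15, running, total, last_pid =
        payload:match("^(%d+%.%d+)%s+(%d+%.%d+)%s+(%d+%.%d+)%s+(%d+)/(%d+)%s+(%d+)")
    if not l1 then
        return nil -- payload did not look like /proc/loadavg
    end
    return {
        load1    = tonumber(l1),
        load5    = tonumber(l5),
        load15   = tonumber(l15),
        running  = tonumber(running),
        total    = tonumber(total),
        last_pid = tonumber(last_pid),
    }
end

-- Usage with a hard-coded sample line
fields = parse_loadavg("0.20 0.18 0.12 1/80 11206\n")
print(fields.load1, fields.load5, fields.load15)
```

Inside a real decoder, a table like this would typically be assigned to msg.Fields before the message is injected.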

Decoder

In general, decoders receive data from input plugins and convert it into Heka's internal message format.

[test_decoder]
type = "SandboxDecoder"
filename = "/usr/share/lma_collector/decoders/test.lua"

<syntaxhighlight lang="lua">
-- mm.lua


-- :Timestamp: 2016-01-28 18:39:23.473999872 +0000 UTC
-- :Type: heka.sandbox.metric
-- :Hostname: node-6
-- :Pid: 0
-- :Uuid: bc7d6ecd-d00c-4b1a-ad38-a593f092e0fc
-- :Logger: http_metrics_filter
-- :Payload:
-- :EnvVersion:
-- :Severity: 6
-- :Fields:
--     | name:"type" type:string value:"gauge"
--     | name:"source" type:string value:"heat-api-cloudwatch"
--     | name:"deployment_id" type:string value:"3"
--     | name:"openstack_roles" type:string value:"primary-controller"
--     | name:"deployment_mode" type:string value:"ha_compact"
--     | name:"openstack_release" type:string value:"2015.1.0-7.0"
--     | name:"tag_fields" type:string value:["http_method","http_status"]
--     | name:"openstack_region" type:string value:"RegionOne"
--     | name:"name" type:string value:"openstack_heat_http_responses"
--     | name:"hostname" type:string value:"node-6"
--     | name:"value" type:double value:0.000433 representation:"s"
--     | name:"http_method" type:string value:"OPTIONS"
--     | name:"environment_label" type:string value:"test2"
--     | name:"http_status" type:string value:"300"

require "string"
-- require "io"


local msg = {

   Type = "test.mm.decoder",
   Payload = nil,
   Fields = {}

}


total = 0        -- preserved between restarts since it is in global scope
local count = 0  -- local scope so this will not be preserved

function process_message ()

   total = total + 1
   count = count + 1

-- file = io.open ("/var/log/heka-a.log", a)


   local data = read_message("Payload")
   msg.Fields.mmData  = data
   msg.Fields.mmCount = count
   msg.Fields.mmTotal = total
   inject_message(msg)

-- io.close(file)

   return 0

end

function timer_event(ns)
   -- local msg = {
   --    Type = "test.mm.decoder.timer_event",
   --    Payload = nil,
   --    Fields = {}
   -- }
   -- count = 0
   -- inject_message(msg)
   -- inject_payload("txt", "",
   --    string.format("%d messages in the last minute; total=%d", count, total))
end


-- function process_message()
--    local data = read_message("Payload")
--    msg.Fields = grammar:match(data)
--
--    if not msg.Fields then
--       return -1
--    end
--
--    if payload_keep then
--       msg.Payload = data
--    end
--
--    msg.Fields.FilePath = read_message("Fields[FilePath]")
--    inject_message(msg)
--    return 0
</syntaxhighlight>
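
The counting logic of the decoder above can be tried out without a running hekad by stubbing the two sandbox calls it uses. The sketch below is only an illustration: the read_message and inject_message stand-ins, the injected table and the sample payload are assumptions made for this example and only mimic the real sandbox API closely enough to run in a plain Lua interpreter.

```lua
-- Stand-ins for the Heka sandbox API (assumption: the decoder needs only these two calls).
injected = {}  -- collects every message the decoder "injects"

function read_message(field)
    -- Always return the same sample payload; other fields are not modelled.
    if field == "Payload" then
        return "0.20 0.18 0.12 1/80 11206"
    end
    return nil
end

function inject_message(m)
    -- Copy the fields so later changes to the shared msg table do not alter history.
    local copy = {}
    for k, v in pairs(m.Fields) do
        copy[k] = v
    end
    injected[#injected + 1] = { Type = m.Type, Fields = copy }
end

-- Decoder logic with the same shape as the example above.
local msg = { Type = "test.mm.decoder", Payload = nil, Fields = {} }
total = 0
local count = 0

function process_message()
    total = total + 1
    count = count + 1
    msg.Fields.mmData  = read_message("Payload")
    msg.Fields.mmCount = count
    msg.Fields.mmTotal = total
    inject_message(msg)
    return 0
end

-- Simulate three polls of the input file.
for _ = 1, 3 do
    assert(process_message() == 0)
end
print(#injected, injected[3].Fields.mmTotal)
```

Each simulated poll produces one injected message, and mmCount and mmTotal grow together because nothing resets count in this harness.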