Heka Decoders: различия между версиями

Материал из noname.com.ua
Перейти к навигацииПерейти к поиску
Строка 39: Строка 39:
 
Decoder in general are getting data from input plugins and convert it into Heka internal format.
 
Decoder in general are getting data from input plugins and convert it into Heka internal format.
 
<PRE>
 
<PRE>
  +
[test_decoder]
[mm_decoder]
 
 
type = "SandboxDecoder"
 
type = "SandboxDecoder"
filename = "/usr/share/lma_collector/decoders/mm.lua"
+
filename = "/usr/share/lma_collector/decoders/test.lua"
</PRE>
 
 
</PRE>
 
</PRE>
  +
  +
<syntaxhighlight lang="lua">
  +
at mm.lua
  +
  +
  +
  +
-- :Timestamp: 2016-01-28 18:39:23.473999872 +0000 UTC
  +
-- :Type: heka.sandbox.metric
  +
-- :Hostname: node-6
  +
-- :Pid: 0
  +
-- :Uuid: bc7d6ecd-d00c-4b1a-ad38-a593f092e0fc
  +
-- :Logger: http_metrics_filter
  +
-- :Payload:
  +
-- :EnvVersion:
  +
-- :Severity: 6
  +
-- :Fields:
  +
-- | name:"type" type:string value:"gauge"
  +
-- | name:"source" type:string value:"heat-api-cloudwatch"
  +
-- | name:"deployment_id" type:string value:"3"
  +
-- | name:"openstack_roles" type:string value:"primary-controller"
  +
-- | name:"deployment_mode" type:string value:"ha_compact"
  +
-- | name:"openstack_release" type:string value:"2015.1.0-7.0"
  +
-- | name:"tag_fields" type:string value:["http_method","http_status"]
  +
-- | name:"openstack_region" type:string value:"RegionOne"
  +
-- | name:"name" type:string value:"openstack_heat_http_responses"
  +
-- | name:"hostname" type:string value:"node-6"
  +
-- | name:"value" type:double value:0.000433 representation:"s"
  +
-- | name:"http_method" type:string value:"OPTIONS"
  +
-- | name:"environment_label" type:string value:"test2"
  +
-- | name:"http_status" type:string value:"300"
  +
  +
require "string"
  +
-- require "io"
  +
  +
  +
local msg = {
  +
Type = "test.mm.decoder",
  +
Payload = nil,
  +
Fields = {}
  +
}
  +
  +
  +
total = 0 -- preserved between restarts since it is in global scope
  +
local count = 0 -- local scope so this will not be preserved
  +
  +
function process_message ()
  +
total= total + 1
  +
count = count + 1
  +
  +
-- file = io.open ("/var/log/heka-a.log", a)
  +
  +
  +
local data = read_message("Payload")
  +
msg.Fields.mmData = data
  +
msg.Fields.mmCount = count
  +
msg.Fields.mmTotal = total
  +
inject_message(msg)
  +
  +
-- io.close(file)
  +
  +
return 0
  +
end
  +
  +
function timer_event(ns)
  +
-- local msg = {
  +
-- Type = "test.mm.decoder.timer_event",
  +
-- Payload = nil,
  +
-- Fields = {}
  +
--}
  +
-- count = 0
  +
-- inject_message(msg)
  +
-- inject_payload("txt", "",
  +
-- string.format("%d messages in the last minute; total=%d", count, total))
  +
end
  +
  +
  +
-- function process_message()
  +
-- local data = read_message("Payload")
  +
-- msg.Fields = grammar:match(data)
  +
--
  +
-- if not msg.Fields then
  +
-- return -1
  +
-- end
  +
--
  +
-- if payload_keep then
  +
-- msg.Payload = data
  +
-- end
  +
--
  +
-- msg.Fields.FilePath = read_message("Fields[FilePath]")
  +
-- inject_message(msg)
  +
-- return 0
  +
</syntaxhighlight>

Версия 19:05, 31 января 2016

Heka Decoders

Decoders parse the contents of the inputs to extract data from the text format and map them onto a Heka message schema. List of all available decoders: https://hekad.readthedocs.org/en/v0.10.0/config/decoders/index.html
On controller we have the follwing decoders configured:

  • decoder-collectd.toml
  • decoder-http-check.toml
  • decoder-keystone_7_0.toml
  • decoder-keystone_wsgi.toml
  • decoder-mysql.toml
  • decoder-notification.toml
  • decoder-openstack.toml
  • decoder-ovs.toml
  • decoder-pacemaker.toml
  • decoder-rabbitmq.toml
  • decoder-swift.toml
  • decoder-system.toml

All custom decoders decoders are SandboxDecoder

SandboxDecoder simple example

Sandbox documentation: https://hekad.readthedocs.org/en/v0.10.0/sandbox/index.html
Sandbox decoder is complex part of Heka, so for better understanding there it is possible to create simple input and simple decoder. Idea is

  • use the same input source as was used in collectd (read data from file "/var/log/collectd_in_data")
  • use lua-based decoder to decode input data

Input

The best way to poll file is Heka. FilePollingInput. Configuration is pretty simple, this input plugin read data from file every ticker_interval. FilePollingInput can be used to get data from /proc file system, e.g. from /proc/loadavg. One more example is application which rewrites 'stats' files.
E.g. we have the following configuration:

[test_input]
type = "FilePollingInput"
ticker_interval = 1
file_path = "/var/log/collectd_in_data"
decoder = "test_decoder"

Decoder

Decoder in general are getting data from input plugins and convert it into Heka internal format.

[test_decoder]
type = "SandboxDecoder"
filename = "/usr/share/lma_collector/decoders/test.lua"

<syntaxhighlight lang="lua"> at mm.lua


-- :Timestamp: 2016-01-28 18:39:23.473999872 +0000 UTC -- :Type: heka.sandbox.metric -- :Hostname: node-6 -- :Pid: 0 -- :Uuid: bc7d6ecd-d00c-4b1a-ad38-a593f092e0fc -- :Logger: http_metrics_filter -- :Payload: -- :EnvVersion: -- :Severity: 6 -- :Fields: -- | name:"type" type:string value:"gauge" -- | name:"source" type:string value:"heat-api-cloudwatch" -- | name:"deployment_id" type:string value:"3" -- | name:"openstack_roles" type:string value:"primary-controller" -- | name:"deployment_mode" type:string value:"ha_compact" -- | name:"openstack_release" type:string value:"2015.1.0-7.0" -- | name:"tag_fields" type:string value:["http_method","http_status"] -- | name:"openstack_region" type:string value:"RegionOne" -- | name:"name" type:string value:"openstack_heat_http_responses" -- | name:"hostname" type:string value:"node-6" -- | name:"value" type:double value:0.000433 representation:"s" -- | name:"http_method" type:string value:"OPTIONS" -- | name:"environment_label" type:string value:"test2" -- | name:"http_status" type:string value:"300"

require "string" -- require "io"


local msg = {

   Type = "test.mm.decoder",
   Payload = nil,
   Fields = {}

}


total = 0 -- preserved between restarts since it is in global scope local count = 0 -- local scope so this will not be preserved

function process_message ()

   total= total + 1
   count = count + 1

-- file = io.open ("/var/log/heka-a.log", a)


   local data = read_message("Payload")
   msg.Fields.mmData  = data
   msg.Fields.mmCount = count
   msg.Fields.mmTotal = total
   inject_message(msg)

-- io.close(file)

   return 0

end

function timer_event(ns) -- local msg = { -- Type = "test.mm.decoder.timer_event", -- Payload = nil, -- Fields = {} --} -- count = 0 -- inject_message(msg) -- inject_payload("txt", "", -- string.format("%d messages in the last minute; total=%d", count, total)) end


-- function process_message() -- local data = read_message("Payload") -- msg.Fields = grammar:match(data) -- -- if not msg.Fields then -- return -1 -- end -- -- if payload_keep then -- msg.Payload = data -- end -- -- msg.Fields.FilePath = read_message("Fields[FilePath]") -- inject_message(msg) -- return 0 </syntaxhighlight>