Logstash: различия между версиями

Материал из noname.com.ua
Перейти к навигацииПерейти к поиску
 
(не показано 46 промежуточных версий этого же участника)
Строка 3: Строка 3:
 
[[Категория:Logstash]]
 
[[Категория:Logstash]]
 
[[Категория:Grok]]
 
[[Категория:Grok]]
  +
[[Категория:Elasticsearch]]
  +
[[Категория:Kibana]]
  +
[[Категория:Filebeat]]
 
=Logstash=
 
=Logstash=
 
Заметки по Logstash с примерами конфигов
 
Заметки по Logstash с примерами конфигов
   
  +
<B>Заметка возникла по-тому что каждый раз когда надо настроить Logstash я обнаруживал что забыл почти все - и решил записать что б в следующий раз зачитать по бумажке</B>
  +
==Установка==
  +
* https://www.elastic.co/guide/en/logstash/current/index.html
  +
<PRE>
  +
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
  +
sudo apt-get install apt-transport-https
  +
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
  +
sudo apt-get update
  +
sudo apt-get install elasticsearch
  +
</PRE>
  +
  +
Порядок работы - input --> filter (в том числе изменение) -->output
  +
<BR>
  +
Для каждой фазы есть множество плагинов
 
<PRE>
 
<PRE>
 
input {
 
input {
Строка 25: Строка 42:
 
Данный метод является входной точкой для логов
 
Данный метод является входной точкой для логов
   
  +
Тыщи их если что
===File===
 
  +
* https://www.elastic.co/guide/en/logstash/current/input-plugins.html
  +
  +
В описании только самые простые/ходовые
  +
  +
==Beats==
  +
Точка для получения логов от Filebeat
  +
* https://www.elastic.co/guide/en/beats/filebeat/current/configuring-howto-filebeat.html
  +
* https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html#plugins-inputs-beats
  +
<PRE>
  +
beats {
  +
port => 5400
  +
ssl => true
  +
ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"]
  +
ssl_certificate => "/etc/elk-certs/elk-ssl.crt"
  +
ssl_key => "/etc/elk-certs/elk-ssl.key"
  +
ssl_verify_mode => "force_peer"
  +
}
  +
</PRE>
  +
Сертефикаты должны быть подписаны одним CA - по сути тут требуется авторизация с помошью сертефикатов (как например OpenVPN или такое можно делать с nginx)
  +
<BR>
  +
Важнейшая настройка Ж
  +
<PRE>
  +
ssl_verify_modeedit
  +
</PRE>
  +
Value can be any of: none, peer, force_peer Default value is "none" By default the server doesn’t do any client verification. <BR>
  +
peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated. <BR>
  +
force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed. <BR>
  +
<BR>
  +
This option needs to be used with ssl_certificate_authorities and a defined list of CAs.
  +
<BR>
  +
  +
===Генерация серетфикатов===
  +
Простой скрипт для самоподписанных сертефикатов (хотя конечно лучше организовать свой PKI)
  +
<PRE>
  +
#!/bin/bash
  +
  +
CN="logstash.tld"
  +
sudo openssl req -subj "/CN=${CN}/" -x509 -days 3650 -batch -nodes -newkey rsa:2048 -keyout elk-ssl.key -out elk-ssl.crt
  +
  +
chown logstash elk-ssl.crt
  +
chown logstash elk-ssl.key
  +
</PRE>
  +
<B>Важно:</B> В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу)
  +
  +
==File==
 
</PRE>
 
</PRE>
 
input {
 
input {
 
file {
 
file {
type => "some_access_log"
 
 
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
 
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
 
exclude => [ "*.gz", "*.zip", "*.rar" ]
 
exclude => [ "*.gz", "*.zip", "*.rar" ]
Строка 37: Строка 98:
 
}
 
}
 
}
 
}
</PRE>
 
<PRE>
 
type => "some_access_log"
 
тип/описание лога. При использовании нескольких входных блоков, удобно их разделять для последующих действий в filter или output.
 
 
</PRE>
 
</PRE>
 
<PRE>
 
<PRE>
 
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
 
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
 
</PRE>
 
</PRE>
указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным
+
указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным (?)
 
<PRE>
 
<PRE>
 
exclude => [ "*.gz", "*.zip", "*.rar" ]
 
exclude => [ "*.gz", "*.zip", "*.rar" ]
Строка 63: Строка 120:
 
</PRE>
 
</PRE>
   
===tcp===
+
==tcp==
 
Пример конфигурации, для работы с логами удалённых сервисов:
 
Пример конфигурации, для работы с логами удалённых сервисов:
 
<PRE>
 
<PRE>
 
input {
 
input {
 
tcp {
 
tcp {
type => "webserver_prod"
 
 
data_timeout => 10
 
data_timeout => 10
 
mode => "server"
 
mode => "server"
Строка 78: Строка 134:
   
 
Построчное описание настроек:
 
Построчное описание настроек:
<PRE>
 
type => "webserver_prod"
 
тип/описание лога.
 
</PRE>
 
 
<PRE>
 
<PRE>
 
data_timeout => 10
 
data_timeout => 10
<PRE>
+
</PRE>
 
время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто.
 
время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто.
 
<PRE>
 
<PRE>
Строка 93: Строка 145:
 
в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.
 
в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.
   
===udp===
+
==udp==
 
<PRE>
 
<PRE>
 
Для udp настройки аналогичные tcp:
 
Для udp настройки аналогичные tcp:
 
input {
 
input {
 
udp {
 
udp {
type => "webserver_prod"
 
 
buffer_size => 4096
 
buffer_size => 4096
 
host => "192.168.3.12"
 
host => "192.168.3.12"
Строка 105: Строка 156:
 
}
 
}
 
</PRE>
 
</PRE>
  +
 
==FILTER==
 
==FILTER==
  +
Все плагины - https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
В данном блоке настраиваются основные манипуляции с логами. Это может быть и разбивка по key=value, и удаление ненужных параметров, и замена имеющихся значений, и использование geoip или DNS запросов для ип-адресов или названий хостов.
 
  +
<BR>
  +
В данном блоке настраиваются основные манипуляции с логами. <BR>
  +
Это может быть например
  +
* разбивка по key=value
  +
* удаление ненужных параметров
  +
* замена имеющихся значений
  +
* использование geoip или DNS запросов для ип-адресов или названий хостов.
  +
   
На первый взгляд применение фильтров может показаться сложным и нелогичным, но это не совсем так.
 
 
===grok===
 
===grok===
  +
GROK - разбивает строку с помошью регулярных выражений.
  +
<BR>
 
Пример конфигурационного файла для основной нормализации логов:
 
Пример конфигурационного файла для основной нормализации логов:
 
<PRE>
 
<PRE>
 
filter {
 
filter {
 
grok {
 
grok {
type => "some_access_log"
 
 
patterns_dir => "/path/to/patterns/"
 
patterns_dir => "/path/to/patterns/"
 
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
 
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
Строка 121: Строка 181:
 
</PRE>
 
</PRE>
   
Построчное описание настроек:
+
Описание настроек:
<PRE>
 
type => "apache_access"
 
</PRE>
 
тип/описание лога. Здесь надо указать тот тип (type), который прописан в блоке input для которого будет происходить обработка.
 
 
<PRE>
 
<PRE>
 
patterns_dir => "/path/to/patterns/"
 
patterns_dir => "/path/to/patterns/"
 
</PRE>
 
</PRE>
путь к каталогу, содержащим шаблоны обработки логов. Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.
+
путь к каталогу, содержащим шаблоны обработки логов.<BR>
  +
Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.
  +
<BR>
 
<PRE>
 
<PRE>
 
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
 
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
 
</PRE>
 
</PRE>
  +
указывается шаблон разборки логов.
указывается шаблон разборки логов. Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов. Что бы не путаться, я для каждого формата логов создаю отдельный шаблонный файл.
 
  +
<BR>
  +
Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов.
  +
<BR>
  +
Создавать свои шаблоны необязательно - достаточно встроенных
  +
* https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/ecs-v1/grok-patterns
   
==Подробнее про шаблоны==
+
===Подробнее про шаблоны===
 
С помощью grok фильтра можно структурировать большую часть логов — syslog, apache, nginx, mysql итд, записанных в определённом формате.
 
С помощью grok фильтра можно структурировать большую часть логов — syslog, apache, nginx, mysql итд, записанных в определённом формате.
 
Logstash имеет более 120 шаблонов готовых регулярных выражений (regex). Так что написание фильтров для обработки большинства логов не должно вызвать особого страха или недопонимания.
 
Logstash имеет более 120 шаблонов готовых регулярных выражений (regex). Так что написание фильтров для обработки большинства логов не должно вызвать особого страха или недопонимания.
   
 
Формат шаблонов относительно простой — NAME PATTERN, то есть построчно указывается имя шаблона и ему соответствующее регулярное выражение. Пример:
 
Формат шаблонов относительно простой — NAME PATTERN, то есть построчно указывается имя шаблона и ему соответствующее регулярное выражение. Пример:
  +
<PRE>
 
NUMBER \d+
 
NUMBER \d+
 
WORD \b\w+\b
 
WORD \b\w+\b
 
USERID [a-zA-Z0-9_-]+
 
USERID [a-zA-Z0-9_-]+
  +
</PRE>
   
 
Можно использовать любой ранее созданный шаблон:
 
Можно использовать любой ранее созданный шаблон:
  +
<PRE>
 
USER %{USERID}
 
USER %{USERID}
  +
</PRE>
 
 
Шаблоны можно так же и комбинировать:
 
Шаблоны можно так же и комбинировать:
  +
<PRE>
 
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
 
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
 
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
 
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
 
MAC (?:%{CISCOMAC}|%{WINDOWSMAC})
 
MAC (?:%{CISCOMAC}|%{WINDOWSMAC})
  +
</PRE>
   
  +
Допустим формат логов у нас следующий (пример шаблона выше):
 
  +
<PRE>
Допустим формат логов у нас следующий:
 
 
55.3.244.1 GET /index.html 15824 0.043
 
55.3.244.1 GET /index.html 15824 0.043
  +
</PRE>
   
  +
Среди готовых шаблонов, к счастью уже имеются некоторые регулярные выражения и не надо придумывать колёсное транспортное средство, приводимое в движение мускульной силой человека через ножные педали или через ручные рычаги (это я про велосипед если что).
 
С данным примером лога достаточно pattern записать в виде "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}", в этом случае все логи с данным форматом будут уже иметь некую логическую структуру.
+
С данным примером лога достаточно pattern записать в виде
  +
<PRE>
  +
"%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
  +
</PRE>
 
После обработки наша строка будет выглядеть следующим образом:
 
После обработки наша строка будет выглядеть следующим образом:
  +
<PRE>
 
client: 55.3.244.1
 
client: 55.3.244.1
 
method: GET
 
method: GET
Строка 164: Строка 237:
 
bytes: 15824
 
bytes: 15824
 
duration: 0.043
 
duration: 0.043
  +
</PRE>
   
  +
===mutate===
 
Со списком готовых grok-шаблонов можно познакомиться здесь.
 
 
 
 
==mutate==
 
 
Пример конфигурационного файла для изменения/удаления записей из логов:
 
Пример конфигурационного файла для изменения/удаления записей из логов:
  +
* https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
 
filter {
 
filter {
 
mutate {
 
mutate {
type => "apache_access"
+
remove_field => [ "client" ]
  +
replace => { "message" => "%{source_host}: My new message" }
remove => [ "client" ]
 
  +
 
rename => [ "HOSTORIP", "client_ip" ]
 
rename => [ "HOSTORIP", "client_ip" ]
 
gsub => [ "message", "\\/", "_" ]
 
gsub => [ "message", "\\/", "_" ]
Строка 181: Строка 252:
 
}
 
}
 
}
 
}
  +
<PRE>
 
Построчное описание настроек:
 
type => "apache_access"
 
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
 
 
 
remove => [ "client" ]
 
remove => [ "client" ]
  +
</PRE>
удаление всех данных имеющих название поля client. Возможно указание нескольких названий полей.
 
  +
<PRE>
 
 
rename => [ "HOSTORIP", "client_ip" ]
 
rename => [ "HOSTORIP", "client_ip" ]
  +
</PRE>
 
переименование название поля HOSTORIP в client_ip.
 
переименование название поля HOSTORIP в client_ip.
   
  +
<PRE>
gsub => [ "message", "\\/", "_" ]
 
замена всех "/" на "_" в поле messages.
 
 
 
add_field => [ "sample1", "from %{clientip}" ]
 
add_field => [ "sample1", "from %{clientip}" ]
добавление нового поля «sample1» со значением «from %{clientip}». Допускается использование названий переменных.
+
добавление нового поля «sample1» со значением «from %{clientip}».
  +
Допускается использование названий переменных или статических значений.
 
  +
</PRE>
2.3 date
 
  +
===date===
 
Пример конфигурационого файла:
 
Пример конфигурационого файла:
  +
</PRE>
 
filter {
 
filter {
 
date {
 
date {
type => "apache_access"
 
 
match => [ "timestamp", "MMM dd HH:mm:ss" ]
 
match => [ "timestamp", "MMM dd HH:mm:ss" ]
 
}
 
}
 
}
 
}
  +
</PRE>
 
  +
<PRE>
Построчное описание настроек:
 
type => "apache_access"
 
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
 
 
 
match => [ "timestamp", "MMM dd HH:mm:ss" ]
 
match => [ "timestamp", "MMM dd HH:mm:ss" ]
  +
</PRE>
временная метка события. Важная настройка для дальнейшей возможности сортировки или выборки логов. Если в логах время указано в unix timestamp (squid), то следует использовать match => [ «timestamp», «UNIX» ]
 
  +
временная метка события.
  +
<BR>
  +
Если в логах время указано в unix timestamp, то следует использовать match => [ «timestamp», «UNIX» ]
   
 
===kv===
 
===kv===
 
Пример конфигурационного файла для обработки логов в формате key=value:
 
Пример конфигурационного файла для обработки логов в формате key=value:
  +
<PRE>
 
filter {
 
filter {
 
kv {
 
kv {
type => "custom_log"
 
 
value_split => "=:"
 
value_split => "=:"
 
fields => ["reminder"]
 
fields => ["reminder"]
Строка 224: Строка 291:
 
}
 
}
 
}
 
}
  +
</PRE>
 
  +
<PRE>
Построчное описание настроек:
 
type => "custom_log"
 
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
 
 
 
value_split => "=:"
 
value_split => "=:"
  +
</PRE>
 
использовать символы "=" и ":" для разделения ключа-значения.
 
использовать символы "=" и ":" для разделения ключа-значения.
  +
<PRE>
 
 
fields => ["reminder"]
 
fields => ["reminder"]
  +
</PRE>
 
название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.
 
название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.
  +
<PRE>
 
 
field_split => "\t?&"
 
field_split => "\t?&"
  +
</PRE>
 
использовать символы "\t?&" для разделения ключей. \t — знак табулятора
 
использовать символы "\t?&" для разделения ключей. \t — знак табулятора
   
  +
==OUTPUT==
  +
* https://www.elastic.co/guide/en/logstash/current/output-plugins.html
   
  +
===stdout===
 
 
 
 
 
 
 
 
===multiline===
 
Пример конфигурационного файла для «склеивания» многострочных логов (например Java stack trace):
 
filter {
 
multiline {
 
type => "java_log"
 
pattern => "^\s"
 
what => "previous"
 
}
 
}
 
 
Построчное описание настроек:
 
type => "java_log"
 
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
 
 
pattern => "^\s"
 
регулярное выражение
 
 
what => "previous"
 
при соответствии шаблону «pattern» строка принадлежит предыдущей (previous) строке.
 
 
 
 
==OUTPUT==
 
Название этого блока/метода говорит само за себя — в нём указываются настройки для исходящих сообщений. Аналогично предыдущим блокам, здесь можно указывать любое количество исходящих подблоков.
 
3. OUTPUT
 
Название этого блока/метода говорит само за себя — в нём указываются настройки для исходящих сообщений. Аналогично предыдущим блокам, здесь можно указывать любое количество исходящих подблоков.
 
3.1 stdout
 
 
Пример конфигурационного файла для вывода логов в standard output:
 
Пример конфигурационного файла для вывода логов в standard output:
  +
<PRE>
 
output {
 
output {
 
stdout {
 
stdout {
type => "custom_log"
 
 
message => "IP - %{clientip}. Full message: %{@message}. End of line."
 
message => "IP - %{clientip}. Full message: %{@message}. End of line."
 
}
 
}
 
}
 
}
  +
</PRE>
   
  +
<PRE>
 
type => "custom_log"
 
тип/описание лога.
 
 
 
message => "clIP - %{clientip}. Full message: %{@message}. End of line."
 
message => "clIP - %{clientip}. Full message: %{@message}. End of line."
  +
</PRE>
 
указывается формат исходящего сообщения. Допустимо использование переменных после grok-фильтрации.
 
указывается формат исходящего сообщения. Допустимо использование переменных после grok-фильтрации.
   
3.2 file
+
===file===
 
Пример конфигурационого файла для записи логов в файл:
 
Пример конфигурационого файла для записи логов в файл:
  +
<PRE>
 
output {
 
output {
 
file {
 
file {
type => "custom_log"
 
 
flush_interval => 5
 
flush_interval => 5
 
gzip=> true
 
gzip=> true
path => "/var/log/custom/%{clientip}/%{type}"
+
path => "/var/log/custom/%{clientip}"
message_format => "ip: %{clientip} request:%{requri}"
 
 
}
 
}
 
}
 
}
  +
</PRE>
 
  +
<PRE>
 
type => "custom_log"
 
тип/описание лога.
 
 
 
flush_interval => 5
 
flush_interval => 5
  +
</PRE>
интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение.
 
  +
интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение (что конечно создаст дополнительную нагрузку на диск).
 
  +
<PRE>
 
gzip=> true
 
gzip=> true
  +
</PRE>
 
файл исходящих сообщений будет сжат Gzip.
 
файл исходящих сообщений будет сжат Gzip.
  +
<PRE>
  +
path => "/var/log/custom/%{clientip}"
  +
</PRE>
  +
путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные.
  +
В данном примере, для каждого уникального IP адреса будет создана своя папка.
   
  +
<B>Формат как будет записано определяет кодек - в общем случае это JSON</B> Так как кодек всех устраивает в большей части примеров он никак не указан
path => "/var/log/custom/%{clientip}/%{type}"
 
  +
<PRE>
путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. В данном примере, для каждого уникального IP адреса будет создана своя папка и сообщения будут записываться в файл соответствующий переменной %{type}.
 
  +
codec
 
  +
</PRE>
message_format => "ip: %{clientip} request:%{requri}"
 
  +
Value type is codec
формат исходящего сообщения.
 
  +
Default value is "json_lines"
   
3.3 elasticsearch
+
===elasticsearch===
 
Пример конфигурационного файла для записи логов в базу Elasticsearch:
 
Пример конфигурационного файла для записи логов в базу Elasticsearch:
  +
<PRE>
 
output {
 
output {
 
elasticsearch {
 
elasticsearch {
type => "custom_log"
 
 
cluster => "es_logs"
 
cluster => "es_logs"
embedded => false
 
 
host => "192.168.1.1"
 
host => "192.168.1.1"
 
port => "19300"
 
port => "19300"
Строка 329: Строка 366:
 
}
 
}
 
}
 
}
  +
</PRE>
 
  +
<PRE>
 
type => "custom_log"
 
тип/описание лога.
 
 
 
cluster => "es_logs"
 
cluster => "es_logs"
  +
</PRE>
название кластера указанного в cluster.name в настроечном файле Elasticsearch.
 
  +
<PRE>
 
embedded => false
 
указывает какую базу Elasticsearch использовать внутреннюю или стороннюю.
 
 
 
port => "19300"
 
port => "19300"
  +
</PRE>
 
транспортный port Elasticsearch.
 
транспортный port Elasticsearch.
  +
<PRE>
 
 
host => "192.168.1.1"
 
host => "192.168.1.1"
  +
</PRE>
 
IP адрес Elasticsearch
 
IP адрес Elasticsearch
  +
<PRE>
 
 
index => "logs-%{+YYYY.MM.dd}"
 
index => "logs-%{+YYYY.MM.dd}"
  +
</PRE>
 
название индекса куда будут записываться логи.
 
название индекса куда будут записываться логи.
   
  +
=Пример=
3.4 email
 
  +
Перенесен сюда -> https://noname.com.ua/mediawiki/index.php/LogstashExample1
Данный плагин можно использовать для алертов. Минус в том, что любые изменения уведомлений (в принципе как и любых других настроек) требуют перезапуск logstash программы, но разработчик говорит, что возможно в будущем этого не придётся делать.
 
Пример конфигурационого файла:
 
output {
 
email {
 
type => "custom_log"
 
from => "logstash@domain.com"
 
to => "admin1@domain.com"
 
cc => "admin2@domain.com"
 
subject => "Found '%{matchName}' Alert on %{@source_host}"
 
body => "Here is the event line %{@message}"
 
htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"
 
via => "sendmail"
 
options => [ "smtpIporHost", "smtp.gmail.com",
 
"port", "587",
 
"domain", "yourDomain",
 
"userName", "yourSMTPUsername",
 
"password", "PASS",
 
"starttls", "true",
 
"authenticationType", "plain",
 
"debug", "true"
 
]
 
match => [ "response errors", "response,501,,or,response,301",
 
"multiple response errors", "response,501,,and,response,301" ]
 
 
}
 
}
 
 
 
type => "custom_log"
 
тип/описание лога.
 
 
from => "logstash@domain.com"
 
to => "admin1@domain.com"
 
cc => "admin2@domain.com"
 
если у вас хватило сил дочитать до этих строк, значит вы можете сами определить смысл этих 3х настроек :)
 
 
subject => "Found '%{matchName}' Alert on %{@source_host}"
 
тема письма уведомления. Можно использовать переменные. Например %{matchName} — название условия совпадения из настройки «match».
 
 
body => "Here is the event line %{@message}"
 
htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"
 
тело письма.
 
 
via => "sendmail"
 
способ отсылки письма. Возможен один вариант из двух — smtp или sendmail.
 
 
options => ...
 
стандартные настройки почтовых параметров.
 
 
match => [ "response errors", "response,501,,or,response,301",
 
"multiple response errors", "response,501,,and,response,301" ]
 
«response errors» — название алерта (записывается в переменную %{matchName}). «response,501,,or,response,301» — критерии срабатывания алертов. В данном примере если поле response содержит значение 501 или 301, то алерт считается сработавшим. Во второй строке используется логика AND, т.е. оба условия должны быть выполнены.
 
 
Создаём файл habr.conf:
 
input {
 
tcp {
 
type => "habr"
 
port => "11111"
 
}
 
}
 
filter {
 
mutate {
 
type => "habr"
 
add_field => [ "habra_field", "Hello Habr" ]
 
}
 
}
 
output {
 
stdout {
 
type => "habr"
 
message => "%{habra_field}: %{@message}"
 
}
 
}
 
 
 
Запускаем Logstash:
 
java -jar logstash-1.1.9-monolithic.jar agent -f ./habr.conf
 
 
Проверяем, что Logstash запущен:
 
# netstat -nat |grep 11111
 
Если порт 11111 присутствует, значит Logstash готов принимать логи.
 
 
В новом терминальном окне пишем:
 
echo "Logs are cool!" | nc localhost 11111
 
 
Смотрим результат в окне где запущен Logstash. Если там появилось секретное послание, значит всё работает.
 
 
   
  +
=Альтернативы=
  +
* Vector:
  +
** https://habr.com/ru/post/514480/
 
=Ссылки=
 
=Ссылки=
 
* https://stackoverflow.com/questions/57599095/parse-json-array-string-using-logstash
 
* https://stackoverflow.com/questions/57599095/parse-json-array-string-using-logstash

Текущая версия на 16:27, 23 августа 2021

Logstash

Заметки по Logstash с примерами конфигов

Заметка возникла по-тому что каждый раз когда надо настроить Logstash я обнаруживал что забыл почти все - и решил записать что б в следующий раз зачитать по бумажке

Установка

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo apt-get install apt-transport-https
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
sudo apt-get update
sudo apt-get install elasticsearch

Порядок работы - input --> filter (в том числе изменение) -->output
Для каждой фазы есть множество плагинов

input {
  ...
}
filter {
  ...
}
output {
  ...
}

INPUT

Данный метод является входной точкой для логов

Тыщи их если что

В описании только самые простые/ходовые

Beats

Точка для получения логов от Filebeat

    beats {
        port => 5400
        ssl => true
        ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"]
        ssl_certificate => "/etc/elk-certs/elk-ssl.crt"
        ssl_key => "/etc/elk-certs/elk-ssl.key"
        ssl_verify_mode => "force_peer"
    }

Сертефикаты должны быть подписаны одним CA - по сути тут требуется авторизация с помошью сертефикатов (как например OpenVPN или такое можно делать с nginx)
Важнейшая настройка Ж

ssl_verify_modeedit

Value can be any of: none, peer, force_peer Default value is "none" By default the server doesn’t do any client verification.
peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated.
force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed.

This option needs to be used with ssl_certificate_authorities and a defined list of CAs.

Генерация серетфикатов

Простой скрипт для самоподписанных сертефикатов (хотя конечно лучше организовать свой PKI)

#!/bin/bash

CN="logstash.tld"
sudo openssl req -subj "/CN=${CN}/" -x509 -days 3650 -batch -nodes -newkey rsa:2048 -keyout elk-ssl.key -out elk-ssl.crt

chown logstash elk-ssl.crt
chown logstash elk-ssl.key

Важно: В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу)

File

input {

 file {
   path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
   exclude => [ "*.gz", "*.zip", "*.rar" ]
   start_position => "end"
   stat_interval => 1
   discover_interval => 30
 }

}

path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]

указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным (?)

exclude => [ "*.gz", "*.zip", "*.rar" ]
исключает из обработки файлы с соответствующими расширениями.
start_position => "end"
ждёт появления новых сообщений в конце файла. При обработки уже имеющихся логов, можно выставить «beginning», тогда обработка логов будет происходить построчно с начала файлов.
stat_interval => 1
как часто (в секундах) проверять файлы на изменения. При больших значения, уменьшится частота системных вызовов, но так же увеличится время чтения новых строк.
discover_interval => 30
время (в секундах) через которое будет обновлён список обрабатываемых файлов указанных в path.

tcp

Пример конфигурации, для работы с логами удалённых сервисов:

input {
  tcp {
    data_timeout => 10
    mode => "server"
    host => "192.168.3.12"
    port => 3337
  }
}

Построчное описание настроек:

data_timeout => 10

время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто.

mode => "server"
host => "192.168.3.12"
port => 3337

в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.

udp

Для udp настройки аналогичные tcp:
input {
  udp {
    buffer_size => 4096
    host => "192.168.3.12"
    port => 3337
  }
}

FILTER

Все плагины - https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
В данном блоке настраиваются основные манипуляции с логами.
Это может быть например

  • разбивка по key=value
  • удаление ненужных параметров
  • замена имеющихся значений
  • использование geoip или DNS запросов для ип-адресов или названий хостов.


grok

GROK - разбивает строку с помошью регулярных выражений.
Пример конфигурационного файла для основной нормализации логов:

filter {
  grok {
    patterns_dir => "/path/to/patterns/"
    pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
  }
}

Описание настроек:

patterns_dir => "/path/to/patterns/"

путь к каталогу, содержащим шаблоны обработки логов.
Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.

pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"

указывается шаблон разборки логов.
Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов.
Создавать свои шаблоны необязательно - достаточно встроенных

Подробнее про шаблоны

С помощью grok фильтра можно структурировать большую часть логов — syslog, apache, nginx, mysql итд, записанных в определённом формате. Logstash имеет более 120 шаблонов готовых регулярных выражений (regex). Так что написание фильтров для обработки большинства логов не должно вызвать особого страха или недопонимания.

Формат шаблонов относительно простой — NAME PATTERN, то есть построчно указывается имя шаблона и ему соответствующее регулярное выражение. Пример:

NUMBER \d+
WORD \b\w+\b
USERID [a-zA-Z0-9_-]+

Можно использовать любой ранее созданный шаблон:

USER %{USERID}

Шаблоны можно так же и комбинировать:

CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
MAC (?:%{CISCOMAC}|%{WINDOWSMAC})

Допустим формат логов у нас следующий (пример шаблона выше):

55.3.244.1 GET /index.html 15824 0.043


С данным примером лога достаточно pattern записать в виде

"%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"

После обработки наша строка будет выглядеть следующим образом:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

mutate

Пример конфигурационного файла для изменения/удаления записей из логов:

filter {

 mutate {
   remove_field => [ "client" ]
   replace => { "message" => "%{source_host}: My new message" }
   rename => [ "HOSTORIP", "client_ip" ]
   gsub => [ "message", "\\/", "_" ]
   add_field => [ "sample1", "from %{clientip}" ]
 }

}

remove => [ "client" ]
rename => [ "HOSTORIP", "client_ip" ]

переименование название поля HOSTORIP в client_ip.

add_field => [ "sample1", "from %{clientip}" ]
добавление нового поля «sample1» со значением «from %{clientip}». 
Допускается использование названий переменных или статических значений.

date

Пример конфигурационого файла:

filter {

 date {
   match => [ "timestamp", "MMM dd HH:mm:ss" ]
 }

}

match => [ "timestamp", "MMM dd HH:mm:ss" ]

временная метка события.
Если в логах время указано в unix timestamp, то следует использовать match => [ «timestamp», «UNIX» ]

kv

Пример конфигурационного файла для обработки логов в формате key=value:

filter {
  kv {
    value_split => "=:"
    fields => ["reminder"]
    field_split => "\t?&"
  }
}
value_split => "=:"

использовать символы "=" и ":" для разделения ключа-значения.

fields => ["reminder"]

название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.

field_split => "\t?&"

использовать символы "\t?&" для разделения ключей. \t — знак табулятора

OUTPUT

stdout

Пример конфигурационного файла для вывода логов в standard output:

output {
  stdout {
    message => "IP - %{clientip}. Full message: %{@message}. End of line."
  }
}
message => "clIP - %{clientip}. Full message: %{@message}. End of line."

указывается формат исходящего сообщения. Допустимо использование переменных после grok-фильтрации.

file

Пример конфигурационого файла для записи логов в файл:

output {
  file {
    flush_interval => 5
    gzip=> true
    path => "/var/log/custom/%{clientip}"
  }
}
flush_interval => 5

интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение (что конечно создаст дополнительную нагрузку на диск).

gzip=> true

файл исходящих сообщений будет сжат Gzip.

path => "/var/log/custom/%{clientip}"

путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. В данном примере, для каждого уникального IP адреса будет создана своя папка.

Формат как будет записано определяет кодек - в общем случае это JSON Так как кодек всех устраивает в большей части примеров он никак не указан

codec

Value type is codec Default value is "json_lines"

elasticsearch

Пример конфигурационного файла для записи логов в базу Elasticsearch:

output {
  elasticsearch {
    cluster => "es_logs"
    host => "192.168.1.1"
    port => "19300"
    index => "logs-%{+YYYY.MM.dd}" 
  }
}
cluster => "es_logs"
port => "19300"

транспортный port Elasticsearch.

host => "192.168.1.1"

IP адрес Elasticsearch

index => "logs-%{+YYYY.MM.dd}" 

название индекса куда будут записываться логи.

Пример

Перенесен сюда -> https://noname.com.ua/mediawiki/index.php/LogstashExample1

Альтернативы

Ссылки