Logstash: различия между версиями

Материал из noname.com.ua
Перейти к навигацииПерейти к поиску
 
(не показано 29 промежуточных версий этого же участника)
Строка 3: Строка 3:
 
[[Категория:Logstash]]
 
[[Категория:Logstash]]
 
[[Категория:Grok]]
 
[[Категория:Grok]]
  +
[[Категория:Elasticsearch]]
  +
[[Категория:Kibana]]
  +
[[Категория:Filebeat]]
 
=Logstash=
 
=Logstash=
  +
Заметки по Logstash с примерами конфигов
   
  +
<B>Заметка возникла по-тому что каждый раз когда надо настроить Logstash я обнаруживал что забыл почти все - и решил записать что б в следующий раз зачитать по бумажке</B>
  +
==Установка==
 
* https://www.elastic.co/guide/en/logstash/current/index.html
 
* https://www.elastic.co/guide/en/logstash/current/index.html
  +
<PRE>
  +
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
  +
sudo apt-get install apt-transport-https
  +
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
  +
sudo apt-get update
  +
sudo apt-get install elasticsearch
  +
</PRE>
   
  +
Порядок работы - input --> filter (в том числе изменение) -->output
Заметки по Logstash с примерами конфигов
 
  +
<BR>
 
  +
Для каждой фазы есть множество плагинов
 
<PRE>
 
<PRE>
 
input {
 
input {
Строка 34: Строка 48:
   
 
==Beats==
 
==Beats==
  +
Точка для получения логов от Filebeat
  +
* https://www.elastic.co/guide/en/beats/filebeat/current/configuring-howto-filebeat.html
 
* https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html#plugins-inputs-beats
 
* https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html#plugins-inputs-beats
 
<PRE>
 
<PRE>
Строка 45: Строка 61:
 
}
 
}
 
</PRE>
 
</PRE>
  +
Сертефикаты должны быть подписаны одним CA - по сути тут требуется авторизация с помошью сертефикатов (как например OpenVPN или такое можно делать с nginx)
 
  +
<BR>
  +
Важнейшая настройка Ж
  +
<PRE>
 
ssl_verify_modeedit
 
ssl_verify_modeedit
  +
</PRE>
 
Value can be any of: none, peer, force_peer
+
Value can be any of: none, peer, force_peer Default value is "none" By default the server doesn’t do any client verification. <BR>
  +
peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated. <BR>
Default value is "none"
 
  +
force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed. <BR>
By default the server doesn’t do any client verification.
 
  +
<BR>
 
peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated.
 
 
force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed.
 
 
 
This option needs to be used with ssl_certificate_authorities and a defined list of CAs.
 
This option needs to be used with ssl_certificate_authorities and a defined list of CAs.
  +
<BR>
   
Генерация серетфикатов
+
===Генерация серетфикатов===
  +
Простой скрипт для самоподписанных сертефикатов (хотя конечно лучше организовать свой PKI)
 
<PRE>
 
<PRE>
 
#!/bin/bash
 
#!/bin/bash
Строка 68: Строка 85:
 
chown logstash elk-ssl.key
 
chown logstash elk-ssl.key
 
</PRE>
 
</PRE>
<B>Важно</B> В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу)
+
<B>Важно:</B> В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу)
   
 
==File==
 
==File==
Строка 74: Строка 91:
 
input {
 
input {
 
file {
 
file {
type => "some_access_log"
 
 
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
 
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
 
exclude => [ "*.gz", "*.zip", "*.rar" ]
 
exclude => [ "*.gz", "*.zip", "*.rar" ]
Строка 82: Строка 98:
 
}
 
}
 
}
 
}
</PRE>
 
<PRE>
 
type => "some_access_log"
 
тип/описание лога. При использовании нескольких входных блоков, удобно их разделять для последующих действий в filter или output.
 
 
</PRE>
 
</PRE>
 
<PRE>
 
<PRE>
 
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
 
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
 
</PRE>
 
</PRE>
указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным
+
указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным (?)
 
<PRE>
 
<PRE>
 
exclude => [ "*.gz", "*.zip", "*.rar" ]
 
exclude => [ "*.gz", "*.zip", "*.rar" ]
Строка 108: Строка 120:
 
</PRE>
 
</PRE>
   
===tcp===
+
==tcp==
 
Пример конфигурации, для работы с логами удалённых сервисов:
 
Пример конфигурации, для работы с логами удалённых сервисов:
 
<PRE>
 
<PRE>
 
input {
 
input {
 
tcp {
 
tcp {
type => "webserver_prod"
 
 
data_timeout => 10
 
data_timeout => 10
 
mode => "server"
 
mode => "server"
Строка 124: Строка 135:
 
Построчное описание настроек:
 
Построчное описание настроек:
 
<PRE>
 
<PRE>
  +
data_timeout => 10
type => "webserver_prod"
 
тип/описание лога.
 
 
</PRE>
 
</PRE>
<PRE>
 
data_timeout => 10
 
<PRE>
 
 
время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто.
 
время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто.
 
<PRE>
 
<PRE>
Строка 138: Строка 145:
 
в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.
 
в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.
   
===udp===
+
==udp==
 
<PRE>
 
<PRE>
 
Для udp настройки аналогичные tcp:
 
Для udp настройки аналогичные tcp:
 
input {
 
input {
 
udp {
 
udp {
type => "webserver_prod"
 
 
buffer_size => 4096
 
buffer_size => 4096
 
host => "192.168.3.12"
 
host => "192.168.3.12"
Строка 152: Строка 158:
   
 
==FILTER==
 
==FILTER==
  +
Все плагины - https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
В данном блоке настраиваются основные манипуляции с логами. Это может быть и разбивка по key=value, и удаление ненужных параметров, и замена имеющихся значений, и использование geoip или DNS запросов для ип-адресов или названий хостов.
 
  +
<BR>
  +
В данном блоке настраиваются основные манипуляции с логами. <BR>
  +
Это может быть например
  +
* разбивка по key=value
  +
* удаление ненужных параметров
  +
* замена имеющихся значений
  +
* использование geoip или DNS запросов для ип-адресов или названий хостов.
  +
   
На первый взгляд применение фильтров может показаться сложным и нелогичным, но это не совсем так.
 
 
===grok===
 
===grok===
  +
GROK - разбивает строку с помошью регулярных выражений.
  +
<BR>
 
Пример конфигурационного файла для основной нормализации логов:
 
Пример конфигурационного файла для основной нормализации логов:
 
<PRE>
 
<PRE>
 
filter {
 
filter {
 
grok {
 
grok {
type => "some_access_log"
 
 
patterns_dir => "/path/to/patterns/"
 
patterns_dir => "/path/to/patterns/"
 
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
 
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
Строка 167: Строка 181:
 
</PRE>
 
</PRE>
   
Построчное описание настроек:
+
Описание настроек:
<PRE>
 
type => "apache_access"
 
</PRE>
 
тип/описание лога. Здесь надо указать тот тип (type), который прописан в блоке input для которого будет происходить обработка.
 
 
<PRE>
 
<PRE>
 
patterns_dir => "/path/to/patterns/"
 
patterns_dir => "/path/to/patterns/"
 
</PRE>
 
</PRE>
путь к каталогу, содержащим шаблоны обработки логов. Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.
+
путь к каталогу, содержащим шаблоны обработки логов.<BR>
  +
Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.
  +
<BR>
 
<PRE>
 
<PRE>
 
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
 
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
 
</PRE>
 
</PRE>
  +
указывается шаблон разборки логов.
указывается шаблон разборки логов. Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов. Что бы не путаться, я для каждого формата логов создаю отдельный шаблонный файл.
 
  +
<BR>
  +
Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов.
  +
<BR>
  +
Создавать свои шаблоны необязательно - достаточно встроенных
  +
* https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/ecs-v1/grok-patterns
   
 
===Подробнее про шаблоны===
 
===Подробнее про шаблоны===
Строка 203: Строка 220:
 
</PRE>
 
</PRE>
   
Допустим формат логов у нас следующий:
+
Допустим формат логов у нас следующий (пример шаблона выше):
 
<PRE>
 
<PRE>
 
55.3.244.1 GET /index.html 15824 0.043
 
55.3.244.1 GET /index.html 15824 0.043
Строка 224: Строка 241:
 
===mutate===
 
===mutate===
 
Пример конфигурационного файла для изменения/удаления записей из логов:
 
Пример конфигурационного файла для изменения/удаления записей из логов:
  +
* https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html
 
filter {
 
filter {
 
mutate {
 
mutate {
type => "apache_access"
+
remove_field => [ "client" ]
  +
replace => { "message" => "%{source_host}: My new message" }
remove => [ "client" ]
 
  +
 
rename => [ "HOSTORIP", "client_ip" ]
 
rename => [ "HOSTORIP", "client_ip" ]
 
gsub => [ "message", "\\/", "_" ]
 
gsub => [ "message", "\\/", "_" ]
Строка 233: Строка 252:
 
}
 
}
 
}
 
}
  +
<PRE>
 
Построчное описание настроек:
 
type => "apache_access"
 
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
 
 
 
remove => [ "client" ]
 
remove => [ "client" ]
  +
</PRE>
удаление всех данных имеющих название поля client. Возможно указание нескольких названий полей.
 
  +
<PRE>
 
 
rename => [ "HOSTORIP", "client_ip" ]
 
rename => [ "HOSTORIP", "client_ip" ]
  +
</PRE>
 
переименование название поля HOSTORIP в client_ip.
 
переименование название поля HOSTORIP в client_ip.
   
  +
<PRE>
gsub => [ "message", "\\/", "_" ]
 
замена всех "/" на "_" в поле messages.
 
 
 
add_field => [ "sample1", "from %{clientip}" ]
 
add_field => [ "sample1", "from %{clientip}" ]
добавление нового поля «sample1» со значением «from %{clientip}». Допускается использование названий переменных.
+
добавление нового поля «sample1» со значением «from %{clientip}».
  +
Допускается использование названий переменных или статических значений.
 
  +
</PRE>
2.3 date
 
  +
===date===
 
Пример конфигурационого файла:
 
Пример конфигурационого файла:
  +
</PRE>
 
filter {
 
filter {
 
date {
 
date {
type => "apache_access"
 
 
match => [ "timestamp", "MMM dd HH:mm:ss" ]
 
match => [ "timestamp", "MMM dd HH:mm:ss" ]
 
}
 
}
 
}
 
}
  +
</PRE>
 
  +
<PRE>
Построчное описание настроек:
 
type => "apache_access"
 
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
 
 
 
match => [ "timestamp", "MMM dd HH:mm:ss" ]
 
match => [ "timestamp", "MMM dd HH:mm:ss" ]
  +
</PRE>
временная метка события. Важная настройка для дальнейшей возможности сортировки или выборки логов. Если в логах время указано в unix timestamp (squid), то следует использовать match => [ «timestamp», «UNIX» ]
 
  +
временная метка события.
  +
<BR>
  +
Если в логах время указано в unix timestamp, то следует использовать match => [ «timestamp», «UNIX» ]
   
 
===kv===
 
===kv===
 
Пример конфигурационного файла для обработки логов в формате key=value:
 
Пример конфигурационного файла для обработки логов в формате key=value:
  +
<PRE>
 
filter {
 
filter {
 
kv {
 
kv {
type => "custom_log"
 
 
value_split => "=:"
 
value_split => "=:"
 
fields => ["reminder"]
 
fields => ["reminder"]
Строка 276: Строка 291:
 
}
 
}
 
}
 
}
  +
</PRE>
 
  +
<PRE>
Построчное описание настроек:
 
type => "custom_log"
 
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
 
 
 
value_split => "=:"
 
value_split => "=:"
  +
</PRE>
 
использовать символы "=" и ":" для разделения ключа-значения.
 
использовать символы "=" и ":" для разделения ключа-значения.
  +
<PRE>
 
 
fields => ["reminder"]
 
fields => ["reminder"]
  +
</PRE>
 
название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.
 
название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.
  +
<PRE>
 
 
field_split => "\t?&"
 
field_split => "\t?&"
  +
</PRE>
 
использовать символы "\t?&" для разделения ключей. \t — знак табулятора
 
использовать символы "\t?&" для разделения ключей. \t — знак табулятора
   
===multiline===
+
==OUTPUT==
  +
* https://www.elastic.co/guide/en/logstash/current/output-plugins.html
Пример конфигурационного файла для «склеивания» многострочных логов (например Java stack trace):
 
filter {
 
multiline {
 
type => "java_log"
 
pattern => "^\s"
 
what => "previous"
 
}
 
}
 
   
Построчное описание настроек:
 
type => "java_log"
 
тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
 
 
pattern => "^\s"
 
регулярное выражение
 
 
what => "previous"
 
при соответствии шаблону «pattern» строка принадлежит предыдущей (previous) строке.
 
 
==OUTPUT==
 
Название этого блока/метода говорит само за себя — в нём указываются настройки для исходящих сообщений. Аналогично предыдущим блокам, здесь можно указывать любое количество исходящих подблоков.
 
 
===stdout===
 
===stdout===
 
Пример конфигурационного файла для вывода логов в standard output:
 
Пример конфигурационного файла для вывода логов в standard output:
Строка 317: Строка 313:
 
output {
 
output {
 
stdout {
 
stdout {
type => "custom_log"
 
 
message => "IP - %{clientip}. Full message: %{@message}. End of line."
 
message => "IP - %{clientip}. Full message: %{@message}. End of line."
 
}
 
}
 
}
 
}
 
</PRE>
 
</PRE>
 
<PRE>
 
type => "custom_log"
 
</PRE>
 
тип/описание лога.
 
   
 
<PRE>
 
<PRE>
Строка 338: Строка 328:
 
output {
 
output {
 
file {
 
file {
type => "custom_log"
 
 
flush_interval => 5
 
flush_interval => 5
 
gzip=> true
 
gzip=> true
path => "/var/log/custom/%{clientip}/%{type}"
+
path => "/var/log/custom/%{clientip}"
message_format => "ip: %{clientip} request:%{requri}"
 
 
}
 
}
 
}
 
}
 
</PRE>
 
</PRE>
<PRE>
 
type => "custom_log"
 
</PRE>
 
тип/описание лога.
 
 
<PRE>
 
<PRE>
 
flush_interval => 5
 
flush_interval => 5
 
</PRE>
 
</PRE>
интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение.
+
интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение (что конечно создаст дополнительную нагрузку на диск).
 
<PRE>
 
<PRE>
 
gzip=> true
 
gzip=> true
Строка 359: Строка 343:
 
файл исходящих сообщений будет сжат Gzip.
 
файл исходящих сообщений будет сжат Gzip.
 
<PRE>
 
<PRE>
path => "/var/log/custom/%{clientip}/%{type}"
+
path => "/var/log/custom/%{clientip}"
 
</PRE>
 
</PRE>
путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. В данном примере, для каждого уникального IP адреса будет создана своя папка и сообщения будут записываться в файл соответствующий переменной %{type}.
+
путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные.
  +
В данном примере, для каждого уникального IP адреса будет создана своя папка.
   
  +
<B>Формат как будет записано определяет кодек - в общем случае это JSON</B> Так как кодек всех устраивает в большей части примеров он никак не указан
 
<PRE>
 
<PRE>
  +
codec
message_format => "ip: %{clientip} request:%{requri}"
 
 
</PRE>
 
</PRE>
  +
Value type is codec
формат исходящего сообщения.
 
  +
Default value is "json_lines"
   
 
===elasticsearch===
 
===elasticsearch===
Строка 373: Строка 360:
 
output {
 
output {
 
elasticsearch {
 
elasticsearch {
type => "custom_log"
 
 
cluster => "es_logs"
 
cluster => "es_logs"
embedded => false
 
 
host => "192.168.1.1"
 
host => "192.168.1.1"
 
port => "19300"
 
port => "19300"
Строка 382: Строка 367:
 
}
 
}
 
</PRE>
 
</PRE>
<PRE>
 
type => "custom_log"
 
</PRE>
 
тип/описание лога.
 
 
<PRE>
 
<PRE>
 
cluster => "es_logs"
 
cluster => "es_logs"
 
</PRE>
 
</PRE>
название кластера указанного в cluster.name в настроечном файле Elasticsearch.
 
<PRE>
 
embedded => false
 
</PRE>
 
указывает какую базу Elasticsearch использовать внутреннюю или стороннюю.
 
 
<PRE>
 
<PRE>
 
port => "19300"
 
port => "19300"
Строка 406: Строка 382:
 
</PRE>
 
</PRE>
 
название индекса куда будут записываться логи.
 
название индекса куда будут записываться логи.
 
===email===
 
Данный плагин можно использовать для алертов.
 
 
Пример конфигурационого файла:
 
<PRE>
 
output {
 
email {
 
type => "custom_log"
 
from => "logstash@domain.com"
 
to => "admin1@domain.com"
 
cc => "admin2@domain.com"
 
subject => "Found '%{matchName}' Alert on %{@source_host}"
 
body => "Here is the event line %{@message}"
 
htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"
 
via => "sendmail"
 
options => [ "smtpIporHost", "smtp.gmail.com",
 
"port", "587",
 
"domain", "yourDomain",
 
"userName", "yourSMTPUsername",
 
"password", "PASS",
 
"starttls", "true",
 
"authenticationType", "plain",
 
"debug", "true"
 
]
 
match => [ "response errors", "response,501,,or,response,301",
 
"multiple response errors", "response,501,,and,response,301" ]
 
 
}
 
}
 
</PRE>
 
<PRE>
 
type => "custom_log"
 
</PRE>
 
тип/описание лога.
 
<PRE>
 
from => "logstash@domain.com"
 
to => "admin1@domain.com"
 
cc => "admin2@domain.com"
 
</PRE>
 
 
<PRE>
 
subject => "Found '%{matchName}' Alert on %{@source_host}"
 
тема письма уведомления. Можно использовать переменные. Например %{matchName} — название условия совпадения из настройки «match».
 
</PRE>
 
 
<PRE>
 
body => "Here is the event line %{@message}"
 
htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"
 
</PRE>
 
тело письма.
 
<PRE>
 
via => "sendmail"
 
</PRE>
 
способ отсылки письма. Возможен один вариант из двух — smtp или sendmail.
 
<PRE>
 
options => ...
 
</PRE>
 
стандартные настройки почтовых параметров.
 
<PRE>
 
match => [ "response errors", "response,501,,or,response,301",
 
"multiple response errors", "response,501,,and,response,301" ]
 
</PRE>
 
«response errors» — название алерта (записывается в переменную %{matchName}). «response,501,,or,response,301» — критерии срабатывания алертов. В данном примере если поле response содержит значение 501 или 301, то алерт считается
 
 
сработавшим. Во второй строке используется логика AND, т.е. оба условия должны быть выполнены.
 
   
 
=Пример=
 
=Пример=
  +
Перенесен сюда -> https://noname.com.ua/mediawiki/index.php/LogstashExample1
==nginx==
 
Отправка логов nginx в Elastic (один из вариантов) <BR>
 
Тут рассматривается самый простой случай - когда Filebeat не делает преобразований
 
Nginx умеет писать логи в Json
 
 
<PRE>
 
log_format nginxlog_json escape=json.
 
'{ "timestamp": "$time_iso8601", '
 
'"remote_addr": "$remote_addr", '
 
'"body_bytes_sent": $body_bytes_sent, '
 
'"request": "$request", '
 
'"request_method": "$request_method", '
 
'"request_time": $request_time, '
 
'"response_status": $status, '
 
'"upstream_status": $upstream_status,'
 
'"upstream_response_time": $upstream_response_time,'
 
'"upstream_connect_time": $upstream_connect_time,'
 
'"upstream_header_time": $upstream_header_time,'
 
'"upstream_addr": "$upstream_addr",'
 
'"host": "$host",'
 
'"http_x_forwarded_for": "$http_x_forwarded_for",'
 
'"http_referrer": "$http_referer", '
 
'"http_user_agent": "$http_user_agent", '
 
'"http_version": "$server_protocol", '
 
'"nginx_access": true }';
 
</PRE>
 
 
В лог попадает:
 
<PRE>
 
tail -1 /var/log/nginx/access.log.ssl | jq .
 
{
 
"timestamp": "2021-08-08T08:47:59+00:00",
 
"remote_addr": "159.224.49.4",
 
"body_bytes_sent": 361,
 
"request": "POST /internal/bsearch HTTP/1.1",
 
"request_method": "POST",
 
"request_time": 0.152,
 
"response_status": 200,
 
"upstream_status": 200,
 
"upstream_response_time": 0.028,
 
"upstream_connect_time": 0,
 
"upstream_header_time": 0.028,
 
"upstream_addr": "127.0.0.1:5601",
 
"host": "elk.domain.tld",
 
"http_x_forwarded_for": "",
 
"http_referrer": "https://elk.domain.tld/app/discover",
 
"http_user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
 
"http_version": "HTTP/1.1",
 
"nginx_access": true
 
}
 
</PRE>
 
 
==Filebeat==
 
<PRE>
 
filebeat.inputs:
 
- type: log
 
# Change to true to enable this input configuration.
 
enabled: true
 
# Paths that should be crawled and fetched. Glob based paths.
 
paths:
 
- /var/log/nginx/elk.arturhaunt.ninja-access.log.ssl
 
exclude_files: ['\.gz$']
 
#- c:\programdata\elasticsearch\logs\*
 
# Exclude lines. A list of regular expressions to match. It drops the lines that are
 
# matching any regular expression from the list.
 
#exclude_lines: ['^DBG']
 
# Include lines. A list of regular expressions to match. It exports the lines that are
 
# matching any regular expression from the list.
 
#include_lines: ['^ERR', '^WARN']
 
# Exclude files. A list of regular expressions to match. Filebeat drops the files that
 
# are matching any regular expression from the list. By default, no files are dropped.
 
#exclude_files: ['.gz$']
 
# Optional additional fields. These fields can be freely picked
 
# to add additional information to the crawled log files for filtering
 
#fields:
 
# level: debug
 
# review: 1
 
### Multiline options
 
# Multiline can be used for log messages spanning multiple lines. This is common
 
# for Java Stack Traces or C-Line Continuation
 
# The regexp Pattern that has to be matched. The example pattern matches all lines starting with [
 
#multiline.pattern: ^\[
 
# Defines if the pattern set under pattern should be negated or not. Default is false.
 
#multiline.negate: false
 
# Match can be set to "after" or "before". It is used to define if lines should be append to a pattern
 
# that was (not) matched before or after or as long as a pattern is not matched based on negate.
 
# Note: After is the equivalent to previous and before is the equivalent to to next in Logstash
 
#multiline.match: after
 
- type: filestream
 
# Change to true to enable this input configuration.
 
enabled: false
 
# Paths that should be crawled and fetched. Glob based paths.
 
paths:
 
- /var/log/*.log
 
#- c:\programdata\elasticsearch\logs\*
 
# Exclude lines. A list of regular expressions to match. It drops the lines that are
 
# matching any regular expression from the list.
 
#exclude_lines: ['^DBG']
 
# Include lines. A list of regular expressions to match. It exports the lines that are
 
# matching any regular expression from the list.
 
#include_lines: ['^ERR', '^WARN']
 
# Exclude files. A list of regular expressions to match. Filebeat drops the files that
 
# are matching any regular expression from the list. By default, no files are dropped.
 
#prospector.scanner.exclude_files: ['.gz$']
 
# Optional additional fields. These fields can be freely picked
 
# to add additional information to the crawled log files for filtering
 
#fields:
 
# level: debug
 
# review: 1
 
filebeat.config.modules:
 
# Glob pattern for configuration loading
 
path: ${path.config}/modules.d/*.yml
 
# Set to true to enable config reloading
 
reload.enabled: false
 
# Period on which files under path should be checked for changes
 
#reload.period: 10s
 
setup.template.settings:
 
index.number_of_shards: 1
 
#index.codec: best_compression
 
#_source.enabled: false
 
setup.kibana:
 
# Kibana Host
 
# Scheme and port can be left out and will be set to the default (http and 5601)
 
# In case you specify and additional path, the scheme is required: http://localhost:5601/path
 
# IPv6 addresses should always be defined as: https://[2001:db8::1]:5601
 
#host: "localhost:5601"
 
# Kibana Space ID
 
# ID of the Kibana Space into which the dashboards should be loaded. By default,
 
# the Default Space will be used.
 
#space.id:
 
# Array of hosts to connect to.
 
# hosts: ["localhost:9200"]
 
# Protocol - either `http` (default) or `https`.
 
#protocol: "https"
 
# Authentication credentials - either API key or username/password.
 
#api_key: "id:api_key"
 
#username: "elastic"
 
#password: "changeme"
 
output.logstash:
 
# The Logstash hosts
 
hosts: ["elk.arturhaunt.ninja:5400"]
 
ssl.certificate_authorities: ["/etc/elk-certs/elk-ssl.crt"]
 
ssl.certificate: "/etc/elk-certs/elk-ssl.crt"
 
ssl.key: "/etc/elk-certs/elk-ssl.key"
 
# Optional SSL. By default is off.
 
# List of root certificates for HTTPS server verifications
 
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
 
# Certificate for SSL client authentication
 
#ssl.certificate: "/etc/pki/client/cert.pem"
 
# Client Certificate Key
 
#ssl.key: "/etc/pki/client/cert.key"
 
processors:
 
- add_host_metadata:
 
when.not.contains.tags: forwarded
 
- add_cloud_metadata: ~
 
- add_docker_metadata: ~
 
- add_kubernetes_metadata: ~
 
logging.level: debug
 
logging.selectors: ["*"]
 
# Set to true to enable instrumentation of filebeat.
 
#enabled: false
 
# Environment in which filebeat is running on (eg: staging, production, etc.)
 
#environment: ""
 
# APM Server hosts to report instrumentation results to.
 
#hosts:
 
# - http://localhost:8200
 
# API Key for the APM Server(s).
 
# If api_key is set then secret_token will be ignored.
 
#api_key:
 
# Secret token for the APM Server(s).
 
#secret_token:
 
</PRE>
 
 
=Тестирование=
 
 
Запускаем Logstash:
 
java -jar logstash-1.1.9-monolithic.jar agent -f ./habr.conf
 
 
Проверяем, что Logstash запущен:
 
# netstat -nat |grep 11111
 
Если порт 11111 присутствует, значит Logstash готов принимать логи.
 
 
В новом терминальном окне пишем:
 
echo "Logs are cool!" | nc localhost 11111
 
 
Смотрим результат в окне где запущен Logstash. Если там появилось секретное послание, значит всё работает.
 
   
  +
=Альтернативы=
  +
* Vector:
  +
** https://habr.com/ru/post/514480/
 
=Ссылки=
 
=Ссылки=
 
* https://stackoverflow.com/questions/57599095/parse-json-array-string-using-logstash
 
* https://stackoverflow.com/questions/57599095/parse-json-array-string-using-logstash

Текущая версия на 17:27, 23 августа 2021

Logstash

Заметки по Logstash с примерами конфигов

Заметка возникла по-тому что каждый раз когда надо настроить Logstash я обнаруживал что забыл почти все - и решил записать что б в следующий раз зачитать по бумажке

Установка

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo apt-get install apt-transport-https
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
sudo apt-get update
sudo apt-get install elasticsearch

Порядок работы - input --> filter (в том числе изменение) -->output
Для каждой фазы есть множество плагинов

input {
  ...
}
filter {
  ...
}
output {
  ...
}

INPUT

Данный метод является входной точкой для логов

Тыщи их если что

В описании только самые простые/ходовые

Beats

Точка для получения логов от Filebeat

    beats {
        port => 5400
        ssl => true
        ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"]
        ssl_certificate => "/etc/elk-certs/elk-ssl.crt"
        ssl_key => "/etc/elk-certs/elk-ssl.key"
        ssl_verify_mode => "force_peer"
    }

Сертефикаты должны быть подписаны одним CA - по сути тут требуется авторизация с помошью сертефикатов (как например OpenVPN или такое можно делать с nginx)
Важнейшая настройка Ж

ssl_verify_modeedit

Value can be any of: none, peer, force_peer Default value is "none" By default the server doesn’t do any client verification.
peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated.
force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed.

This option needs to be used with ssl_certificate_authorities and a defined list of CAs.

Генерация серетфикатов

Простой скрипт для самоподписанных сертефикатов (хотя конечно лучше организовать свой PKI)

#!/bin/bash

CN="logstash.tld"
sudo openssl req -subj "/CN=${CN}/" -x509 -days 3650 -batch -nodes -newkey rsa:2048 -keyout elk-ssl.key -out elk-ssl.crt

chown logstash elk-ssl.crt
chown logstash elk-ssl.key

Важно: В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу)

File

input {

 file {
   path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
   exclude => [ "*.gz", "*.zip", "*.rar" ]
   start_position => "end"
   stat_interval => 1
   discover_interval => 30
 }

}

path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]

указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным (?)

exclude => [ "*.gz", "*.zip", "*.rar" ]
исключает из обработки файлы с соответствующими расширениями.
start_position => "end"
ждёт появления новых сообщений в конце файла. При обработки уже имеющихся логов, можно выставить «beginning», тогда обработка логов будет происходить построчно с начала файлов.
stat_interval => 1
как часто (в секундах) проверять файлы на изменения. При больших значения, уменьшится частота системных вызовов, но так же увеличится время чтения новых строк.
discover_interval => 30
время (в секундах) через которое будет обновлён список обрабатываемых файлов указанных в path.

tcp

Пример конфигурации, для работы с логами удалённых сервисов:

input {
  tcp {
    data_timeout => 10
    mode => "server"
    host => "192.168.3.12"
    port => 3337
  }
}

Построчное описание настроек:

data_timeout => 10

время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто.

mode => "server"
host => "192.168.3.12"
port => 3337

в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.

udp

Для udp настройки аналогичные tcp:
input {
  udp {
    buffer_size => 4096
    host => "192.168.3.12"
    port => 3337
  }
}

FILTER

Все плагины - https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
В данном блоке настраиваются основные манипуляции с логами.
Это может быть например

  • разбивка по key=value
  • удаление ненужных параметров
  • замена имеющихся значений
  • использование geoip или DNS запросов для ип-адресов или названий хостов.


grok

GROK - разбивает строку с помошью регулярных выражений.
Пример конфигурационного файла для основной нормализации логов:

filter {
  grok {
    patterns_dir => "/path/to/patterns/"
    pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
  }
}

Описание настроек:

patterns_dir => "/path/to/patterns/"

путь к каталогу, содержащим шаблоны обработки логов.
Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.

pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"

указывается шаблон разборки логов.
Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов.
Создавать свои шаблоны необязательно - достаточно встроенных

Подробнее про шаблоны

С помощью grok фильтра можно структурировать большую часть логов — syslog, apache, nginx, mysql итд, записанных в определённом формате. Logstash имеет более 120 шаблонов готовых регулярных выражений (regex). Так что написание фильтров для обработки большинства логов не должно вызвать особого страха или недопонимания.

Формат шаблонов относительно простой — NAME PATTERN, то есть построчно указывается имя шаблона и ему соответствующее регулярное выражение. Пример:

NUMBER \d+
WORD \b\w+\b
USERID [a-zA-Z0-9_-]+

Можно использовать любой ранее созданный шаблон:

USER %{USERID}

Шаблоны можно так же и комбинировать:

CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
MAC (?:%{CISCOMAC}|%{WINDOWSMAC})

Допустим формат логов у нас следующий (пример шаблона выше):

55.3.244.1 GET /index.html 15824 0.043


С данным примером лога достаточно pattern записать в виде

"%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"

После обработки наша строка будет выглядеть следующим образом:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

mutate

Пример конфигурационного файла для изменения/удаления записей из логов:

filter {

 mutate {
   remove_field => [ "client" ]
   replace => { "message" => "%{source_host}: My new message" }
   rename => [ "HOSTORIP", "client_ip" ]
   gsub => [ "message", "\\/", "_" ]
   add_field => [ "sample1", "from %{clientip}" ]
 }

}

remove => [ "client" ]
rename => [ "HOSTORIP", "client_ip" ]

переименование название поля HOSTORIP в client_ip.

add_field => [ "sample1", "from %{clientip}" ]
добавление нового поля «sample1» со значением «from %{clientip}». 
Допускается использование названий переменных или статических значений.

date

Пример конфигурационого файла:

filter {

 date {
   match => [ "timestamp", "MMM dd HH:mm:ss" ]
 }

}

match => [ "timestamp", "MMM dd HH:mm:ss" ]

временная метка события.
Если в логах время указано в unix timestamp, то следует использовать match => [ «timestamp», «UNIX» ]

kv

Пример конфигурационного файла для обработки логов в формате key=value:

filter {
  kv {
    value_split => "=:"
    fields => ["reminder"]
    field_split => "\t?&"
  }
}
value_split => "=:"

использовать символы "=" и ":" для разделения ключа-значения.

fields => ["reminder"]

название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.

field_split => "\t?&"

использовать символы "\t?&" для разделения ключей. \t — знак табулятора

OUTPUT

stdout

Пример конфигурационного файла для вывода логов в standard output:

output {
  stdout {
    message => "IP - %{clientip}. Full message: %{@message}. End of line."
  }
}
message => "clIP - %{clientip}. Full message: %{@message}. End of line."

указывается формат исходящего сообщения. Допустимо использование переменных после grok-фильтрации.

file

Пример конфигурационого файла для записи логов в файл:

output {
  file {
    flush_interval => 5
    gzip=> true
    path => "/var/log/custom/%{clientip}"
  }
}
flush_interval => 5

интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение (что конечно создаст дополнительную нагрузку на диск).

gzip=> true

файл исходящих сообщений будет сжат Gzip.

path => "/var/log/custom/%{clientip}"

путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. В данном примере, для каждого уникального IP адреса будет создана своя папка.

Формат как будет записано определяет кодек - в общем случае это JSON Так как кодек всех устраивает в большей части примеров он никак не указан

codec

Value type is codec Default value is "json_lines"

elasticsearch

Пример конфигурационного файла для записи логов в базу Elasticsearch:

output {
  elasticsearch {
    cluster => "es_logs"
    host => "192.168.1.1"
    port => "19300"
    index => "logs-%{+YYYY.MM.dd}" 
  }
}
cluster => "es_logs"
port => "19300"

транспортный port Elasticsearch.

host => "192.168.1.1"

IP адрес Elasticsearch

index => "logs-%{+YYYY.MM.dd}" 

название индекса куда будут записываться логи.

Пример

Перенесен сюда -> https://noname.com.ua/mediawiki/index.php/LogstashExample1

Альтернативы

Ссылки