Logstash: различия между версиями
Sirmax (обсуждение | вклад) (→Пример) |
Sirmax (обсуждение | вклад) (→grok) |
||
(не показано 26 промежуточных версий этого же участника) | |||
Строка 3: | Строка 3: | ||
[[Категория:Logstash]] |
[[Категория:Logstash]] |
||
[[Категория:Grok]] |
[[Категория:Grok]] |
||
+ | [[Категория:Elasticsearch]] |
||
+ | [[Категория:Kibana]] |
||
+ | [[Категория:Filebeat]] |
||
=Logstash= |
=Logstash= |
||
+ | Заметки по Logstash с примерами конфигов |
||
+ | <B>Заметка возникла по-тому что каждый раз когда надо настроить Logstash я обнаруживал что забыл почти все - и решил записать что б в следующий раз зачитать по бумажке</B> |
||
+ | ==Установка== |
||
* https://www.elastic.co/guide/en/logstash/current/index.html |
* https://www.elastic.co/guide/en/logstash/current/index.html |
||
+ | <PRE> |
||
+ | wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add - |
||
+ | sudo apt-get install apt-transport-https |
||
+ | echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list |
||
+ | sudo apt-get update |
||
+ | sudo apt-get install elasticsearch |
||
+ | </PRE> |
||
+ | Порядок работы - input --> filter (в том числе изменение) -->output |
||
− | Заметки по Logstash с примерами конфигов |
||
+ | <BR> |
||
− | |||
+ | Для каждой фазы есть множество плагинов |
||
<PRE> |
<PRE> |
||
input { |
input { |
||
Строка 34: | Строка 48: | ||
==Beats== |
==Beats== |
||
+ | Точка для получения логов от Filebeat |
||
+ | * https://www.elastic.co/guide/en/beats/filebeat/current/configuring-howto-filebeat.html |
||
* https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html#plugins-inputs-beats |
* https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html#plugins-inputs-beats |
||
<PRE> |
<PRE> |
||
Строка 45: | Строка 61: | ||
} |
} |
||
</PRE> |
</PRE> |
||
+ | Сертефикаты должны быть подписаны одним CA - по сути тут требуется авторизация с помошью сертефикатов (как например OpenVPN или такое можно делать с nginx) |
||
− | |||
+ | <BR> |
||
+ | Важнейшая настройка Ж |
||
+ | <PRE> |
||
ssl_verify_modeedit |
ssl_verify_modeedit |
||
+ | </PRE> |
||
− | |||
− | Value can be any of: none, peer, force_peer |
+ | Value can be any of: none, peer, force_peer Default value is "none" By default the server doesn’t do any client verification. <BR> |
+ | peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated. <BR> |
||
− | Default value is "none" |
||
+ | force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed. <BR> |
||
− | By default the server doesn’t do any client verification. |
||
+ | <BR> |
||
− | |||
− | peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated. |
||
− | |||
− | force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed. |
||
− | |||
This option needs to be used with ssl_certificate_authorities and a defined list of CAs. |
This option needs to be used with ssl_certificate_authorities and a defined list of CAs. |
||
+ | <BR> |
||
− | Генерация серетфикатов |
+ | ===Генерация серетфикатов=== |
+ | Простой скрипт для самоподписанных сертефикатов (хотя конечно лучше организовать свой PKI) |
||
<PRE> |
<PRE> |
||
#!/bin/bash |
#!/bin/bash |
||
Строка 68: | Строка 85: | ||
chown logstash elk-ssl.key |
chown logstash elk-ssl.key |
||
</PRE> |
</PRE> |
||
− | <B>Важно</B> В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу) |
+ | <B>Важно:</B> В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу) |
==File== |
==File== |
||
Строка 74: | Строка 91: | ||
input { |
input { |
||
file { |
file { |
||
− | type => "some_access_log" |
||
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ] |
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ] |
||
exclude => [ "*.gz", "*.zip", "*.rar" ] |
exclude => [ "*.gz", "*.zip", "*.rar" ] |
||
Строка 82: | Строка 98: | ||
} |
} |
||
} |
} |
||
− | </PRE> |
||
− | <PRE> |
||
− | type => "some_access_log" |
||
− | тип/описание лога. При использовании нескольких входных блоков, удобно их разделять для последующих действий в filter или output. |
||
</PRE> |
</PRE> |
||
<PRE> |
<PRE> |
||
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ] |
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ] |
||
</PRE> |
</PRE> |
||
− | указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным |
+ | указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным (?) |
<PRE> |
<PRE> |
||
exclude => [ "*.gz", "*.zip", "*.rar" ] |
exclude => [ "*.gz", "*.zip", "*.rar" ] |
||
Строка 108: | Строка 120: | ||
</PRE> |
</PRE> |
||
− | + | ==tcp== |
|
Пример конфигурации, для работы с логами удалённых сервисов: |
Пример конфигурации, для работы с логами удалённых сервисов: |
||
<PRE> |
<PRE> |
||
input { |
input { |
||
tcp { |
tcp { |
||
− | type => "webserver_prod" |
||
data_timeout => 10 |
data_timeout => 10 |
||
mode => "server" |
mode => "server" |
||
Строка 124: | Строка 135: | ||
Построчное описание настроек: |
Построчное описание настроек: |
||
<PRE> |
<PRE> |
||
+ | data_timeout => 10 |
||
− | type => "webserver_prod" |
||
− | тип/описание лога. |
||
</PRE> |
</PRE> |
||
− | <PRE> |
||
− | data_timeout => 10 |
||
− | <PRE> |
||
время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто. |
время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто. |
||
<PRE> |
<PRE> |
||
Строка 138: | Строка 145: | ||
в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов. |
в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов. |
||
− | + | ==udp== |
|
<PRE> |
<PRE> |
||
Для udp настройки аналогичные tcp: |
Для udp настройки аналогичные tcp: |
||
input { |
input { |
||
udp { |
udp { |
||
− | type => "webserver_prod" |
||
buffer_size => 4096 |
buffer_size => 4096 |
||
host => "192.168.3.12" |
host => "192.168.3.12" |
||
Строка 152: | Строка 158: | ||
==FILTER== |
==FILTER== |
||
+ | Все плагины - https://www.elastic.co/guide/en/logstash/current/filter-plugins.html |
||
− | В данном блоке настраиваются основные манипуляции с логами. Это может быть и разбивка по key=value, и удаление ненужных параметров, и замена имеющихся значений, и использование geoip или DNS запросов для ип-адресов или названий хостов. |
||
+ | <BR> |
||
+ | В данном блоке настраиваются основные манипуляции с логами. <BR> |
||
+ | Это может быть например |
||
+ | * разбивка по key=value |
||
+ | * удаление ненужных параметров |
||
+ | * замена имеющихся значений |
||
+ | * использование geoip или DNS запросов для ип-адресов или названий хостов. |
||
+ | |||
− | На первый взгляд применение фильтров может показаться сложным и нелогичным, но это не совсем так. |
||
===grok=== |
===grok=== |
||
+ | GROK - разбивает строку с помошью регулярных выражений. |
||
+ | <BR> |
||
Пример конфигурационного файла для основной нормализации логов: |
Пример конфигурационного файла для основной нормализации логов: |
||
<PRE> |
<PRE> |
||
filter { |
filter { |
||
grok { |
grok { |
||
− | type => "some_access_log" |
||
patterns_dir => "/path/to/patterns/" |
patterns_dir => "/path/to/patterns/" |
||
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" |
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" |
||
Строка 167: | Строка 181: | ||
</PRE> |
</PRE> |
||
− | + | Описание настроек: |
|
− | <PRE> |
||
− | type => "apache_access" |
||
− | </PRE> |
||
− | тип/описание лога. Здесь надо указать тот тип (type), который прописан в блоке input для которого будет происходить обработка. |
||
<PRE> |
<PRE> |
||
patterns_dir => "/path/to/patterns/" |
patterns_dir => "/path/to/patterns/" |
||
</PRE> |
</PRE> |
||
− | путь к каталогу, содержащим шаблоны обработки логов. |
+ | путь к каталогу, содержащим шаблоны обработки логов.<BR> |
+ | Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны. |
||
+ | <BR> |
||
<PRE> |
<PRE> |
||
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" |
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" |
||
</PRE> |
</PRE> |
||
+ | указывается шаблон разборки логов. |
||
− | указывается шаблон разборки логов. Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов. Что бы не путаться, я для каждого формата логов создаю отдельный шаблонный файл. |
||
+ | <BR> |
||
+ | Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов. |
||
+ | <BR> |
||
+ | Создавать свои шаблоны необязательно - достаточно встроенных |
||
+ | * https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/ecs-v1/grok-patterns |
||
===Подробнее про шаблоны=== |
===Подробнее про шаблоны=== |
||
Строка 203: | Строка 220: | ||
</PRE> |
</PRE> |
||
− | Допустим формат логов у нас следующий: |
+ | Допустим формат логов у нас следующий (пример шаблона выше): |
<PRE> |
<PRE> |
||
55.3.244.1 GET /index.html 15824 0.043 |
55.3.244.1 GET /index.html 15824 0.043 |
||
Строка 224: | Строка 241: | ||
===mutate=== |
===mutate=== |
||
Пример конфигурационного файла для изменения/удаления записей из логов: |
Пример конфигурационного файла для изменения/удаления записей из логов: |
||
+ | * https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html |
||
filter { |
filter { |
||
mutate { |
mutate { |
||
− | + | remove_field => [ "client" ] |
|
+ | replace => { "message" => "%{source_host}: My new message" } |
||
− | remove => [ "client" ] |
||
+ | |||
rename => [ "HOSTORIP", "client_ip" ] |
rename => [ "HOSTORIP", "client_ip" ] |
||
gsub => [ "message", "\\/", "_" ] |
gsub => [ "message", "\\/", "_" ] |
||
Строка 233: | Строка 252: | ||
} |
} |
||
} |
} |
||
+ | <PRE> |
||
− | |||
− | Построчное описание настроек: |
||
− | type => "apache_access" |
||
− | тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка. |
||
− | |||
remove => [ "client" ] |
remove => [ "client" ] |
||
+ | </PRE> |
||
− | удаление всех данных имеющих название поля client. Возможно указание нескольких названий полей. |
||
+ | <PRE> |
||
− | |||
rename => [ "HOSTORIP", "client_ip" ] |
rename => [ "HOSTORIP", "client_ip" ] |
||
+ | </PRE> |
||
переименование название поля HOSTORIP в client_ip. |
переименование название поля HOSTORIP в client_ip. |
||
+ | <PRE> |
||
− | gsub => [ "message", "\\/", "_" ] |
||
− | замена всех "/" на "_" в поле messages. |
||
− | |||
add_field => [ "sample1", "from %{clientip}" ] |
add_field => [ "sample1", "from %{clientip}" ] |
||
− | добавление нового поля «sample1» со значением «from %{clientip}». |
+ | добавление нового поля «sample1» со значением «from %{clientip}». |
+ | Допускается использование названий переменных или статических значений. |
||
− | |||
+ | </PRE> |
||
− | 2.3 date |
||
+ | ===date=== |
||
Пример конфигурационого файла: |
Пример конфигурационого файла: |
||
+ | </PRE> |
||
filter { |
filter { |
||
date { |
date { |
||
− | type => "apache_access" |
||
match => [ "timestamp", "MMM dd HH:mm:ss" ] |
match => [ "timestamp", "MMM dd HH:mm:ss" ] |
||
} |
} |
||
} |
} |
||
+ | </PRE> |
||
− | |||
+ | <PRE> |
||
− | Построчное описание настроек: |
||
− | type => "apache_access" |
||
− | тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка. |
||
− | |||
match => [ "timestamp", "MMM dd HH:mm:ss" ] |
match => [ "timestamp", "MMM dd HH:mm:ss" ] |
||
+ | </PRE> |
||
− | временная метка события. Важная настройка для дальнейшей возможности сортировки или выборки логов. Если в логах время указано в unix timestamp (squid), то следует использовать match => [ «timestamp», «UNIX» ] |
||
+ | временная метка события. |
||
+ | <BR> |
||
+ | Если в логах время указано в unix timestamp, то следует использовать match => [ «timestamp», «UNIX» ] |
||
===kv=== |
===kv=== |
||
Пример конфигурационного файла для обработки логов в формате key=value: |
Пример конфигурационного файла для обработки логов в формате key=value: |
||
+ | <PRE> |
||
filter { |
filter { |
||
kv { |
kv { |
||
− | type => "custom_log" |
||
value_split => "=:" |
value_split => "=:" |
||
fields => ["reminder"] |
fields => ["reminder"] |
||
Строка 276: | Строка 291: | ||
} |
} |
||
} |
} |
||
+ | </PRE> |
||
− | |||
+ | <PRE> |
||
− | Построчное описание настроек: |
||
− | type => "custom_log" |
||
− | тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка. |
||
− | |||
value_split => "=:" |
value_split => "=:" |
||
+ | </PRE> |
||
использовать символы "=" и ":" для разделения ключа-значения. |
использовать символы "=" и ":" для разделения ключа-значения. |
||
+ | <PRE> |
||
− | |||
fields => ["reminder"] |
fields => ["reminder"] |
||
+ | </PRE> |
||
название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога. |
название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога. |
||
+ | <PRE> |
||
− | |||
field_split => "\t?&" |
field_split => "\t?&" |
||
+ | </PRE> |
||
использовать символы "\t?&" для разделения ключей. \t — знак табулятора |
использовать символы "\t?&" для разделения ключей. \t — знак табулятора |
||
− | == |
+ | ==OUTPUT== |
+ | * https://www.elastic.co/guide/en/logstash/current/output-plugins.html |
||
− | Пример конфигурационного файла для «склеивания» многострочных логов (например Java stack trace): |
||
− | filter { |
||
− | multiline { |
||
− | type => "java_log" |
||
− | pattern => "^\s" |
||
− | what => "previous" |
||
− | } |
||
− | } |
||
− | Построчное описание настроек: |
||
− | type => "java_log" |
||
− | тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка. |
||
− | |||
− | pattern => "^\s" |
||
− | регулярное выражение |
||
− | |||
− | what => "previous" |
||
− | при соответствии шаблону «pattern» строка принадлежит предыдущей (previous) строке. |
||
− | |||
− | ==OUTPUT== |
||
− | Название этого блока/метода говорит само за себя — в нём указываются настройки для исходящих сообщений. Аналогично предыдущим блокам, здесь можно указывать любое количество исходящих подблоков. |
||
===stdout=== |
===stdout=== |
||
Пример конфигурационного файла для вывода логов в standard output: |
Пример конфигурационного файла для вывода логов в standard output: |
||
Строка 317: | Строка 313: | ||
output { |
output { |
||
stdout { |
stdout { |
||
− | type => "custom_log" |
||
message => "IP - %{clientip}. Full message: %{@message}. End of line." |
message => "IP - %{clientip}. Full message: %{@message}. End of line." |
||
} |
} |
||
} |
} |
||
</PRE> |
</PRE> |
||
− | |||
− | <PRE> |
||
− | type => "custom_log" |
||
− | </PRE> |
||
− | тип/описание лога. |
||
<PRE> |
<PRE> |
||
Строка 338: | Строка 328: | ||
output { |
output { |
||
file { |
file { |
||
− | type => "custom_log" |
||
flush_interval => 5 |
flush_interval => 5 |
||
gzip=> true |
gzip=> true |
||
− | path => "/var/log/custom/%{clientip |
+ | path => "/var/log/custom/%{clientip}" |
− | message_format => "ip: %{clientip} request:%{requri}" |
||
} |
} |
||
} |
} |
||
</PRE> |
</PRE> |
||
− | <PRE> |
||
− | type => "custom_log" |
||
− | </PRE> |
||
− | тип/описание лога. |
||
<PRE> |
<PRE> |
||
flush_interval => 5 |
flush_interval => 5 |
||
</PRE> |
</PRE> |
||
− | интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение. |
+ | интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение (что конечно создаст дополнительную нагрузку на диск). |
<PRE> |
<PRE> |
||
gzip=> true |
gzip=> true |
||
Строка 359: | Строка 343: | ||
файл исходящих сообщений будет сжат Gzip. |
файл исходящих сообщений будет сжат Gzip. |
||
<PRE> |
<PRE> |
||
− | path => "/var/log/custom/%{clientip |
+ | path => "/var/log/custom/%{clientip}" |
</PRE> |
</PRE> |
||
− | путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. |
+ | путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. |
+ | В данном примере, для каждого уникального IP адреса будет создана своя папка. |
||
+ | <B>Формат как будет записано определяет кодек - в общем случае это JSON</B> Так как кодек всех устраивает в большей части примеров он никак не указан |
||
<PRE> |
<PRE> |
||
+ | codec |
||
− | message_format => "ip: %{clientip} request:%{requri}" |
||
</PRE> |
</PRE> |
||
+ | Value type is codec |
||
− | формат исходящего сообщения. |
||
+ | Default value is "json_lines" |
||
===elasticsearch=== |
===elasticsearch=== |
||
Строка 373: | Строка 360: | ||
output { |
output { |
||
elasticsearch { |
elasticsearch { |
||
− | type => "custom_log" |
||
cluster => "es_logs" |
cluster => "es_logs" |
||
− | embedded => false |
||
host => "192.168.1.1" |
host => "192.168.1.1" |
||
port => "19300" |
port => "19300" |
||
Строка 382: | Строка 367: | ||
} |
} |
||
</PRE> |
</PRE> |
||
− | <PRE> |
||
− | type => "custom_log" |
||
− | </PRE> |
||
− | тип/описание лога. |
||
<PRE> |
<PRE> |
||
cluster => "es_logs" |
cluster => "es_logs" |
||
</PRE> |
</PRE> |
||
− | название кластера указанного в cluster.name в настроечном файле Elasticsearch. |
||
− | <PRE> |
||
− | embedded => false |
||
− | </PRE> |
||
− | указывает какую базу Elasticsearch использовать внутреннюю или стороннюю. |
||
<PRE> |
<PRE> |
||
port => "19300" |
port => "19300" |
||
Строка 406: | Строка 382: | ||
</PRE> |
</PRE> |
||
название индекса куда будут записываться логи. |
название индекса куда будут записываться логи. |
||
− | |||
− | ===email=== |
||
− | Данный плагин можно использовать для алертов. |
||
− | |||
− | Пример конфигурационого файла: |
||
− | <PRE> |
||
− | output { |
||
− | email { |
||
− | type => "custom_log" |
||
− | from => "logstash@domain.com" |
||
− | to => "admin1@domain.com" |
||
− | cc => "admin2@domain.com" |
||
− | subject => "Found '%{matchName}' Alert on %{@source_host}" |
||
− | body => "Here is the event line %{@message}" |
||
− | htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>" |
||
− | via => "sendmail" |
||
− | options => [ "smtpIporHost", "smtp.gmail.com", |
||
− | "port", "587", |
||
− | "domain", "yourDomain", |
||
− | "userName", "yourSMTPUsername", |
||
− | "password", "PASS", |
||
− | "starttls", "true", |
||
− | "authenticationType", "plain", |
||
− | "debug", "true" |
||
− | ] |
||
− | match => [ "response errors", "response,501,,or,response,301", |
||
− | "multiple response errors", "response,501,,and,response,301" ] |
||
− | |||
− | } |
||
− | } |
||
− | </PRE> |
||
− | <PRE> |
||
− | type => "custom_log" |
||
− | </PRE> |
||
− | тип/описание лога. |
||
− | <PRE> |
||
− | from => "logstash@domain.com" |
||
− | to => "admin1@domain.com" |
||
− | cc => "admin2@domain.com" |
||
− | </PRE> |
||
− | |||
− | <PRE> |
||
− | subject => "Found '%{matchName}' Alert on %{@source_host}" |
||
− | тема письма уведомления. Можно использовать переменные. Например %{matchName} — название условия совпадения из настройки «match». |
||
− | </PRE> |
||
− | |||
− | <PRE> |
||
− | body => "Here is the event line %{@message}" |
||
− | htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>" |
||
− | </PRE> |
||
− | тело письма. |
||
− | <PRE> |
||
− | via => "sendmail" |
||
− | </PRE> |
||
− | способ отсылки письма. Возможен один вариант из двух — smtp или sendmail. |
||
− | <PRE> |
||
− | options => ... |
||
− | </PRE> |
||
− | стандартные настройки почтовых параметров. |
||
− | <PRE> |
||
− | match => [ "response errors", "response,501,,or,response,301", |
||
− | "multiple response errors", "response,501,,and,response,301" ] |
||
− | </PRE> |
||
− | «response errors» — название алерта (записывается в переменную %{matchName}). «response,501,,or,response,301» — критерии срабатывания алертов. В данном примере если поле response содержит значение 501 или 301, то алерт считается |
||
− | |||
− | сработавшим. Во второй строке используется логика AND, т.е. оба условия должны быть выполнены. |
||
=Пример= |
=Пример= |
||
+ | Перенесен сюда -> https://noname.com.ua/mediawiki/index.php/LogstashExample1 |
||
− | ==nginx== |
||
− | Отправка логов nginx в Elastic (один из вариантов) <BR> |
||
− | Тут рассматривается самый простой случай - когда Filebeat не делает преобразований |
||
− | Nginx умеет писать логи в Json |
||
− | |||
− | <PRE> |
||
− | log_format nginxlog_json escape=json. |
||
− | '{ "timestamp": "$time_iso8601", ' |
||
− | '"remote_addr": "$remote_addr", ' |
||
− | '"body_bytes_sent": $body_bytes_sent, ' |
||
− | '"request": "$request", ' |
||
− | '"request_method": "$request_method", ' |
||
− | '"request_time": $request_time, ' |
||
− | '"response_status": $status, ' |
||
− | '"upstream_status": $upstream_status,' |
||
− | '"upstream_response_time": $upstream_response_time,' |
||
− | '"upstream_connect_time": $upstream_connect_time,' |
||
− | '"upstream_header_time": $upstream_header_time,' |
||
− | '"upstream_addr": "$upstream_addr",' |
||
− | '"host": "$host",' |
||
− | '"http_x_forwarded_for": "$http_x_forwarded_for",' |
||
− | '"http_referrer": "$http_referer", ' |
||
− | '"http_user_agent": "$http_user_agent", ' |
||
− | '"http_version": "$server_protocol", ' |
||
− | '"nginx_access": true }'; |
||
− | </PRE> |
||
− | |||
− | В лог попадает: |
||
− | <PRE> |
||
− | tail -1 /var/log/nginx/access.log.ssl | jq . |
||
− | { |
||
− | "timestamp": "2021-08-08T08:47:59+00:00", |
||
− | "remote_addr": "159.224.49.4", |
||
− | "body_bytes_sent": 361, |
||
− | "request": "POST /internal/bsearch HTTP/1.1", |
||
− | "request_method": "POST", |
||
− | "request_time": 0.152, |
||
− | "response_status": 200, |
||
− | "upstream_status": 200, |
||
− | "upstream_response_time": 0.028, |
||
− | "upstream_connect_time": 0, |
||
− | "upstream_header_time": 0.028, |
||
− | "upstream_addr": "127.0.0.1:5601", |
||
− | "host": "elk.domain.tld", |
||
− | "http_x_forwarded_for": "", |
||
− | "http_referrer": "https://elk.domain.tld/app/discover", |
||
− | "http_user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15", |
||
− | "http_version": "HTTP/1.1", |
||
− | "nginx_access": true |
||
− | } |
||
− | </PRE> |
||
− | |||
− | ==Filebeat== |
||
− | <PRE> |
||
− | filebeat.inputs: |
||
− | - type: log |
||
− | enabled: true |
||
− | paths: |
||
− | - /var/log/nginx/elk.domain.tld-access.log.ssl |
||
− | exclude_files: ['\.gz$'] |
||
− | filebeat.config.modules: |
||
− | reload.enabled: false |
||
− | setup.template.settings: |
||
− | index.number_of_shards: 1 |
||
− | setup.kibana: |
||
− | output.logstash: |
||
− | hosts: ["elk.domain.tld:5400"] |
||
− | ssl.certificate_authorities: ["/etc/elk-certs/elk-ssl.crt"] |
||
− | ssl.certificate: "/etc/elk-certs/elk-ssl.crt" |
||
− | ssl.key: "/etc/elk-certs/elk-ssl.key" |
||
− | processors: |
||
− | - add_host_metadata: |
||
− | when.not.contains.tags: forwarded |
||
− | - add_cloud_metadata: ~ |
||
− | - add_docker_metadata: ~ |
||
− | - add_kubernetes_metadata: ~ |
||
− | logging.level: debug |
||
− | logging.selectors: ["*"] |
||
− | </PRE> |
||
− | В примере мониторится всего один файл (исключение очевидно лишнее) |
||
− | <BR> |
||
− | Результат пересылается на удаленный хост в Logstash |
||
− | <B>Важно</B> Сертификат должен соответствовать хостнейму - тут нельзя указать IP адрес |
||
− | <PRE> |
||
− | hosts: ["elk.domain.tld:5400"] |
||
− | </PRE> |
||
− | |||
− | ==Logstash== |
||
− | |||
− | =Тестирование= |
||
− | |||
− | Запускаем Logstash: |
||
− | java -jar logstash-1.1.9-monolithic.jar agent -f ./habr.conf |
||
− | |||
− | Проверяем, что Logstash запущен: |
||
− | # netstat -nat |grep 11111 |
||
− | Если порт 11111 присутствует, значит Logstash готов принимать логи. |
||
− | |||
− | В новом терминальном окне пишем: |
||
− | echo "Logs are cool!" | nc localhost 11111 |
||
− | |||
− | Смотрим результат в окне где запущен Logstash. Если там появилось секретное послание, значит всё работает. |
||
+ | =Альтернативы= |
||
+ | * Vector: |
||
+ | ** https://habr.com/ru/post/514480/ |
||
=Ссылки= |
=Ссылки= |
||
* https://stackoverflow.com/questions/57599095/parse-json-array-string-using-logstash |
* https://stackoverflow.com/questions/57599095/parse-json-array-string-using-logstash |
Текущая версия на 16:27, 23 августа 2021
Logstash
Заметки по Logstash с примерами конфигов
Заметка возникла по-тому что каждый раз когда надо настроить Logstash я обнаруживал что забыл почти все - и решил записать что б в следующий раз зачитать по бумажке
Установка
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add - sudo apt-get install apt-transport-https echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list sudo apt-get update sudo apt-get install elasticsearch
Порядок работы - input --> filter (в том числе изменение) -->output
Для каждой фазы есть множество плагинов
input { ... }
filter { ... }
output { ... }
INPUT
Данный метод является входной точкой для логов
Тыщи их если что
В описании только самые простые/ходовые
Beats
Точка для получения логов от Filebeat
- https://www.elastic.co/guide/en/beats/filebeat/current/configuring-howto-filebeat.html
- https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html#plugins-inputs-beats
beats { port => 5400 ssl => true ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"] ssl_certificate => "/etc/elk-certs/elk-ssl.crt" ssl_key => "/etc/elk-certs/elk-ssl.key" ssl_verify_mode => "force_peer" }
Сертефикаты должны быть подписаны одним CA - по сути тут требуется авторизация с помошью сертефикатов (как например OpenVPN или такое можно делать с nginx)
Важнейшая настройка Ж
ssl_verify_modeedit
Value can be any of: none, peer, force_peer Default value is "none" By default the server doesn’t do any client verification.
peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated.
force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed.
This option needs to be used with ssl_certificate_authorities and a defined list of CAs.
Генерация серетфикатов
Простой скрипт для самоподписанных сертефикатов (хотя конечно лучше организовать свой PKI)
#!/bin/bash CN="logstash.tld" sudo openssl req -subj "/CN=${CN}/" -x509 -days 3650 -batch -nodes -newkey rsa:2048 -keyout elk-ssl.key -out elk-ssl.crt chown logstash elk-ssl.crt chown logstash elk-ssl.key
Важно: В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу)
File
input {
file { path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ] exclude => [ "*.gz", "*.zip", "*.rar" ] start_position => "end" stat_interval => 1 discover_interval => 30 }
}
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным (?)
exclude => [ "*.gz", "*.zip", "*.rar" ] исключает из обработки файлы с соответствующими расширениями.
start_position => "end" ждёт появления новых сообщений в конце файла. При обработки уже имеющихся логов, можно выставить «beginning», тогда обработка логов будет происходить построчно с начала файлов.
stat_interval => 1 как часто (в секундах) проверять файлы на изменения. При больших значения, уменьшится частота системных вызовов, но так же увеличится время чтения новых строк.
discover_interval => 30 время (в секундах) через которое будет обновлён список обрабатываемых файлов указанных в path.
tcp
Пример конфигурации, для работы с логами удалённых сервисов:
input { tcp { data_timeout => 10 mode => "server" host => "192.168.3.12" port => 3337 } }
Построчное описание настроек:
data_timeout => 10
время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто.
mode => "server" host => "192.168.3.12" port => 3337
в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.
udp
Для udp настройки аналогичные tcp: input { udp { buffer_size => 4096 host => "192.168.3.12" port => 3337 } }
FILTER
Все плагины - https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
В данном блоке настраиваются основные манипуляции с логами.
Это может быть например
- разбивка по key=value
- удаление ненужных параметров
- замена имеющихся значений
- использование geoip или DNS запросов для ип-адресов или названий хостов.
grok
GROK - разбивает строку с помошью регулярных выражений.
Пример конфигурационного файла для основной нормализации логов:
filter { grok { patterns_dir => "/path/to/patterns/" pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } }
Описание настроек:
patterns_dir => "/path/to/patterns/"
путь к каталогу, содержащим шаблоны обработки логов.
Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
указывается шаблон разборки логов.
Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов.
Создавать свои шаблоны необязательно - достаточно встроенных
- https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/ecs-v1/grok-patterns
Подробнее про шаблоны
С помощью grok фильтра можно структурировать большую часть логов — syslog, apache, nginx, mysql итд, записанных в определённом формате. Logstash имеет более 120 шаблонов готовых регулярных выражений (regex). Так что написание фильтров для обработки большинства логов не должно вызвать особого страха или недопонимания.
Формат шаблонов относительно простой — NAME PATTERN, то есть построчно указывается имя шаблона и ему соответствующее регулярное выражение. Пример:
NUMBER \d+ WORD \b\w+\b USERID [a-zA-Z0-9_-]+
Можно использовать любой ранее созданный шаблон:
USER %{USERID}
Шаблоны можно так же и комбинировать:
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4}) WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}) MAC (?:%{CISCOMAC}|%{WINDOWSMAC})
Допустим формат логов у нас следующий (пример шаблона выше):
55.3.244.1 GET /index.html 15824 0.043
С данным примером лога достаточно pattern записать в виде
"%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
После обработки наша строка будет выглядеть следующим образом:
client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043
mutate
Пример конфигурационного файла для изменения/удаления записей из логов:
filter {
mutate { remove_field => [ "client" ] replace => { "message" => "%{source_host}: My new message" }
rename => [ "HOSTORIP", "client_ip" ] gsub => [ "message", "\\/", "_" ] add_field => [ "sample1", "from %{clientip}" ] }
}
remove => [ "client" ]
rename => [ "HOSTORIP", "client_ip" ]
переименование название поля HOSTORIP в client_ip.
add_field => [ "sample1", "from %{clientip}" ] добавление нового поля «sample1» со значением «from %{clientip}». Допускается использование названий переменных или статических значений.
date
Пример конфигурационого файла:
filter {
date { match => [ "timestamp", "MMM dd HH:mm:ss" ] }
}
match => [ "timestamp", "MMM dd HH:mm:ss" ]
временная метка события.
Если в логах время указано в unix timestamp, то следует использовать match => [ «timestamp», «UNIX» ]
kv
Пример конфигурационного файла для обработки логов в формате key=value:
filter { kv { value_split => "=:" fields => ["reminder"] field_split => "\t?&" } }
value_split => "=:"
использовать символы "=" и ":" для разделения ключа-значения.
fields => ["reminder"]
название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.
field_split => "\t?&"
использовать символы "\t?&" для разделения ключей. \t — знак табулятора
OUTPUT
stdout
Пример конфигурационного файла для вывода логов в standard output:
output { stdout { message => "IP - %{clientip}. Full message: %{@message}. End of line." } }
message => "clIP - %{clientip}. Full message: %{@message}. End of line."
указывается формат исходящего сообщения. Допустимо использование переменных после grok-фильтрации.
file
Пример конфигурационого файла для записи логов в файл:
output { file { flush_interval => 5 gzip=> true path => "/var/log/custom/%{clientip}" } }
flush_interval => 5
интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение (что конечно создаст дополнительную нагрузку на диск).
gzip=> true
файл исходящих сообщений будет сжат Gzip.
path => "/var/log/custom/%{clientip}"
путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. В данном примере, для каждого уникального IP адреса будет создана своя папка.
Формат как будет записано определяет кодек - в общем случае это JSON Так как кодек всех устраивает в большей части примеров он никак не указан
codec
Value type is codec Default value is "json_lines"
elasticsearch
Пример конфигурационного файла для записи логов в базу Elasticsearch:
output { elasticsearch { cluster => "es_logs" host => "192.168.1.1" port => "19300" index => "logs-%{+YYYY.MM.dd}" } }
cluster => "es_logs"
port => "19300"
транспортный port Elasticsearch.
host => "192.168.1.1"
IP адрес Elasticsearch
index => "logs-%{+YYYY.MM.dd}"
название индекса куда будут записываться логи.
Пример
Перенесен сюда -> https://noname.com.ua/mediawiki/index.php/LogstashExample1
Альтернативы
- Vector: