Logstash: различия между версиями
Sirmax (обсуждение | вклад) (→udp) |
Sirmax (обсуждение | вклад) (→tcp) |
||
Строка 132: | Строка 132: | ||
Построчное описание настроек: |
Построчное описание настроек: |
||
− | <PRE> |
||
− | type => "webserver_prod" |
||
− | тип/описание лога. |
||
− | </PRE> |
||
<PRE> |
<PRE> |
||
data_timeout => 10 |
data_timeout => 10 |
Версия 10:38, 10 августа 2021
Logstash
Заметки по Logstash с примерами конфигов
Установка
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add - sudo apt-get install apt-transport-https echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list sudo apt-get update sudo apt-get install elasticsearch
Порядок работы - input --> filter (в том числе изменение) -->output
Для каждой фазы есть множество плагинов
input { ... }
filter { ... }
output { ... }
INPUT
Данный метод является входной точкой для логов
Тыщи их если что
В описании только самые простые/ходовые
Beats
Точка для получения логов от Filebeat
- https://www.elastic.co/guide/en/beats/filebeat/current/configuring-howto-filebeat.html
- https://www.elastic.co/guide/en/logstash/current/plugins-inputs-beats.html#plugins-inputs-beats
beats { port => 5400 ssl => true ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"] ssl_certificate => "/etc/elk-certs/elk-ssl.crt" ssl_key => "/etc/elk-certs/elk-ssl.key" ssl_verify_mode => "force_peer" }
Сертефикаты должны быть подписаны одним CA - по сути тут требуется авторизация с помошью сертефикатов (как например OpenVPN или такое можно делать с nginx)
Важнейшая настройка Ж
ssl_verify_modeedit
Value can be any of: none, peer, force_peer Default value is "none" By default the server doesn’t do any client verification.
peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated.
force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed.
This option needs to be used with ssl_certificate_authorities and a defined list of CAs.
Генерация серетфикатов
Простой скрипт для самоподписанных сертефикатов (хотя конечно лучше организовать свой PKI)
#!/bin/bash CN="logstash.tld" sudo openssl req -subj "/CN=${CN}/" -x509 -days 3650 -batch -nodes -newkey rsa:2048 -keyout elk-ssl.key -out elk-ssl.crt chown logstash elk-ssl.crt chown logstash elk-ssl.key
Важно: В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу)
File
input {
file { path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ] exclude => [ "*.gz", "*.zip", "*.rar" ] start_position => "end" stat_interval => 1 discover_interval => 30 }
}
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным (?)
exclude => [ "*.gz", "*.zip", "*.rar" ] исключает из обработки файлы с соответствующими расширениями.
start_position => "end" ждёт появления новых сообщений в конце файла. При обработки уже имеющихся логов, можно выставить «beginning», тогда обработка логов будет происходить построчно с начала файлов.
stat_interval => 1 как часто (в секундах) проверять файлы на изменения. При больших значения, уменьшится частота системных вызовов, но так же увеличится время чтения новых строк.
discover_interval => 30 время (в секундах) через которое будет обновлён список обрабатываемых файлов указанных в path.
tcp
Пример конфигурации, для работы с логами удалённых сервисов:
input { tcp { data_timeout => 10 mode => "server" host => "192.168.3.12" port => 3337 } }
Построчное описание настроек:
data_timeout => 10 <PRE> время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто. <PRE> mode => "server" host => "192.168.3.12" port => 3337
в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.
udp
Для udp настройки аналогичные tcp: input { udp { buffer_size => 4096 host => "192.168.3.12" port => 3337 } }
FILTER
Все плагины - https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
В данном блоке настраиваются основные манипуляции с логами.
Это может быть например
- разбивка по key=value
- удаление ненужных параметров
- замена имеющихся значений
- использование geoip или DNS запросов для ип-адресов или названий хостов.
grok
GROK - разбивает строку с помошью регулярных выражений.
Пример конфигурационного файла для основной нормализации логов:
filter { grok { patterns_dir => "/path/to/patterns/" pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } }
Описание настроек: PRE> patterns_dir => "/path/to/patterns/"
путь к каталогу, содержащим шаблоны обработки логов.
Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
указывается шаблон разборки логов.
Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов.
Создавать свои шаблоны необязательно - достаточно встроенных
- https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/ecs-v1/grok-patterns
Подробнее про шаблоны
С помощью grok фильтра можно структурировать большую часть логов — syslog, apache, nginx, mysql итд, записанных в определённом формате. Logstash имеет более 120 шаблонов готовых регулярных выражений (regex). Так что написание фильтров для обработки большинства логов не должно вызвать особого страха или недопонимания.
Формат шаблонов относительно простой — NAME PATTERN, то есть построчно указывается имя шаблона и ему соответствующее регулярное выражение. Пример:
NUMBER \d+ WORD \b\w+\b USERID [a-zA-Z0-9_-]+
Можно использовать любой ранее созданный шаблон:
USER %{USERID}
Шаблоны можно так же и комбинировать:
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4}) WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}) MAC (?:%{CISCOMAC}|%{WINDOWSMAC})
Допустим формат логов у нас следующий (пример шаблона выше):
55.3.244.1 GET /index.html 15824 0.043
С данным примером лога достаточно pattern записать в виде
"%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
После обработки наша строка будет выглядеть следующим образом:
client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043
mutate
Пример конфигурационного файла для изменения/удаления записей из логов:
filter {
mutate { remove_field => [ "client" ] replace => { "message" => "%{source_host}: My new message" }
rename => [ "HOSTORIP", "client_ip" ] gsub => [ "message", "\\/", "_" ] add_field => [ "sample1", "from %{clientip}" ] }
}
remove => [ "client" ]
rename => [ "HOSTORIP", "client_ip" ]
переименование название поля HOSTORIP в client_ip.
add_field => [ "sample1", "from %{clientip}" ] добавление нового поля «sample1» со значением «from %{clientip}». Допускается использование названий переменных или статических значений.
date
Пример конфигурационого файла:
filter {
date { match => [ "timestamp", "MMM dd HH:mm:ss" ] }
}
match => [ "timestamp", "MMM dd HH:mm:ss" ]
временная метка события.
Если в логах время указано в unix timestamp, то следует использовать match => [ «timestamp», «UNIX» ]
kv
Пример конфигурационного файла для обработки логов в формате key=value:
filter { kv { value_split => "=:" fields => ["reminder"] field_split => "\t?&" } }
value_split => "=:"
использовать символы "=" и ":" для разделения ключа-значения.
fields => ["reminder"]
название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.
field_split => "\t?&"
использовать символы "\t?&" для разделения ключей. \t — знак табулятора
OUTPUT
stdout
Пример конфигурационного файла для вывода логов в standard output:
output { stdout { message => "IP - %{clientip}. Full message: %{@message}. End of line." } }
message => "clIP - %{clientip}. Full message: %{@message}. End of line."
указывается формат исходящего сообщения. Допустимо использование переменных после grok-фильтрации.
file
Пример конфигурационого файла для записи логов в файл:
output { file { flush_interval => 5 gzip=> true path => "/var/log/custom/%{clientip}/%{type}" } }
flush_interval => 5
интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение (что конечно создаст дополнительную нагрузку на диск).
gzip=> true
файл исходящих сообщений будет сжат Gzip.
path => "/var/log/custom/%{clientip}/%{type}"
путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. В данном примере, для каждого уникального IP адреса будет создана своя папка и сообщения будут записываться в файл соответствующий переменной %{type}.
Формат как будет записано определяет кодек - в общем случае это JSON Так как кодек всех устраивает в большей части примеров он никак не указан
codec
Value type is codec Default value is "json_lines"
elasticsearch
Пример конфигурационного файла для записи логов в базу Elasticsearch:
output { elasticsearch { type => "custom_log" cluster => "es_logs" embedded => false host => "192.168.1.1" port => "19300" index => "logs-%{+YYYY.MM.dd}" } }
type => "custom_log"
тип/описание лога.
cluster => "es_logs"
название кластера указанного в cluster.name в настроечном файле Elasticsearch.
embedded => false
указывает какую базу Elasticsearch использовать внутреннюю или стороннюю.
port => "19300"
транспортный port Elasticsearch.
host => "192.168.1.1"
IP адрес Elasticsearch
index => "logs-%{+YYYY.MM.dd}"
название индекса куда будут записываться логи.
Данный плагин можно использовать для алертов.
Пример конфигурационого файла:
output { email { type => "custom_log" from => "logstash@domain.com" to => "admin1@domain.com" cc => "admin2@domain.com" subject => "Found '%{matchName}' Alert on %{@source_host}" body => "Here is the event line %{@message}" htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>" via => "sendmail" options => [ "smtpIporHost", "smtp.gmail.com", "port", "587", "domain", "yourDomain", "userName", "yourSMTPUsername", "password", "PASS", "starttls", "true", "authenticationType", "plain", "debug", "true" ] match => [ "response errors", "response,501,,or,response,301", "multiple response errors", "response,501,,and,response,301" ] } }
type => "custom_log"
тип/описание лога.
from => "logstash@domain.com" to => "admin1@domain.com" cc => "admin2@domain.com"
subject => "Found '%{matchName}' Alert on %{@source_host}" тема письма уведомления. Можно использовать переменные. Например %{matchName} — название условия совпадения из настройки «match».
body => "Here is the event line %{@message}" htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"
тело письма.
via => "sendmail"
способ отсылки письма. Возможен один вариант из двух — smtp или sendmail.
options => ...
стандартные настройки почтовых параметров.
match => [ "response errors", "response,501,,or,response,301", "multiple response errors", "response,501,,and,response,301" ]
«response errors» — название алерта (записывается в переменную %{matchName}). «response,501,,or,response,301» — критерии срабатывания алертов. В данном примере если поле response содержит значение 501 или 301, то алерт считается
сработавшим. Во второй строке используется логика AND, т.е. оба условия должны быть выполнены.
Пример
Перенесен сюда -> https://noname.com.ua/mediawiki/index.php/LogstashExample1
Тестирование
Запускаем Logstash: java -jar logstash-1.1.9-monolithic.jar agent -f ./habr.conf
Проверяем, что Logstash запущен:
- netstat -nat |grep 11111
Если порт 11111 присутствует, значит Logstash готов принимать логи.
В новом терминальном окне пишем: echo "Logs are cool!" | nc localhost 11111
Смотрим результат в окне где запущен Logstash. Если там появилось секретное послание, значит всё работает.
Альтернативы
- Vector: