Logstash
Logstash
Заметки по Logstash с примерами конфигов
input { ... }
filter { ... }
output { ... }
INPUT
Данный метод является входной точкой для логов
Тыщи их если что
В описании только самые простые/ходовые
Beats
beats { port => 5400 ssl => true ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"] ssl_certificate => "/etc/elk-certs/elk-ssl.crt" ssl_key => "/etc/elk-certs/elk-ssl.key" ssl_verify_mode => "force_peer" }
ssl_verify_modeedit
Value can be any of: none, peer, force_peer Default value is "none" By default the server doesn’t do any client verification.
peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated.
force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed.
This option needs to be used with ssl_certificate_authorities and a defined list of CAs.
Генерация серетфикатов
#!/bin/bash CN="logstash.tld" sudo openssl req -subj "/CN=${CN}/" -x509 -days 3650 -batch -nodes -newkey rsa:2048 -keyout elk-ssl.key -out elk-ssl.crt chown logstash elk-ssl.crt chown logstash elk-ssl.key
Важно В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу)
File
input {
file { type => "some_access_log" path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ] exclude => [ "*.gz", "*.zip", "*.rar" ] start_position => "end" stat_interval => 1 discover_interval => 30 }
}
type => "some_access_log" тип/описание лога. При использовании нескольких входных блоков, удобно их разделять для последующих действий в filter или output.
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным
exclude => [ "*.gz", "*.zip", "*.rar" ] исключает из обработки файлы с соответствующими расширениями.
start_position => "end" ждёт появления новых сообщений в конце файла. При обработки уже имеющихся логов, можно выставить «beginning», тогда обработка логов будет происходить построчно с начала файлов.
stat_interval => 1 как часто (в секундах) проверять файлы на изменения. При больших значения, уменьшится частота системных вызовов, но так же увеличится время чтения новых строк.
discover_interval => 30 время (в секундах) через которое будет обновлён список обрабатываемых файлов указанных в path.
tcp
Пример конфигурации, для работы с логами удалённых сервисов:
input { tcp { type => "webserver_prod" data_timeout => 10 mode => "server" host => "192.168.3.12" port => 3337 } }
Построчное описание настроек:
type => "webserver_prod" тип/описание лога.
data_timeout => 10 <PRE> время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто. <PRE> mode => "server" host => "192.168.3.12" port => 3337
в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.
udp
Для udp настройки аналогичные tcp: input { udp { type => "webserver_prod" buffer_size => 4096 host => "192.168.3.12" port => 3337 } }
FILTER
В данном блоке настраиваются основные манипуляции с логами. Это может быть и разбивка по key=value, и удаление ненужных параметров, и замена имеющихся значений, и использование geoip или DNS запросов для ип-адресов или названий хостов.
На первый взгляд применение фильтров может показаться сложным и нелогичным, но это не совсем так.
grok
Пример конфигурационного файла для основной нормализации логов:
filter { grok { type => "some_access_log" patterns_dir => "/path/to/patterns/" pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } }
Построчное описание настроек:
type => "apache_access"
тип/описание лога. Здесь надо указать тот тип (type), который прописан в блоке input для которого будет происходить обработка.
patterns_dir => "/path/to/patterns/"
путь к каталогу, содержащим шаблоны обработки логов. Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
указывается шаблон разборки логов. Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов. Что бы не путаться, я для каждого формата логов создаю отдельный шаблонный файл.
Подробнее про шаблоны
С помощью grok фильтра можно структурировать большую часть логов — syslog, apache, nginx, mysql итд, записанных в определённом формате. Logstash имеет более 120 шаблонов готовых регулярных выражений (regex). Так что написание фильтров для обработки большинства логов не должно вызвать особого страха или недопонимания.
Формат шаблонов относительно простой — NAME PATTERN, то есть построчно указывается имя шаблона и ему соответствующее регулярное выражение. Пример:
NUMBER \d+ WORD \b\w+\b USERID [a-zA-Z0-9_-]+
Можно использовать любой ранее созданный шаблон:
USER %{USERID}
Шаблоны можно так же и комбинировать:
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4}) WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2}) MAC (?:%{CISCOMAC}|%{WINDOWSMAC})
Допустим формат логов у нас следующий:
55.3.244.1 GET /index.html 15824 0.043
С данным примером лога достаточно pattern записать в виде
"%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
После обработки наша строка будет выглядеть следующим образом:
client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043
mutate
Пример конфигурационного файла для изменения/удаления записей из логов: filter {
mutate { type => "apache_access" remove => [ "client" ] rename => [ "HOSTORIP", "client_ip" ] gsub => [ "message", "\\/", "_" ] add_field => [ "sample1", "from %{clientip}" ] }
}
Построчное описание настроек: type => "apache_access" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
remove => [ "client" ] удаление всех данных имеющих название поля client. Возможно указание нескольких названий полей.
rename => [ "HOSTORIP", "client_ip" ] переименование название поля HOSTORIP в client_ip.
gsub => [ "message", "\\/", "_" ] замена всех "/" на "_" в поле messages.
add_field => [ "sample1", "from %{clientip}" ] добавление нового поля «sample1» со значением «from %{clientip}». Допускается использование названий переменных.
2.3 date Пример конфигурационого файла: filter {
date { type => "apache_access" match => [ "timestamp", "MMM dd HH:mm:ss" ] }
}
Построчное описание настроек: type => "apache_access" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
match => [ "timestamp", "MMM dd HH:mm:ss" ] временная метка события. Важная настройка для дальнейшей возможности сортировки или выборки логов. Если в логах время указано в unix timestamp (squid), то следует использовать match => [ «timestamp», «UNIX» ]
kv
Пример конфигурационного файла для обработки логов в формате key=value: filter {
kv { type => "custom_log" value_split => "=:" fields => ["reminder"] field_split => "\t?&" }
}
Построчное описание настроек: type => "custom_log" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
value_split => "=:" использовать символы "=" и ":" для разделения ключа-значения.
fields => ["reminder"] название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.
field_split => "\t?&" использовать символы "\t?&" для разделения ключей. \t — знак табулятора
multiline
Пример конфигурационного файла для «склеивания» многострочных логов (например Java stack trace): filter {
multiline { type => "java_log" pattern => "^\s" what => "previous" }
}
Построчное описание настроек: type => "java_log" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
pattern => "^\s" регулярное выражение
what => "previous" при соответствии шаблону «pattern» строка принадлежит предыдущей (previous) строке.
OUTPUT
Название этого блока/метода говорит само за себя — в нём указываются настройки для исходящих сообщений. Аналогично предыдущим блокам, здесь можно указывать любое количество исходящих подблоков.
stdout
Пример конфигурационного файла для вывода логов в standard output:
output { stdout { type => "custom_log" message => "IP - %{clientip}. Full message: %{@message}. End of line." } }
type => "custom_log"
тип/описание лога.
message => "clIP - %{clientip}. Full message: %{@message}. End of line."
указывается формат исходящего сообщения. Допустимо использование переменных после grok-фильтрации.
file
Пример конфигурационого файла для записи логов в файл:
output { file { type => "custom_log" flush_interval => 5 gzip=> true path => "/var/log/custom/%{clientip}/%{type}" message_format => "ip: %{clientip} request:%{requri}" } }
type => "custom_log"
тип/описание лога.
flush_interval => 5
интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение.
gzip=> true
файл исходящих сообщений будет сжат Gzip.
path => "/var/log/custom/%{clientip}/%{type}"
путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. В данном примере, для каждого уникального IP адреса будет создана своя папка и сообщения будут записываться в файл соответствующий переменной %{type}.
message_format => "ip: %{clientip} request:%{requri}"
формат исходящего сообщения.
elasticsearch
Пример конфигурационного файла для записи логов в базу Elasticsearch:
output { elasticsearch { type => "custom_log" cluster => "es_logs" embedded => false host => "192.168.1.1" port => "19300" index => "logs-%{+YYYY.MM.dd}" } }
type => "custom_log"
тип/описание лога.
cluster => "es_logs"
название кластера указанного в cluster.name в настроечном файле Elasticsearch.
embedded => false
указывает какую базу Elasticsearch использовать внутреннюю или стороннюю.
port => "19300"
транспортный port Elasticsearch.
host => "192.168.1.1"
IP адрес Elasticsearch
index => "logs-%{+YYYY.MM.dd}"
название индекса куда будут записываться логи.
Данный плагин можно использовать для алертов.
Пример конфигурационого файла:
output { email { type => "custom_log" from => "logstash@domain.com" to => "admin1@domain.com" cc => "admin2@domain.com" subject => "Found '%{matchName}' Alert on %{@source_host}" body => "Here is the event line %{@message}" htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>" via => "sendmail" options => [ "smtpIporHost", "smtp.gmail.com", "port", "587", "domain", "yourDomain", "userName", "yourSMTPUsername", "password", "PASS", "starttls", "true", "authenticationType", "plain", "debug", "true" ] match => [ "response errors", "response,501,,or,response,301", "multiple response errors", "response,501,,and,response,301" ] } }
type => "custom_log"
тип/описание лога.
from => "logstash@domain.com" to => "admin1@domain.com" cc => "admin2@domain.com"
subject => "Found '%{matchName}' Alert on %{@source_host}" тема письма уведомления. Можно использовать переменные. Например %{matchName} — название условия совпадения из настройки «match».
body => "Here is the event line %{@message}" htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"
тело письма.
via => "sendmail"
способ отсылки письма. Возможен один вариант из двух — smtp или sendmail.
options => ...
стандартные настройки почтовых параметров.
match => [ "response errors", "response,501,,or,response,301", "multiple response errors", "response,501,,and,response,301" ]
«response errors» — название алерта (записывается в переменную %{matchName}). «response,501,,or,response,301» — критерии срабатывания алертов. В данном примере если поле response содержит значение 501 или 301, то алерт считается
сработавшим. Во второй строке используется логика AND, т.е. оба условия должны быть выполнены.
Пример
nginx
Отправка логов nginx в Elastic (один из вариантов)
Тут рассматривается самый простой случай - когда Filebeat не делает преобразований
Nginx умеет писать логи в Json
log_format nginxlog_json escape=json. '{ "timestamp": "$time_iso8601", ' '"remote_addr": "$remote_addr", ' '"body_bytes_sent": $body_bytes_sent, ' '"request": "$request", ' '"request_method": "$request_method", ' '"request_time": $request_time, ' '"response_status": $status, ' '"upstream_status": $upstream_status,' '"upstream_response_time": $upstream_response_time,' '"upstream_connect_time": $upstream_connect_time,' '"upstream_header_time": $upstream_header_time,' '"upstream_addr": "$upstream_addr",' '"host": "$host",' '"http_x_forwarded_for": "$http_x_forwarded_for",' '"http_referrer": "$http_referer", ' '"http_user_agent": "$http_user_agent", ' '"http_version": "$server_protocol", ' '"nginx_access": true }';
В лог попадает:
tail -1 /var/log/nginx/access.log.ssl | jq . { "timestamp": "2021-08-08T08:47:59+00:00", "remote_addr": "159.224.49.4", "body_bytes_sent": 361, "request": "POST /internal/bsearch HTTP/1.1", "request_method": "POST", "request_time": 0.152, "response_status": 200, "upstream_status": 200, "upstream_response_time": 0.028, "upstream_connect_time": 0, "upstream_header_time": 0.028, "upstream_addr": "127.0.0.1:5601", "host": "elk.domain.tld", "http_x_forwarded_for": "", "http_referrer": "https://elk.domain.tld/app/discover", "http_user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15", "http_version": "HTTP/1.1", "nginx_access": true }
Filebeat
filebeat.inputs: - type: log enabled: true paths: - /var/log/nginx/elk.domain.tld-access.log.ssl exclude_files: ['\.gz$'] filebeat.config.modules: reload.enabled: false setup.template.settings: index.number_of_shards: 1 setup.kibana: output.logstash: hosts: ["elk.domain.tld:5400"] ssl.certificate_authorities: ["/etc/elk-certs/elk-ssl.crt"] ssl.certificate: "/etc/elk-certs/elk-ssl.crt" ssl.key: "/etc/elk-certs/elk-ssl.key" processors: - add_host_metadata: when.not.contains.tags: forwarded - add_cloud_metadata: ~ - add_docker_metadata: ~ - add_kubernetes_metadata: ~ logging.level: debug logging.selectors: ["*"]
В примере мониторится всего один файл (исключение очевидно лишнее)
Результат пересылается на удаленный хост в Logstash
Важно Сертификат должен соответствовать хостнейму - тут нельзя указать IP адрес
hosts: ["elk.domain.tld:5400"]
Тестирование
Запускаем Logstash: java -jar logstash-1.1.9-monolithic.jar agent -f ./habr.conf
Проверяем, что Logstash запущен:
- netstat -nat |grep 11111
Если порт 11111 присутствует, значит Logstash готов принимать логи.
В новом терминальном окне пишем: echo "Logs are cool!" | nc localhost 11111
Смотрим результат в окне где запущен Logstash. Если там появилось секретное послание, значит всё работает.