Logstash: различия между версиями
Sirmax (обсуждение | вклад) |
Sirmax (обсуждение | вклад) (→Пример) |
||
| Строка 474: | Строка 474: | ||
=Пример= |
=Пример= |
||
| + | Перенесен сюда -> https://noname.com.ua/mediawiki/index.php/LogstashExample1 |
||
| − | ==nginx== |
||
| − | Отправка логов nginx в Elastic (один из вариантов) <BR> |
||
| − | Тут рассматривается самый простой случай - когда Filebeat не делает преобразований |
||
| − | Nginx умеет писать логи в Json |
||
| − | |||
| − | <PRE> |
||
| − | log_format nginxlog_json escape=json. |
||
| − | '{ "timestamp": "$time_iso8601", ' |
||
| − | '"remote_addr": "$remote_addr", ' |
||
| − | '"body_bytes_sent": $body_bytes_sent, ' |
||
| − | '"request": "$request", ' |
||
| − | '"request_method": "$request_method", ' |
||
| − | '"request_time": $request_time, ' |
||
| − | '"response_status": $status, ' |
||
| − | '"upstream_status": $upstream_status,' |
||
| − | '"upstream_response_time": $upstream_response_time,' |
||
| − | '"upstream_connect_time": $upstream_connect_time,' |
||
| − | '"upstream_header_time": $upstream_header_time,' |
||
| − | '"upstream_addr": "$upstream_addr",' |
||
| − | '"host": "$host",' |
||
| − | '"http_x_forwarded_for": "$http_x_forwarded_for",' |
||
| − | '"http_referrer": "$http_referer", ' |
||
| − | '"http_user_agent": "$http_user_agent", ' |
||
| − | '"http_version": "$server_protocol", ' |
||
| − | '"nginx_access": true }'; |
||
| − | </PRE> |
||
| − | |||
| − | В лог попадает: |
||
| − | <PRE> |
||
| − | tail -1 /var/log/nginx/access.log.ssl | jq . |
||
| − | { |
||
| − | "timestamp": "2021-08-08T08:47:59+00:00", |
||
| − | "remote_addr": "159.224.49.4", |
||
| − | "body_bytes_sent": 361, |
||
| − | "request": "POST /internal/bsearch HTTP/1.1", |
||
| − | "request_method": "POST", |
||
| − | "request_time": 0.152, |
||
| − | "response_status": 200, |
||
| − | "upstream_status": 200, |
||
| − | "upstream_response_time": 0.028, |
||
| − | "upstream_connect_time": 0, |
||
| − | "upstream_header_time": 0.028, |
||
| − | "upstream_addr": "127.0.0.1:5601", |
||
| − | "host": "elk.domain.tld", |
||
| − | "http_x_forwarded_for": "", |
||
| − | "http_referrer": "https://elk.domain.tld/app/discover", |
||
| − | "http_user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15", |
||
| − | "http_version": "HTTP/1.1", |
||
| − | "nginx_access": true |
||
| − | } |
||
| − | </PRE> |
||
| − | |||
| − | ==Filebeat== |
||
| − | <PRE> |
||
| − | filebeat.inputs: |
||
| − | - type: log |
||
| − | enabled: true |
||
| − | paths: |
||
| − | - /var/log/nginx/elk.domain.tld-access.log.ssl |
||
| − | exclude_files: ['\.gz$'] |
||
| − | filebeat.config.modules: |
||
| − | reload.enabled: false |
||
| − | setup.template.settings: |
||
| − | index.number_of_shards: 1 |
||
| − | setup.kibana: |
||
| − | output.logstash: |
||
| − | hosts: ["elk.domain.tld:5400"] |
||
| − | ssl.certificate_authorities: ["/etc/elk-certs/elk-ssl.crt"] |
||
| − | ssl.certificate: "/etc/elk-certs/elk-ssl.crt" |
||
| − | ssl.key: "/etc/elk-certs/elk-ssl.key" |
||
| − | processors: |
||
| − | - add_host_metadata: |
||
| − | when.not.contains.tags: forwarded |
||
| − | - add_cloud_metadata: ~ |
||
| − | - add_docker_metadata: ~ |
||
| − | - add_kubernetes_metadata: ~ |
||
| − | logging.level: debug |
||
| − | logging.selectors: ["*"] |
||
| − | </PRE> |
||
| − | В примере мониторится всего один файл (исключение очевидно лишнее) |
||
| − | <BR> |
||
| − | Результат пересылается на удаленный хост в Logstash |
||
| − | <B>Важно</B> Сертификат должен соответствовать хостнейму - тут нельзя указать IP адрес |
||
| − | <PRE> |
||
| − | hosts: ["elk.domain.tld:5400"] |
||
| − | </PRE> |
||
| − | |||
| − | ==Logstash== |
||
| − | Минимальный конфиг - слушать Filebeat на порту 5400 (без сертификата - не будет работать) |
||
| − | <BR> Результат - писать в файл без всяких преобразований |
||
| − | |||
| − | <PRE> |
||
| − | input { |
||
| − | beats { |
||
| − | port => 5400 |
||
| − | ssl => true |
||
| − | ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"] |
||
| − | ssl_certificate => "/etc/elk-certs/elk-ssl.crt" |
||
| − | ssl_key => "/etc/elk-certs/elk-ssl.key" |
||
| − | ssl_verify_mode => "force_peer" |
||
| − | } |
||
| − | } |
||
| − | output { |
||
| − | file { |
||
| − | flush_interval => 5 |
||
| − | gzip => false |
||
| − | path => "/var/log/logstash/logstash_debug.log" |
||
| − | } |
||
| − | } |
||
| − | </PRE> |
||
| − | |||
| − | Результат записи в файл не удовлетворительный |
||
| − | <PRE> |
||
| − | root@elk:/var/log/logstash# tail -1 logstash_debug.log | jq . |
||
| − | </PRE> |
||
| − | Сообщение не было разобрано на поля |
||
| − | <PRE> |
||
| − | { |
||
| − | "message": "{ \"timestamp\": \"2021-08-08T09:20:47+00:00\", \"remote_addr\": \"61.219.11.151\", \"body_bytes_sent\": 163, \"request\": \"GET / HTTP/1.1\", \"request_method\": \"GET\", \"request_time\": 0.213, \"response_status\": 400, \"upstream_status\": ,\"upstream_response_time\": ,\"upstream_connect_time\": ,\"upstream_header_time\": ,\"upstream_addr\": \"\",\"host\": \"elk.arturhaunt.ninja\",\"http_x_forwarded_for\": \"\",\"http_referrer\": \"\", \"http_user_agent\": \"\", \"http_version\": \"HTTP/1.1\", \"nginx_access\": true }", |
||
| − | "cloud": { |
||
| − | "availability_zone": "us-east-1f", |
||
| − | "region": "us-east-1", |
||
| − | "instance": { |
||
| − | "id": "i-075866580c26fd42c" |
||
| − | }, |
||
| − | "service": { |
||
| − | "name": "EC2" |
||
| − | }, |
||
| − | "machine": { |
||
| − | "type": "t3.large" |
||
| − | }, |
||
| − | "image": { |
||
| − | "id": "ami-09e67e426f25ce0d7" |
||
| − | }, |
||
| − | "account": { |
||
| − | "id": "543591064633" |
||
| − | }, |
||
| − | "provider": "aws" |
||
| − | }, |
||
| − | "@timestamp": "2021-08-08T09:20:52.684Z", |
||
| − | "agent": { |
||
| − | "hostname": "elk.domain.tld", |
||
| − | "id": "aef24fda-f14c-40cf-a388-fa0af443f5d7", |
||
| − | "type": "filebeat", |
||
| − | "version": "7.14.0", |
||
| − | "ephemeral_id": "c53f02a8-0b37-4dae-a3c7-cbf03eabca6a", |
||
| − | "name": "elk.arturhaunt.ninja" |
||
| − | }, |
||
| − | "@version": "1", |
||
| − | "log": { |
||
| − | "offset": 4126, |
||
| − | "file": { |
||
| − | "path": "/var/log/nginx/elk.domain.tld-access.log.ssl" |
||
| − | } |
||
| − | }, |
||
| − | "host": { |
||
| − | "hostname": "elk.domain.tld", |
||
| − | "id": "ec2385ad84aeea5eb425460c41a5b866", |
||
| − | "os": { |
||
| − | "type": "linux", |
||
| − | "codename": "focal", |
||
| − | "name": "Ubuntu", |
||
| − | "version": "20.04.2 LTS (Focal Fossa)", |
||
| − | "kernel": "5.4.0-1045-aws", |
||
| − | "platform": "ubuntu", |
||
| − | "family": "debian" |
||
| − | }, |
||
| − | "ip": [ |
||
| − | "172.31.91.220", |
||
| − | "fe80::14b5:eff:feac:ae79" |
||
| − | ], |
||
| − | "name": "elk.domain.tld", |
||
| − | "mac": [ |
||
| − | "16:b5:0e:ac:ae:79" |
||
| − | ], |
||
| − | "architecture": "x86_64", |
||
| − | "containerized": false |
||
| − | }, |
||
| − | "input": { |
||
| − | "type": "log" |
||
| − | }, |
||
| − | "ecs": { |
||
| − | "version": "1.10.0" |
||
| − | }, |
||
| − | "tags": [ |
||
| − | "beats_input_codec_plain_applied" |
||
| − | ] |
||
| − | } |
||
| − | </PRE> |
||
=Тестирование= |
=Тестирование= |
||
Версия 11:25, 8 августа 2021
Logstash
Заметки по Logstash с примерами конфигов
input {
...
}
filter {
...
}
output {
...
}
INPUT
Данный метод является входной точкой для логов
Тыщи их если что
В описании только самые простые/ходовые
Beats
beats {
port => 5400
ssl => true
ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"]
ssl_certificate => "/etc/elk-certs/elk-ssl.crt"
ssl_key => "/etc/elk-certs/elk-ssl.key"
ssl_verify_mode => "force_peer"
}
ssl_verify_modeedit
Value can be any of: none, peer, force_peer Default value is "none" By default the server doesn’t do any client verification.
peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated.
force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed.
This option needs to be used with ssl_certificate_authorities and a defined list of CAs.
Генерация серетфикатов
#!/bin/bash
CN="logstash.tld"
sudo openssl req -subj "/CN=${CN}/" -x509 -days 3650 -batch -nodes -newkey rsa:2048 -keyout elk-ssl.key -out elk-ssl.crt
chown logstash elk-ssl.crt
chown logstash elk-ssl.key
Важно В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу)
File
input {
file {
type => "some_access_log"
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
exclude => [ "*.gz", "*.zip", "*.rar" ]
start_position => "end"
stat_interval => 1
discover_interval => 30
}
}
type => "some_access_log" тип/описание лога. При использовании нескольких входных блоков, удобно их разделять для последующих действий в filter или output.
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным
exclude => [ "*.gz", "*.zip", "*.rar" ] исключает из обработки файлы с соответствующими расширениями.
start_position => "end" ждёт появления новых сообщений в конце файла. При обработки уже имеющихся логов, можно выставить «beginning», тогда обработка логов будет происходить построчно с начала файлов.
stat_interval => 1 как часто (в секундах) проверять файлы на изменения. При больших значения, уменьшится частота системных вызовов, но так же увеличится время чтения новых строк.
discover_interval => 30 время (в секундах) через которое будет обновлён список обрабатываемых файлов указанных в path.
tcp
Пример конфигурации, для работы с логами удалённых сервисов:
input {
tcp {
type => "webserver_prod"
data_timeout => 10
mode => "server"
host => "192.168.3.12"
port => 3337
}
}
Построчное описание настроек:
type => "webserver_prod" тип/описание лога.
data_timeout => 10 <PRE> время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто. <PRE> mode => "server" host => "192.168.3.12" port => 3337
в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.
udp
Для udp настройки аналогичные tcp:
input {
udp {
type => "webserver_prod"
buffer_size => 4096
host => "192.168.3.12"
port => 3337
}
}
FILTER
В данном блоке настраиваются основные манипуляции с логами. Это может быть и разбивка по key=value, и удаление ненужных параметров, и замена имеющихся значений, и использование geoip или DNS запросов для ип-адресов или названий хостов.
На первый взгляд применение фильтров может показаться сложным и нелогичным, но это не совсем так.
grok
Пример конфигурационного файла для основной нормализации логов:
filter {
grok {
type => "some_access_log"
patterns_dir => "/path/to/patterns/"
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
}
}
Построчное описание настроек:
type => "apache_access"
тип/описание лога. Здесь надо указать тот тип (type), который прописан в блоке input для которого будет происходить обработка.
patterns_dir => "/path/to/patterns/"
путь к каталогу, содержащим шаблоны обработки логов. Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.
pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
указывается шаблон разборки логов. Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов. Что бы не путаться, я для каждого формата логов создаю отдельный шаблонный файл.
Подробнее про шаблоны
С помощью grok фильтра можно структурировать большую часть логов — syslog, apache, nginx, mysql итд, записанных в определённом формате. Logstash имеет более 120 шаблонов готовых регулярных выражений (regex). Так что написание фильтров для обработки большинства логов не должно вызвать особого страха или недопонимания.
Формат шаблонов относительно простой — NAME PATTERN, то есть построчно указывается имя шаблона и ему соответствующее регулярное выражение. Пример:
NUMBER \d+ WORD \b\w+\b USERID [a-zA-Z0-9_-]+
Можно использовать любой ранее созданный шаблон:
USER %{USERID}
Шаблоны можно так же и комбинировать:
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
MAC (?:%{CISCOMAC}|%{WINDOWSMAC})
Допустим формат логов у нас следующий:
55.3.244.1 GET /index.html 15824 0.043
С данным примером лога достаточно pattern записать в виде
"%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
После обработки наша строка будет выглядеть следующим образом:
client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043
mutate
Пример конфигурационного файла для изменения/удаления записей из логов: filter {
mutate {
type => "apache_access"
remove => [ "client" ]
rename => [ "HOSTORIP", "client_ip" ]
gsub => [ "message", "\\/", "_" ]
add_field => [ "sample1", "from %{clientip}" ]
}
}
Построчное описание настроек: type => "apache_access" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
remove => [ "client" ] удаление всех данных имеющих название поля client. Возможно указание нескольких названий полей.
rename => [ "HOSTORIP", "client_ip" ] переименование название поля HOSTORIP в client_ip.
gsub => [ "message", "\\/", "_" ] замена всех "/" на "_" в поле messages.
add_field => [ "sample1", "from %{clientip}" ] добавление нового поля «sample1» со значением «from %{clientip}». Допускается использование названий переменных.
2.3 date Пример конфигурационого файла: filter {
date {
type => "apache_access"
match => [ "timestamp", "MMM dd HH:mm:ss" ]
}
}
Построчное описание настроек: type => "apache_access" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
match => [ "timestamp", "MMM dd HH:mm:ss" ] временная метка события. Важная настройка для дальнейшей возможности сортировки или выборки логов. Если в логах время указано в unix timestamp (squid), то следует использовать match => [ «timestamp», «UNIX» ]
kv
Пример конфигурационного файла для обработки логов в формате key=value: filter {
kv {
type => "custom_log"
value_split => "=:"
fields => ["reminder"]
field_split => "\t?&"
}
}
Построчное описание настроек: type => "custom_log" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
value_split => "=:" использовать символы "=" и ":" для разделения ключа-значения.
fields => ["reminder"] название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.
field_split => "\t?&" использовать символы "\t?&" для разделения ключей. \t — знак табулятора
multiline
Пример конфигурационного файла для «склеивания» многострочных логов (например Java stack trace): filter {
multiline {
type => "java_log"
pattern => "^\s"
what => "previous"
}
}
Построчное описание настроек: type => "java_log" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.
pattern => "^\s" регулярное выражение
what => "previous" при соответствии шаблону «pattern» строка принадлежит предыдущей (previous) строке.
OUTPUT
Название этого блока/метода говорит само за себя — в нём указываются настройки для исходящих сообщений. Аналогично предыдущим блокам, здесь можно указывать любое количество исходящих подблоков.
stdout
Пример конфигурационного файла для вывода логов в standard output:
output {
stdout {
type => "custom_log"
message => "IP - %{clientip}. Full message: %{@message}. End of line."
}
}
type => "custom_log"
тип/описание лога.
message => "clIP - %{clientip}. Full message: %{@message}. End of line."
указывается формат исходящего сообщения. Допустимо использование переменных после grok-фильтрации.
file
Пример конфигурационого файла для записи логов в файл:
output {
file {
type => "custom_log"
flush_interval => 5
gzip=> true
path => "/var/log/custom/%{clientip}/%{type}"
message_format => "ip: %{clientip} request:%{requri}"
}
}
type => "custom_log"
тип/описание лога.
flush_interval => 5
интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение.
gzip=> true
файл исходящих сообщений будет сжат Gzip.
path => "/var/log/custom/%{clientip}/%{type}"
путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. В данном примере, для каждого уникального IP адреса будет создана своя папка и сообщения будут записываться в файл соответствующий переменной %{type}.
message_format => "ip: %{clientip} request:%{requri}"
формат исходящего сообщения.
elasticsearch
Пример конфигурационного файла для записи логов в базу Elasticsearch:
output {
elasticsearch {
type => "custom_log"
cluster => "es_logs"
embedded => false
host => "192.168.1.1"
port => "19300"
index => "logs-%{+YYYY.MM.dd}"
}
}
type => "custom_log"
тип/описание лога.
cluster => "es_logs"
название кластера указанного в cluster.name в настроечном файле Elasticsearch.
embedded => false
указывает какую базу Elasticsearch использовать внутреннюю или стороннюю.
port => "19300"
транспортный port Elasticsearch.
host => "192.168.1.1"
IP адрес Elasticsearch
index => "logs-%{+YYYY.MM.dd}"
название индекса куда будут записываться логи.
Данный плагин можно использовать для алертов.
Пример конфигурационого файла:
output {
email {
type => "custom_log"
from => "logstash@domain.com"
to => "admin1@domain.com"
cc => "admin2@domain.com"
subject => "Found '%{matchName}' Alert on %{@source_host}"
body => "Here is the event line %{@message}"
htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"
via => "sendmail"
options => [ "smtpIporHost", "smtp.gmail.com",
"port", "587",
"domain", "yourDomain",
"userName", "yourSMTPUsername",
"password", "PASS",
"starttls", "true",
"authenticationType", "plain",
"debug", "true"
]
match => [ "response errors", "response,501,,or,response,301",
"multiple response errors", "response,501,,and,response,301" ]
}
}
type => "custom_log"
тип/описание лога.
from => "logstash@domain.com" to => "admin1@domain.com" cc => "admin2@domain.com"
subject => "Found '%{matchName}' Alert on %{@source_host}"
тема письма уведомления. Можно использовать переменные. Например %{matchName} — название условия совпадения из настройки «match».
body => "Here is the event line %{@message}"
htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"
тело письма.
via => "sendmail"
способ отсылки письма. Возможен один вариант из двух — smtp или sendmail.
options => ...
стандартные настройки почтовых параметров.
match => [ "response errors", "response,501,,or,response,301",
"multiple response errors", "response,501,,and,response,301" ]
«response errors» — название алерта (записывается в переменную %{matchName}). «response,501,,or,response,301» — критерии срабатывания алертов. В данном примере если поле response содержит значение 501 или 301, то алерт считается
сработавшим. Во второй строке используется логика AND, т.е. оба условия должны быть выполнены.
Пример
Перенесен сюда -> https://noname.com.ua/mediawiki/index.php/LogstashExample1
Тестирование
Запускаем Logstash: java -jar logstash-1.1.9-monolithic.jar agent -f ./habr.conf
Проверяем, что Logstash запущен:
- netstat -nat |grep 11111
Если порт 11111 присутствует, значит Logstash готов принимать логи.
В новом терминальном окне пишем: echo "Logs are cool!" | nc localhost 11111
Смотрим результат в окне где запущен Logstash. Если там появилось секретное послание, значит всё работает.