Logstash: различия между версиями

Материал из noname.com.ua
Перейти к навигацииПерейти к поиску
Строка 474: Строка 474:
   
 
=Пример=
 
=Пример=
  +
Перенесен сюда -> https://noname.com.ua/mediawiki/index.php/LogstashExample1
==nginx==
 
Отправка логов nginx в Elastic (один из вариантов) <BR>
 
Тут рассматривается самый простой случай - когда Filebeat не делает преобразований
 
Nginx умеет писать логи в Json
 
 
<PRE>
 
log_format nginxlog_json escape=json.
 
'{ "timestamp": "$time_iso8601", '
 
'"remote_addr": "$remote_addr", '
 
'"body_bytes_sent": $body_bytes_sent, '
 
'"request": "$request", '
 
'"request_method": "$request_method", '
 
'"request_time": $request_time, '
 
'"response_status": $status, '
 
'"upstream_status": $upstream_status,'
 
'"upstream_response_time": $upstream_response_time,'
 
'"upstream_connect_time": $upstream_connect_time,'
 
'"upstream_header_time": $upstream_header_time,'
 
'"upstream_addr": "$upstream_addr",'
 
'"host": "$host",'
 
'"http_x_forwarded_for": "$http_x_forwarded_for",'
 
'"http_referrer": "$http_referer", '
 
'"http_user_agent": "$http_user_agent", '
 
'"http_version": "$server_protocol", '
 
'"nginx_access": true }';
 
</PRE>
 
 
В лог попадает:
 
<PRE>
 
tail -1 /var/log/nginx/access.log.ssl | jq .
 
{
 
"timestamp": "2021-08-08T08:47:59+00:00",
 
"remote_addr": "159.224.49.4",
 
"body_bytes_sent": 361,
 
"request": "POST /internal/bsearch HTTP/1.1",
 
"request_method": "POST",
 
"request_time": 0.152,
 
"response_status": 200,
 
"upstream_status": 200,
 
"upstream_response_time": 0.028,
 
"upstream_connect_time": 0,
 
"upstream_header_time": 0.028,
 
"upstream_addr": "127.0.0.1:5601",
 
"host": "elk.domain.tld",
 
"http_x_forwarded_for": "",
 
"http_referrer": "https://elk.domain.tld/app/discover",
 
"http_user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15",
 
"http_version": "HTTP/1.1",
 
"nginx_access": true
 
}
 
</PRE>
 
 
==Filebeat==
 
<PRE>
 
filebeat.inputs:
 
- type: log
 
enabled: true
 
paths:
 
- /var/log/nginx/elk.domain.tld-access.log.ssl
 
exclude_files: ['\.gz$']
 
filebeat.config.modules:
 
reload.enabled: false
 
setup.template.settings:
 
index.number_of_shards: 1
 
setup.kibana:
 
output.logstash:
 
hosts: ["elk.domain.tld:5400"]
 
ssl.certificate_authorities: ["/etc/elk-certs/elk-ssl.crt"]
 
ssl.certificate: "/etc/elk-certs/elk-ssl.crt"
 
ssl.key: "/etc/elk-certs/elk-ssl.key"
 
processors:
 
- add_host_metadata:
 
when.not.contains.tags: forwarded
 
- add_cloud_metadata: ~
 
- add_docker_metadata: ~
 
- add_kubernetes_metadata: ~
 
logging.level: debug
 
logging.selectors: ["*"]
 
</PRE>
 
В примере мониторится всего один файл (исключение очевидно лишнее)
 
<BR>
 
Результат пересылается на удаленный хост в Logstash
 
<B>Важно</B> Сертификат должен соответствовать хостнейму - тут нельзя указать IP адрес
 
<PRE>
 
hosts: ["elk.domain.tld:5400"]
 
</PRE>
 
 
==Logstash==
 
Минимальный конфиг - слушать Filebeat на порту 5400 (без сертификата - не будет работать)
 
<BR> Результат - писать в файл без всяких преобразований
 
 
<PRE>
 
input {
 
beats {
 
port => 5400
 
ssl => true
 
ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"]
 
ssl_certificate => "/etc/elk-certs/elk-ssl.crt"
 
ssl_key => "/etc/elk-certs/elk-ssl.key"
 
ssl_verify_mode => "force_peer"
 
}
 
}
 
output {
 
file {
 
flush_interval => 5
 
gzip => false
 
path => "/var/log/logstash/logstash_debug.log"
 
}
 
}
 
</PRE>
 
 
Результат записи в файл не удовлетворительный
 
<PRE>
 
root@elk:/var/log/logstash# tail -1 logstash_debug.log | jq .
 
</PRE>
 
Сообщение не было разобрано на поля
 
<PRE>
 
{
 
"message": "{ \"timestamp\": \"2021-08-08T09:20:47+00:00\", \"remote_addr\": \"61.219.11.151\", \"body_bytes_sent\": 163, \"request\": \"GET / HTTP/1.1\", \"request_method\": \"GET\", \"request_time\": 0.213, \"response_status\": 400, \"upstream_status\": ,\"upstream_response_time\": ,\"upstream_connect_time\": ,\"upstream_header_time\": ,\"upstream_addr\": \"\",\"host\": \"elk.arturhaunt.ninja\",\"http_x_forwarded_for\": \"\",\"http_referrer\": \"\", \"http_user_agent\": \"\", \"http_version\": \"HTTP/1.1\", \"nginx_access\": true }",
 
"cloud": {
 
"availability_zone": "us-east-1f",
 
"region": "us-east-1",
 
"instance": {
 
"id": "i-075866580c26fd42c"
 
},
 
"service": {
 
"name": "EC2"
 
},
 
"machine": {
 
"type": "t3.large"
 
},
 
"image": {
 
"id": "ami-09e67e426f25ce0d7"
 
},
 
"account": {
 
"id": "543591064633"
 
},
 
"provider": "aws"
 
},
 
"@timestamp": "2021-08-08T09:20:52.684Z",
 
"agent": {
 
"hostname": "elk.domain.tld",
 
"id": "aef24fda-f14c-40cf-a388-fa0af443f5d7",
 
"type": "filebeat",
 
"version": "7.14.0",
 
"ephemeral_id": "c53f02a8-0b37-4dae-a3c7-cbf03eabca6a",
 
"name": "elk.arturhaunt.ninja"
 
},
 
"@version": "1",
 
"log": {
 
"offset": 4126,
 
"file": {
 
"path": "/var/log/nginx/elk.domain.tld-access.log.ssl"
 
}
 
},
 
"host": {
 
"hostname": "elk.domain.tld",
 
"id": "ec2385ad84aeea5eb425460c41a5b866",
 
"os": {
 
"type": "linux",
 
"codename": "focal",
 
"name": "Ubuntu",
 
"version": "20.04.2 LTS (Focal Fossa)",
 
"kernel": "5.4.0-1045-aws",
 
"platform": "ubuntu",
 
"family": "debian"
 
},
 
"ip": [
 
"172.31.91.220",
 
"fe80::14b5:eff:feac:ae79"
 
],
 
"name": "elk.domain.tld",
 
"mac": [
 
"16:b5:0e:ac:ae:79"
 
],
 
"architecture": "x86_64",
 
"containerized": false
 
},
 
"input": {
 
"type": "log"
 
},
 
"ecs": {
 
"version": "1.10.0"
 
},
 
"tags": [
 
"beats_input_codec_plain_applied"
 
]
 
}
 
</PRE>
 
   
 
=Тестирование=
 
=Тестирование=

Версия 12:25, 8 августа 2021

Logstash

Заметки по Logstash с примерами конфигов

input {
  ...
}
filter {
  ...
}
output {
  ...
}

INPUT

Данный метод является входной точкой для логов

Тыщи их если что

В описании только самые простые/ходовые

Beats

    beats {
        port => 5400
        ssl => true
        ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"]
        ssl_certificate => "/etc/elk-certs/elk-ssl.crt"
        ssl_key => "/etc/elk-certs/elk-ssl.key"
        ssl_verify_mode => "force_peer"
    }

ssl_verify_modeedit

Value can be any of: none, peer, force_peer Default value is "none" By default the server doesn’t do any client verification.

peer will make the server ask the client to provide a certificate. If the client provides a certificate, it will be validated.

force_peer will make the server ask the client to provide a certificate. If the client doesn’t provide a certificate, the connection will be closed.

This option needs to be used with ssl_certificate_authorities and a defined list of CAs.

Генерация серетфикатов

#!/bin/bash

CN="logstash.tld"
sudo openssl req -subj "/CN=${CN}/" -x509 -days 3650 -batch -nodes -newkey rsa:2048 -keyout elk-ssl.key -out elk-ssl.crt

chown logstash elk-ssl.crt
chown logstash elk-ssl.key

Важно В конфигурации Filbeat нужно указат CN из серетфиката или будет ошибка SSL (что логично когда знаешь но не очевидно сразу)

File

input {

 file {
   type => "some_access_log"
   path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]
   exclude => [ "*.gz", "*.zip", "*.rar" ]
   start_position => "end"
   stat_interval => 1
   discover_interval => 30
 }

}

type => "some_access_log"
тип/описание лога. При использовании нескольких входных блоков, удобно их разделять для последующих действий в filter или output.
path => [ "/var/log/vs01/*.log", "/var/log/vs02/*.log" ]

указывается путь к лог-файлам, которые подлежат обработке. Путь должен быть абсолютным

exclude => [ "*.gz", "*.zip", "*.rar" ]
исключает из обработки файлы с соответствующими расширениями.
start_position => "end"
ждёт появления новых сообщений в конце файла. При обработки уже имеющихся логов, можно выставить «beginning», тогда обработка логов будет происходить построчно с начала файлов.
stat_interval => 1
как часто (в секундах) проверять файлы на изменения. При больших значения, уменьшится частота системных вызовов, но так же увеличится время чтения новых строк.
discover_interval => 30
время (в секундах) через которое будет обновлён список обрабатываемых файлов указанных в path.

tcp

Пример конфигурации, для работы с логами удалённых сервисов:

input {
  tcp {
    type => "webserver_prod"
    data_timeout => 10
    mode => "server"
    host => "192.168.3.12"
    port => 3337
  }
}

Построчное описание настроек:

type => "webserver_prod"
тип/описание лога.
data_timeout => 10
<PRE>
время (в секундах), по истечении которого не активное tcp соединение будет закрыто. Значение -1 — соединение всегда будет открыто.
<PRE>
mode => "server"
host => "192.168.3.12"
port => 3337

в этом случае Logstash становится сервером, и начинает слушать на 192.168.3.12:3337. При установке mode => «client» Logstash будет присоединятся к удалённому ip:port для забора логов.

udp

Для udp настройки аналогичные tcp:
input {
  udp {
    type => "webserver_prod"
    buffer_size => 4096
    host => "192.168.3.12"
    port => 3337
  }
}

FILTER

В данном блоке настраиваются основные манипуляции с логами. Это может быть и разбивка по key=value, и удаление ненужных параметров, и замена имеющихся значений, и использование geoip или DNS запросов для ип-адресов или названий хостов.

На первый взгляд применение фильтров может показаться сложным и нелогичным, но это не совсем так.

grok

Пример конфигурационного файла для основной нормализации логов:

filter {
  grok {
    type => "some_access_log"
    patterns_dir => "/path/to/patterns/"
    pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
  }
}

Построчное описание настроек:

type => "apache_access"

тип/описание лога. Здесь надо указать тот тип (type), который прописан в блоке input для которого будет происходить обработка.

patterns_dir => "/path/to/patterns/"

путь к каталогу, содержащим шаблоны обработки логов. Все файлы находящиеся в указанной папке будут загружены Logstash, так что лишние файлы там не желательны.

pattern => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"

указывается шаблон разборки логов. Шаблон можно использовать либо в конфигурационном файле, либо из файла шаблонов. Что бы не путаться, я для каждого формата логов создаю отдельный шаблонный файл.

Подробнее про шаблоны

С помощью grok фильтра можно структурировать большую часть логов — syslog, apache, nginx, mysql итд, записанных в определённом формате. Logstash имеет более 120 шаблонов готовых регулярных выражений (regex). Так что написание фильтров для обработки большинства логов не должно вызвать особого страха или недопонимания.

Формат шаблонов относительно простой — NAME PATTERN, то есть построчно указывается имя шаблона и ему соответствующее регулярное выражение. Пример:

NUMBER \d+
WORD \b\w+\b
USERID [a-zA-Z0-9_-]+

Можно использовать любой ранее созданный шаблон:

USER %{USERID}

Шаблоны можно так же и комбинировать:

CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
MAC (?:%{CISCOMAC}|%{WINDOWSMAC})

Допустим формат логов у нас следующий:

55.3.244.1 GET /index.html 15824 0.043


С данным примером лога достаточно pattern записать в виде

"%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"

После обработки наша строка будет выглядеть следующим образом:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

mutate

Пример конфигурационного файла для изменения/удаления записей из логов: filter {

 mutate {
   type => "apache_access"
   remove => [ "client" ]
   rename => [ "HOSTORIP", "client_ip" ]
   gsub => [ "message", "\\/", "_" ]
   add_field => [ "sample1", "from %{clientip}" ]
 }

}

Построчное описание настроек: type => "apache_access" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.

remove => [ "client" ] удаление всех данных имеющих название поля client. Возможно указание нескольких названий полей.

rename => [ "HOSTORIP", "client_ip" ] переименование название поля HOSTORIP в client_ip.

gsub => [ "message", "\\/", "_" ] замена всех "/" на "_" в поле messages.

add_field => [ "sample1", "from %{clientip}" ] добавление нового поля «sample1» со значением «from %{clientip}». Допускается использование названий переменных.

2.3 date Пример конфигурационого файла: filter {

 date {
   type => "apache_access"
   match => [ "timestamp", "MMM dd HH:mm:ss" ]
 }

}

Построчное описание настроек: type => "apache_access" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.

match => [ "timestamp", "MMM dd HH:mm:ss" ] временная метка события. Важная настройка для дальнейшей возможности сортировки или выборки логов. Если в логах время указано в unix timestamp (squid), то следует использовать match => [ «timestamp», «UNIX» ]

kv

Пример конфигурационного файла для обработки логов в формате key=value: filter {

 kv {
   type => "custom_log"
   value_split => "=:"
   fields => ["reminder"]
   field_split => "\t?&"
 }

}

Построчное описание настроек: type => "custom_log" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.

value_split => "=:" использовать символы "=" и ":" для разделения ключа-значения.

fields => ["reminder"] название поля в котором искать 'ключ=значение'. По умолчанию разбивка будет происходить для всей строки лога.

field_split => "\t?&" использовать символы "\t?&" для разделения ключей. \t — знак табулятора

multiline

Пример конфигурационного файла для «склеивания» многострочных логов (например Java stack trace): filter {

 multiline {
   type => "java_log"
   pattern => "^\s"
   what => "previous"
 }

}

Построчное описание настроек: type => "java_log" тип/описание лога. Указывается тип (type) логов с которыми будет происходить обработка.

pattern => "^\s" регулярное выражение

what => "previous" при соответствии шаблону «pattern» строка принадлежит предыдущей (previous) строке.

OUTPUT

Название этого блока/метода говорит само за себя — в нём указываются настройки для исходящих сообщений. Аналогично предыдущим блокам, здесь можно указывать любое количество исходящих подблоков.

stdout

Пример конфигурационного файла для вывода логов в standard output:

output {
  stdout {
    type => "custom_log"
    message => "IP - %{clientip}. Full message: %{@message}. End of line."
  }
}
type => "custom_log"

тип/описание лога.

message => "clIP - %{clientip}. Full message: %{@message}. End of line."

указывается формат исходящего сообщения. Допустимо использование переменных после grok-фильтрации.

file

Пример конфигурационого файла для записи логов в файл:

output {
  file {
    type => "custom_log"
    flush_interval => 5
    gzip=> true
    path => "/var/log/custom/%{clientip}/%{type}"
    message_format => "ip: %{clientip} request:%{requri}"
  }
}
type => "custom_log"

тип/описание лога.

flush_interval => 5

интервал записи исходящих сообщений. Значение 0 будет записывать каждое сообщение.

gzip=> true

файл исходящих сообщений будет сжат Gzip.

path => "/var/log/custom/%{clientip}/%{type}"

путь и название файла куда будут сохраняться исходящие сообщения. Можно использовать переменные. В данном примере, для каждого уникального IP адреса будет создана своя папка и сообщения будут записываться в файл соответствующий переменной %{type}.

message_format => "ip: %{clientip} request:%{requri}"

формат исходящего сообщения.

elasticsearch

Пример конфигурационного файла для записи логов в базу Elasticsearch:

output {
  elasticsearch {
    type => "custom_log"
    cluster => "es_logs"
    embedded => false
    host => "192.168.1.1"
    port => "19300"
    index => "logs-%{+YYYY.MM.dd}" 
  }
}
type => "custom_log"

тип/описание лога.

cluster => "es_logs"

название кластера указанного в cluster.name в настроечном файле Elasticsearch.

embedded => false

указывает какую базу Elasticsearch использовать внутреннюю или стороннюю.

port => "19300"

транспортный port Elasticsearch.

host => "192.168.1.1"

IP адрес Elasticsearch

index => "logs-%{+YYYY.MM.dd}" 

название индекса куда будут записываться логи.

email

Данный плагин можно использовать для алертов.

Пример конфигурационого файла:

output {
  email {
    type => "custom_log"
    from => "logstash@domain.com"
    to => "admin1@domain.com"
    cc => "admin2@domain.com"
    subject => "Found '%{matchName}' Alert on %{@source_host}"
    body => "Here is the event line %{@message}"
    htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"
    via => "sendmail"
    options => [ "smtpIporHost", "smtp.gmail.com",
                          "port", "587",
                          "domain", "yourDomain",
                          "userName", "yourSMTPUsername",
                          "password", "PASS",
                          "starttls", "true",
                          "authenticationType", "plain",
                          "debug", "true"
                         ]
    match => [ "response errors", "response,501,,or,response,301",
                        "multiple response errors", "response,501,,and,response,301" ]

  }
}
type => "custom_log"

тип/описание лога.

from => "logstash@domain.com"
to => "admin1@domain.com"
cc => "admin2@domain.com"
subject => "Found '%{matchName}' Alert on %{@source_host}"
тема письма уведомления. Можно использовать переменные. Например %{matchName} — название условия совпадения из настройки «match».
body => "Here is the event line %{@message}"
htmlbody => "<h2>%{matchName}</h2><br/><br/><h3>Full Event</h3><br/><br/><div align='center'>%{@message}</div>"

тело письма.

via => "sendmail"

способ отсылки письма. Возможен один вариант из двух — smtp или sendmail.

options => ...

стандартные настройки почтовых параметров.

match => [ "response errors", "response,501,,or,response,301",
                    "multiple response errors", "response,501,,and,response,301" ]

«response errors» — название алерта (записывается в переменную %{matchName}). «response,501,,or,response,301» — критерии срабатывания алертов. В данном примере если поле response содержит значение 501 или 301, то алерт считается

сработавшим. Во второй строке используется логика AND, т.е. оба условия должны быть выполнены.

Пример

Перенесен сюда -> https://noname.com.ua/mediawiki/index.php/LogstashExample1

Тестирование

Запускаем Logstash: java -jar logstash-1.1.9-monolithic.jar agent -f ./habr.conf

Проверяем, что Logstash запущен:

  1. netstat -nat |grep 11111

Если порт 11111 присутствует, значит Logstash готов принимать логи.

В новом терминальном окне пишем: echo "Logs are cool!" | nc localhost 11111

Смотрим результат в окне где запущен Logstash. Если там появилось секретное послание, значит всё работает.

Ссылки