LogstashExample1
Пример
Это дополнение статьи https://noname.com.ua/mediawiki/index.php?title=Logstash
Задача
Задача - отправка логов от тестового приложения в Logstash
- Для тестовых устновок - писать максимально подробные логи приложения
- Nginx - настолько подробно насколько это возможно
- Логи которые пишет бекенд (Ruby)
- Для Staging/Prod - менее подробные логи (что бы не перегрузить Elasticsearch)
Реализация
- Сервер для сбора - Elasticsearch, Logstash, Kibana, Curator
- Elasticsearch - индексация логов и поиск
- Logstash - прием логов и форматирование перед отравкой в Elasticsearch
- Kibana - интерфейс к Elasticsearch
- Curator -удаление старых индексов
- На окружениях которые генерируют логи - Filebeat (Filebeat проще и легковеснее - нет нужды ставить везде Logstash)
При реализации НЕ учитывалось (из экономических соображений):
- Отказоустойчивость
- Standalone ElasticSerach (1 нода)
- Один экзкмпляр Logstash
- Балансировка нагрузки отсутвует
- Нет менеджера очередей для логов для сглаживания пиков нагрузки (можно использовать Kafka/RabbitMQ)
Итого есть три типа логов которы нужно собирать (и они в разных форматах)
- nginx access.log
- nginx error.log
- ruby applocation log
Конфигурация источников логов
Конфигурация nginx
- Из-за размера вынесено в отдельную заметку: LogstashExample-nginx-config
Результат
tail -1 access.log | jq .
Пример лога (не все поля)
{
...
"nginx_http_user_agent": "python-requests/2.23.0",
"nginx_request_headers": " connection='keep-alive' accept='*/*' accept-encoding='gzip, deflate' host='login-anastasiia-env.arturhaunt.com' user-agent='python-requests/2.23.0' ",
"nginx_time_iso8601": "2021-08-10T03:09:57+00:00",
"nginx_time_local": "10/Aug/2021:03:09:57 +0000",
"nginx_response_body": "<html>\r\n<head><title>301 Moved Permanently</title></head>\r\n<body>\r\n<center><h1>301 Moved Permanently</h1></center>\r\n<hr><center>openresty/1.19.3.2</center>\r\n</body>\r\n</html>\r\n",
...
}
Конфигурация приложения
Формат логов приложения в рамках задачи не могут быть изменены
Конфигурация доставки логов
Для доставки логов всех типов используется Filebeat
Общая часть конфигурации Filebeat
Это часть конфигурации которая не относится к какому-то определнному логу а общая для всех.
Модули
Оставега конфигурация по-умолчанию.
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings: index.number_of_shards: 1 setup.kibana:
Output
Отправлять логи в Logstash (на beat input с авторизацией по сертификатам)
Подробнее про настройку: https://noname.com.ua/mediawiki/index.php/Logstash
output.logstash: hosts: ["elk.arturhaunt.ninja:5400"] ssl.certificate_authorities: ["/etc/elk-certs/elk-ssl.crt"] ssl.certificate: "/etc/elk-certs/elk-ssl.crt" ssl.key: "/etc/elk-certs/elk-ssl.key"
Процессинг по-умолчанию добавляет метаданные хоста-источника
processors:
- add_host_metadata:
when.not.contains.tags: forwarded
- add_cloud_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata: ~
logging.level: info
logging.selectors: ["*"]
Пересылка nginx access logs
Предельно простая конфигурация - читать файлы по маске
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/www/deploy/backend/current/log/nginx/*access.log*
exclude_files: ['\.gz$']
fields:
nginx_logs: "true"
ruby_application_logs: "false"
type: "nginx_access_log_json"
source_hostname: "test-env.domain.tld"
fields_under_root: true
Пересылка nginx error logs
Ничем не отличается от access.log
Разбор лога на отдельные поля происходит на стороне Logstash.
- type: log
enabled: true
paths:
- /var/www/deploy/backend/current/log/nginx/*error.log*
exclude_files: ['\.gz$']
fields:
nginx_logs: "true"
ruby_application_logs: "false"
type: "nginx_error_log"
source_hostname: "some-domain.tld"
fields_under_root: true
Пересылка ruby application logs
Немного более сложная сложная конфигурация для многострочного лога
paths:
- /var/www/deploy/backend/shared/log/staging.log
exclude_files: ['\.gz$']
fields:
nginx_logs: "false"
ruby_application_logs: "true"
type: "ruby_application_logs"
source_hostname: "some-domain.tld"
fields_under_root: true
multiline:
pattern: '[A-Z], \[[0-9]{4}'
negate: true
match: after
Пример лога
D, [2021-08-10T11:17:02.270613 #7] DEBUG -- [ContentSelector::FindExternalService]: /usr/src/app/app/services/content_selector/find_external_service.rb:31:in `perform' ^M/usr/src/app/app/services/service.rb:188:in `call!' ^M/usr/src/app/app/services/service.rb:204:in `call' ^M/usr/src/app/app/services/service.rb:110:in `perform' ^M/usr/src/app/lib/activity_organizers/external.rb:32:in `block in vacant_content_and_topic' ^M/usr/src/app/lib/activity_organizers/external.rb:31:in `each' ^M/usr/src/app/lib/activity_organizers/external.rb:31:in `inject' ^M/usr/src/app/lib/activity_organizers/external.rb:31:in `vacant_content_and_topic' ^M/usr/src/app/lib/activity_organizers/external.rb:18:in `perform'
Разделитель(читается : одна большая буква, запятая, пробел, символ "[", 4 цифры):
pattern: '[A-Z], \[[0-9]{4}'
TODO: Скорее всего список A-Z избыточен так как первая буква означает уровень логгирования, для года наверно можно использовать 202[1-9]
- Подробнее про разбор многострочных сообщений - FilebeatMultiline
- Оригинальная документация - https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html (но не слишком понятная для меня)
- TL;DR: использовать negate: true и match: after практически во всех случаях
Logstash
Input
Получение логов на порту 5400 (Это не syslog протокол - отправляет Filebeat, хотя получене логов через сислог тоже возможно)
input {
beats {
port => 5400
ssl => true
ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"]
ssl_certificate => "/etc/elk-certs/elk-ssl.crt"
ssl_key => "/etc/elk-certs/elk-ssl.key"
ssl_verify_mode => "force_peer"
}
}
- В настройке собственно ничего нет кроме путей к сертификатам и номера порта.
- Все логи доставляются через один Input
Filter
Общая для всех логов часть
filter {
mutate {
add_field => { "nginx" => "nginx_object" }
add_field => { "ruby" => "ruby_object" }
}
mutate {
rename => { "nginx" => "[application_data][nginx][object]" }
rename => { "ruby" => "[application_data][ruby][object]" }
}
...
пропущена часть специфичная для каждого из типов логов
...
}
Построчный разбор
Добавление полей
Добавить в лог 2 новых поля
Этот шаг нужен для того что бы потом добавлять в эти поля соответствующие логи
mutate {
add_field => { "nginx" => "nginx_object" }
add_field => { "ruby" => "ruby_object" }
}
Переименование полей
Переименовать поля. По какой-то причине создать сразу поля с вложениями не получается
mutate {
rename => { "nginx" => "[application_data][nginx][object]" }
rename => { "ruby" => "[application_data][ruby][object]" }
}
В результате получим
{
"input": {
"type": "log"
"application_data": {
"ruby": {
"object": "ruby_object"
},
"nginx": {
...
Глобально это сделано для того что бы логи от бекенда и от nginx отличались на уровне application_data а поля верхнего уровня совпадали (но я до сих пор не уверен имеет ли это смысл или может быть лучше писать их в разные индексы)
Filter для nginx access logs
Часть фильтра не относящаяся к nginx access log описана в другом разделе
Конфигурация
filter {
...
пропущнена часть общая для всех логов
...
if ([type] == "nginx_access_log_json") {
mutate {
add_field => { "log_level" => "INFO" }
}
json {
source => "message"
target => "[application_data][nginx]"
}
mutate {
remove_field => [ "message" ]
}
json {
source => "[application_data][nginx][nginx_request_body]"
target => "[application_data][nginx][nginx_request_body_json]"
tag_on_failure => "request_body_is_not_json"
}
json {
source => "[application_data][nginx][nginx_response_body]"
target => "[application_data][nginx][nginx_response_body_json]"
tag_on_failure => "response_body_is_not_json"
}
kv {
source => "[application_data][nginx][nginx_request_headers]"
target => "[application_data][nginx][nginx_request_headers]"
}
date {
match => [
"[application_data][nginx][nginx_time_iso8601]",
"MMM dd yyyy HH:mm:ss",
"MMM d yyyy HH:mm:ss",
"ISO8601"
]
target => "@application_timestamp"
}
}
...
Преобразования специфичные для access.log
Условие надожения фильтров - далее специфичные для access logs фильтры.
if ([type] == "nginx_access_log_json") {
Добавление поля с уровнем сообщения
Модификация лога - для всех сообщений из access.log добавляем уровень INFO
TODO(?) Выставлять другой уровень в зависмости от кода ответа?
mutate {
add_field => { "log_level" => "INFO" }
}
Разбор сообщения (т.е. поля "message")
Этот фильтр разбирает строку как JSON.
json {
source => "message"
target => "[application_data][nginx]"
}
например если Filebeat пришлет строку лога (экранированный JSON)
'{ \"key\": \"value\"} '
то без модификации в логе будет
{
"message": '{ \"key\": \"value\"} '
}
т.е. в поле message сообщение попадет "как есть"
С фильтром же результат будет
{
"application_data": {
"nginx": {
"key": "value"
}
},
"message": '{ \"key\": \"value\"} '
}
"application_data" --> "nginx" описаны в target => "[application_data][nginx]"
Исходный message никуда не девается и удаляить его если не нужен нужно отдельно
Удаление лишнего поля
Разобраное на предыдущем шаге сообшение больше не нужно.
Важно: тут не рассматривается случай еслли вместо JSON в логе прийдет что-то другое, в таком случае это сообшение будет утрачено. Но так-как формат сообщения контролируется со стороны конфигурации nginx то это не проблема.
mutate {
remove_field => [ "message" ]
}
Разбор поля [application_data][nginx][nginx_request_body]
json {
source => "[application_data][nginx][nginx_request_body]"
target => "[application_data][nginx][nginx_request_body_json]"
tag_on_failure => "request_body_is_not_json"
}
Тут в случае успеха (т.е. если тело запроса это JSON) - разобранный результат сохранить в поле nginx_request_body_json
Исходное поле не удаляется (на тот случай если в запросе был Не JSON), при ошибке разбора добавляется тег request_body_is_not_json
Разбор поля [application_data][nginx][nginx_response_body_json]
Аналогично - но разбор ответа котрый мог быть JSON или нет
json {
source => "[application_data][nginx][nginx_response_body]"
target => "[application_data][nginx][nginx_response_body_json]"
tag_on_failure => "response_body_is_not_json"
}
Разбор поля с заголовками
Поля с заголовками пишуться LUA-кодом:
rowtext = string.format(" %s='%s' ", k, v)
Это сделано специально для простоты разбора. До разбора заголовки выглядят так:
"nginx_request_headers": " host='domain.tld' accept-language='en-gb' accept-encoding='gzip, deflate, br' connection='keep-alive' content-type='application/json' user-agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15' authorization='Basic ...=' kbn-system-request='true' ...
Фильтр разбирает такие строки используя разделитель"=" по-умолчанию
kv {
source => "[application_data][nginx][nginx_request_headers]"
target => "[application_data][nginx][nginx_request_headers]"
}
Этот фильтр берет строку из поля nginx_request_headers и складывает результат работра туде же:
"nginx_request_headers": {
"content-type": "application/x-www-form-urlencoded",
"content-length": "7",
"host": "...",
"user-agent": "curl/7.68.0",
"accept": "*/*",
"authorization": "Basic ...="
TODO: тут можно подумать как разбирать тело запроса в зависимости от content-type
Разбор поля с датой
Для того что бы использовать время генерации лога, а не время поступления на логстеш.
Настройка максимально проста - поле из которого брать и как "пробовать разобрать".
date {
match => [
"[application_data][nginx][nginx_time_iso8601]",
"MMM dd yyyy HH:mm:ss",
"MMM d yyyy HH:mm:ss",
"ISO8601"
]
target => "@application_timestamp"
}
Filter для nginx error logs
Конфигурация
if ([type] == "nginx_error_log") {
grok {
match => [ "message" , "(?<nginx_time>%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}) \[%{LOGLEVEL:log_level}\] %{GREEDYDATA:[application_data][nginx][message]}" ]
}
mutate {
remove_field => [ "message" ]
}
date {
match => [
"[nginx_time]",
"yyyy/dd/mm HH:mm:ss"
]
target => "@application_timestamp"
}
mutate {
remove_field => [ "nginx_time" ]
}
}
Построчный разбор
Фильтр что бы применять
if ([type] == "nginx_error_log") {
Разбор с помощью grok
grok {
match => [ "message" , "(?<nginx_time>%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}) \[%{LOGLEVEL:log_level}\] %{GREEDYDATA:[application_data][nginx][message]}" ]
}
Входные данные выглядят так:
2021/07/06 08:16:18 [notice] 1#1: start worker process 22
Другими словами эьл
- Дата и время
- Уровень сообщения
- Текст сообщения
Конструкция
(?<nginx_time>%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME})
означает что в результирующее поле nginx_time> записать то что попадет под выражение %{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}
Далее - стандартная конструкция grok вида %{имя готового выражения:имя поля куда записать}
- LOGLEVEL --> log_level
- GREEDYDATA (остаток сообщения) --> [application_data][nginx][message] (вложенное поле)
Дебаг GROK: https://grokdebug.herokuapp.com
Удаление оригинальных полей
mutate {
remove_field => [ "message" ]
}
Разбор даты
date {
match => [
"[nginx_time]",
"yyyy/dd/mm HH:mm:ss"
]
target => "@application_timestamp"
}
Удаление ненужного поля
mutate {
remove_field => [ "nginx_time" ]
}
Filter для ruby application logs
Конфигурация
if ([ruby_application_logs] == "true") {
mutate {
id => "[Meaningful label for your project, not repeated in any other config files] remove ANSI color codes"
gsub => ["message", "\x1B\[([0-9]{1,2}(;[0-9]{1,2})?)?[m|K]", ""]
}
grok {
match => [ "message" , "%{WORD:[application_data][ruby][log_level_short]}, \[%{TIMESTAMP_ISO8601:[application_data][ruby][timestamp]} \#%{NUMBER}\] %{WORD:log_level} -- %{GREEDYDATA:[application_data][ruby][log_message]}" ]
}
mutate {
remove_field => [ "message" ]
}
date {
match => [
"[application_data][ruby][timestamp]",
"MMM dd yyyy HH:mm:ss",
"MMM d yyyy HH:mm:ss",
"ISO8601"
]
target => "@application_timestamp"
}
}
mutate {
uppercase => [ "log_level" ]
}
}
Построчный разбор
Фильтрация
if ([ruby_application_logs] == "true") {
Удаление цветных кодов
mutate {
id => "[Meaningful label for your project, not repeated in any other config files] remove ANSI color codes"
gsub => ["message", "\x1B\[([0-9]{1,2}(;[0-9]{1,2})?)?[m|K]", ""]
}
Разбор с помощью grok
grok {
match => [ "message" , "%{WORD:[application_data][ruby][log_level_short]}, \[%{TIMESTAMP_ISO8601:[application_data][ruby][timestamp]} \#%{NUMBER}\] %{WORD:log_level} -- %{GREEDYDATA:[application_data][ruby][log_message]}" ]
}
Поле message разбирается на поля
- [application_data][ruby][log_level_short] - короткое однобуквенное обозначение уровня лога (D - Debug, E - Error и т.д.)
- [application_data][ruby][timestamp] - время
- log_level - уровень лога, словом
- [application_data][ruby][log_message] - само сообщение до колнца строки
1
mutate {
remove_field => [ "message" ]
}
1
date {
match => [
"[application_data][ruby][timestamp]",
"MMM dd yyyy HH:mm:ss",
"MMM d yyyy HH:mm:ss",
"ISO8601"
]
target => "@application_timestamp"
}
Преобразование к заглавным буквам
нужно так как приложения могут писать уровень по-разному - 'Error', 'error', 'ERROR'
mutate {
uppercase => [ "log_level" ]
}
TODOЗамена 'Err' -> Error и т.п. если найдется
Output
Общая для всех логов часть
Общий получатель логов - Elasticsearch
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "application-logs-%{[host][name]}-%{+YYYY.MM.dd}"
document_type => "nginx_logs"
}
...
пропущены отладочные получатели
...
}
Отладка
Для того что б удобно смотреть на результат работы фильтров и преобразований самый простой способ - писать логи в файл.
Для отладки для nginx access logs
# if ([type] == "nginx_access_log_json") {
# file {
# flush_interval => 5
# gzip => false
# path => "/var/log/logstash/nginx-%{[host][name]}-%{+YYYY.MM.dd}.log"
# }
# }
Для отладки для nginx error logs
# if ([type] == "nginx_error_log") {
# file {
# flush_interval => 5
# gzip => false
# path => "/var/log/logstash/nginx-errors-%{[host][name]}-%{+YYYY.MM.dd}.log"
# }
# }
Для отладки для ruby application logs
#
# if ([ruby_application_logs] == "true") {
# file {
# flush_interval => 5
# gzip => false
# path => "/var/log/logstash/ruby-%{[host][name]}-%{+YYYY.MM.dd}.log"
# }
# }
Elasticsearch-Curator
Ссылки
* https://pawelurbanek.com/elk-nginx-logs-setup