LogstashExample1: различия между версиями

Материал из noname.com.ua
Перейти к навигацииПерейти к поиску
Строка 587: Строка 587:
 
if ([type] == "nginx_error_log") {
 
if ([type] == "nginx_error_log") {
 
</PRE>
 
</PRE>
====Разбор с помощью grok===
+
====Разбор с помощью grok====
 
<PRE>
 
<PRE>
 
grok {
 
grok {
Строка 626: Строка 626:
 
}
 
}
 
</PRE>
 
</PRE>
  +
 
=== Удаление ненужного поля===
 
=== Удаление ненужного поля===
 
<PRE>
 
<PRE>

Версия 18:31, 10 августа 2021

Пример

Это продолжение статьи https://noname.com.ua/mediawiki/index.php?title=Logstash

Задача

Задача - отправка логов от тестового приложения в Logstash

  • Для тестовых устновок - писать максимально подробные логи приложения
    • Nginx - настолько подробно насколько это возможно
    • Логи которые пишет бекенд (Ruby)
  • Для Staging/Prod - менее подробные логи (что бы не перегрузить Elasticsearch)

Реализация

  • Сервер для сбора - Elasticsearch, Logstash, Kibana, Curator
  • На окружениях которые генерируют логи - Filebeat (Filebeat проще и легковеснее - нет нужды ставить везде Logstash)


При реализации НЕ учитывалось (из экономических соображений):

  • Отказоустойчивость
    • Standalone ElasticSerach (1 нода)
    • Один экзкмпляр Logstash
  • Балансировка нагрузки отсутвует
  • Нет менеджера очередей для логов для сглаживания пиков нагрузки (можно использовать Kafka/RabbitMQ)

nginx

Nginx умеет писать логи в Json (для максимально подробного логгирования требуется поддержка Lua)

Конфигурация (только значимые части)

   # Log in JSON Format
    log_format nginxlog_json escape=json
    '{ '
    '"nginx_http_user_agent": "$http_user_agent",'
    '"nginx_ancient_browser": "$ancient_browser",'
    '"nginx_body_bytes_sent": "$body_bytes_sent",'
    '"nginx_bytes_sent": "$bytes_sent",'
    '"nginx_connection": "$connection",'
    '"nginx_connection_requests": "$connection_requests",'
    '"nginx_connections_active": "$connections_active",'
    '"nginx_connections_reading": "$connections_reading",'
    '"nginx_connections_waiting": "$connections_waiting",'
    '"nginx_connections_writing": "$connections_writing",'
    '"nginx_content_length": "$content_length",'
    '"nginx_content_type": "$content_type",'
    '"nginx_cookie_": "$cookie_",'
    '"nginx_document_root": "$document_root",'
    '"nginx_document_uri": "$document_uri",'
    '"nginx_fastcgi_path_info": "$fastcgi_path_info",'
    '"nginx_fastcgi_script_name": "$fastcgi_script_name",'
    '"nginx_host": "$host",'
    '"nginx_hostname": "$hostname",'
    '"nginx_https": "$https",'
    '"nginx_invalid_referer": "$invalid_referer",'
    '"nginx_is_args": "$is_args",'
    '"nginx_limit_conn_status": "$limit_conn_status",'
    '"nginx_limit_rate": "$limit_rate",'
    '"nginx_limit_req_status": "$limit_req_status",'
    '"nginx_modern_browser": "$modern_browser",'
    '"nginx_msec": "$msec",'
    '"nginx_msie": "$msie",'
    '"nginx_nginx_version": "$nginx_version",'
    '"nginx_proxy_add_x_forwarded_for": "$proxy_add_x_forwarded_for",'
    '"nginx_proxy_host": "$proxy_host",'
    '"nginx_proxy_port": "$proxy_port",'
    '"nginx_proxy_protocol_addr": "$proxy_protocol_addr",'
    '"nginx_proxy_protocol_port": "$proxy_protocol_port",'
    '"nginx_proxy_protocol_server_addr": "$proxy_protocol_server_addr",'
    '"nginx_proxy_protocol_server_port": "$proxy_protocol_server_port",'
    '"nginx_query_string": "$query_string",'
    '"nginx_realip_remote_addr": "$realip_remote_addr",'
    '"nginx_realip_remote_port": "$realip_remote_port",'
    '"nginx_remote_addr": "$remote_addr",'
    '"nginx_remote_port": "$remote_port",'
    '"nginx_remote_user": "$remote_user",'
    '"nginx_request": "$request",'
    '"nginx_request_headers": "$request_headers",'
    '"nginx_request_body": "$request_body",'
    '"nginx_request_id": "$request_id",'
    '"nginx_request_length": "$request_length",'
    '"nginx_request_method": "$request_method",'
    '"nginx_request_time": "$request_time",'
    '"nginx_request_uri": "$request_uri",'
    '"nginx_scheme": "$scheme",'
    '"nginx_server_addr": "$server_addr",'
    '"nginx_server_name": "$server_name",'
    '"nginx_server_port": "$server_port",'
    '"nginx_server_port": "$server_port",'
    '"nginx_server_protocol": "$server_protocol",'
    '"nginx_ssl_cipher": "$ssl_cipher",'
    '"nginx_ssl_ciphers": "$ssl_ciphers",'
    '"nginx_ssl_client_cert": "$ssl_client_cert",'
    '"nginx_ssl_client_escaped_cert": "$ssl_client_escaped_cert",'
    '"nginx_ssl_client_fingerprint": "$ssl_client_fingerprint",'
    '"nginx_ssl_client_i_dn": "$ssl_client_i_dn",'
    '"nginx_ssl_client_raw_cert": "$ssl_client_raw_cert",'
    '"nginx_ssl_client_s_dn": "$ssl_client_s_dn",'
    '"nginx_ssl_client_serial": "$ssl_client_serial",'
    '"nginx_ssl_client_v_end": "$ssl_client_v_end",'
    '"nginx_ssl_client_v_remain": "$ssl_client_v_remain",'
    '"nginx_ssl_client_v_start": "$ssl_client_v_start",'
    '"nginx_ssl_client_verify": "$ssl_client_verify",'
    '"nginx_ssl_early_data": "$ssl_early_data",'
    '"nginx_ssl_protocol": "$ssl_protocol",'
    '"nginx_ssl_server_name": "$ssl_server_name",'
    '"nginx_ssl_session_id": "$ssl_session_id",'
    '"nginx_ssl_session_reused": "$ssl_session_reused",'
    '"nginx_status": "$status",'
    '"nginx_tcpinfo_rtt": "$tcpinfo_rtt",'
    '"nginx_tcpinfo_rttvar": "$tcpinfo_rttvar",'
    '"nginx_tcpinfo_snd_cwnd": "$tcpinfo_snd_cwnd",'
    '"nginx_tcpinfo_rcv_space": "$tcpinfo_rcv_space",'
    '"nginx_time_iso8601": "$time_iso8601",'
    '"nginx_time_local": "$time_local",'
    '"nginx_uid_got": "$uid_got",'
    '"nginx_uid_reset": "$uid_reset",'
    '"nginx_uid_set": "$uid_set",'
    '"nginx_upstream_addr": "$upstream_addr",'
    '"nginx_upstream_bytes_received": "$upstream_bytes_received",'
    '"nginx_upstream_bytes_sent": "$upstream_bytes_sent",'
    '"nginx_upstream_bytes_sent": "$upstream_bytes_sent",'
    '"nginx_upstream_cache_status": "$upstream_cache_status",'
    '"nginx_upstream_connect_time": "$upstream_connect_time",'
    '"nginx_upstream_cookie_": "$upstream_cookie_",'
    '"nginx_upstream_header_time": "$upstream_header_time",'
    '"nginx_upstream_http_": "$upstream_http_",'
    '"nginx_upstream_response_length": "$upstream_response_length",'
    '"nginx_upstream_response_time": "$upstream_response_time",'
    '"nginx_upstream_status": "$upstream_status",'
    '"nginx_uri": "$uri",'
    '"nginx_response_body": "$response_body"'
    '}';
    server {
        listen 443 ssl;
        root /var/www/backend;
        server_name elk.domain.tld;
        access_log /var/log/nginx/elk.domain.tld-access.log.ssl nginxlog_json;
        error_log /var/log/nginx/elk.domain.tld-error.log.ssl;
        client_max_body_size 500M;
        keepalive_timeout 0;
        ssl_certificate     ...;
        ssl_certificate_key ...;

        lua_need_request_body on;

        set $response_body "";
        body_filter_by_lua '
            local response_body = string.sub(ngx.arg[1], 1, 1000)
            ngx.ctx.buffered = (ngx.ctx.buffered or "") .. response_body
            if ngx.arg[2] then
                ngx.var.response_body = ngx.ctx.buffered
            end
        ';

        set_by_lua_block $request_headers{
            local h = ngx.req.get_headers()
            local request_headers_all = ""
            for k, v in pairs(h) do
                local rowtext = ""
                rowtext = string.format(" %s='%s' ", k, v)
                request_headers_all = request_headers_all .. rowtext

            end
            return request_headers_all
        }
...

Результат

tail -1 access.log | jq .

Пример лога (не все поля)

{
  ...
  "nginx_http_user_agent": "python-requests/2.23.0",
  "nginx_ancient_browser": "1",
  "nginx_body_bytes_sent": "175",
  "nginx_document_root": "/usr/local/openresty/nginx/html",
  "nginx_document_uri": "/favicon.ico",
  "nginx_request_headers": " connection='keep-alive'  accept='*/*'  accept-encoding='gzip, deflate'  host='login-anastasiia-env.arturhaunt.com'  user-agent='python-requests/2.23.0' ",
  "nginx_time_iso8601": "2021-08-10T03:09:57+00:00",
  "nginx_time_local": "10/Aug/2021:03:09:57 +0000",
  "nginx_response_body": "<html>\r\n<head><title>301 Moved Permanently</title></head>\r\n<body>\r\n<center><h1>301 Moved Permanently</h1></center>\r\n<hr><center>openresty/1.19.3.2</center>\r\n</body>\r\n</html>\r\n",
  ...
}

Filebeat

для nginx access logs

Предельно простая конфигурация - читать файлы по маске

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/www/deploy/backend/current/log/nginx/*access.log*
  exclude_files: ['\.gz$']
  fields:
    nginx_logs: "true"
    ruby_application_logs: "false"
    type: "nginx_access_log_json"
    source_hostname: "test-env.domain.tld"
  fields_under_root: true

для nginx error logs

Ничем не отличается от access.log

- type: log
  enabled: true
  paths:
    - /var/www/deploy/backend/current/log/nginx/*error.log*
  exclude_files: ['\.gz$']
  fields:
    nginx_logs: "true"
    ruby_application_logs: "false"
    type: "nginx_error_log"
    source_hostname: "some-domain.tld"
  fields_under_root: true

для Ruby Application logs

Немного более сложная сложная конфигурация для многострочного лога

  paths:
    - /var/www/deploy/backend/shared/log/staging.log
  exclude_files: ['\.gz$']
  fields:
    nginx_logs: "false"
    ruby_application_logs: "true"
    type: "ruby_application_logs"
    source_hostname: "some-domain.tld"
  fields_under_root: true
  multiline:
    pattern: '[A-Z], \[[0-9]{4}'
    negate: true
    match: after

Пример лога

D, [2021-08-10T11:17:02.270613 #7] DEBUG -- [ContentSelector::FindExternalService]: /usr/src/app/app/services/content_selector/find_external_service.rb:31:in `perform'
^M/usr/src/app/app/services/service.rb:188:in `call!'
^M/usr/src/app/app/services/service.rb:204:in `call'
^M/usr/src/app/app/services/service.rb:110:in `perform'
^M/usr/src/app/lib/activity_organizers/external.rb:32:in `block in vacant_content_and_topic'
^M/usr/src/app/lib/activity_organizers/external.rb:31:in `each'
^M/usr/src/app/lib/activity_organizers/external.rb:31:in `inject'
^M/usr/src/app/lib/activity_organizers/external.rb:31:in `vacant_content_and_topic'
^M/usr/src/app/lib/activity_organizers/external.rb:18:in `perform'

Разделитель(читается : одна большая буква, запятая, пробел, символ "[", 4 цифры):

pattern: '[A-Z], \[[0-9]{4}'

TODO: Скорее всего список A-Z избыточен так как первая буква означает уровень логгирования, для года наверно можно использовать 202[1-9]

Подробнее про разбор многострочных сообщений - https://noname.com.ua/mediawiki/index.php?title=FilebeatMultiline

Общая часть конфигурации Filebeat

Модули (по-умолчанию)

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:

Отправлять логи в Logstash (на beat input с авторизацией по сертификатам)
Подробнее про настройку: https://noname.com.ua/mediawiki/index.php/Logstash

output.logstash:
  hosts: ["elk.arturhaunt.ninja:5400"]
  ssl.certificate_authorities: ["/etc/elk-certs/elk-ssl.crt"]
  ssl.certificate: "/etc/elk-certs/elk-ssl.crt"
  ssl.key: "/etc/elk-certs/elk-ssl.key"

Процессинг по-умолчанию добавляет метаданные хоста-источника

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
logging.level: info
logging.selectors: ["*"]

Logstash для nginx access logs

Только access.log
Правила для error.log в другой секции

Input

Получение логов на порту 5400 (Это не syslog протокол - отправляет Filebeat, хотя получене логов через сислог тоже возможно)

input {
    beats {
        port => 5400
        ssl => true
        ssl_certificate_authorities => ["/etc/elk-certs/elk-ssl.crt"]
        ssl_certificate => "/etc/elk-certs/elk-ssl.crt"
        ssl_key => "/etc/elk-certs/elk-ssl.key"
        ssl_verify_mode => "force_peer"
    }
}

В настройке собственно ничего нет кроме путей к сертификатам и номера порта.

Filter

Часть фильтра не относящаяся к nginx access log описана в другом разделе

Конфигурация

filter {
    mutate {
        add_field => { "nginx" => "nginx_object" }
        add_field => { "ruby" => "ruby_object" }
    }
    mutate {
        rename => { "nginx" => "[application_data][nginx][object]" }
        rename => { "ruby"  => "[application_data][ruby][object]" }
    }

    if ([type] == "nginx_access_log_json") {
        mutate {
            add_field => { "log_level" => "INFO" }
        }

        json {
            source => "message"
            target => "[application_data][nginx]"
        }

        mutate {
            remove_field => [ "message" ]
        }

        json {
            source => "[application_data][nginx][nginx_request_body]"
            target => "[application_data][nginx][nginx_request_body_json]"
            tag_on_failure => "request_body_is_not_json"
        }

        json {
            source => "[application_data][nginx][nginx_response_body]"
            target => "[application_data][nginx][nginx_response_body_json]"
            tag_on_failure => "response_body_is_not_json"
        }

        kv {
            source => "[application_data][nginx][nginx_request_headers]"
            target => "[application_data][nginx][nginx_request_headers]"
        }
        date {
            match => [
                "[application_data][nginx][nginx_time_iso8601]",
                "MMM dd yyyy HH:mm:ss",
                "MMM  d yyyy HH:mm:ss",
                "ISO8601"
            ]
            target => "@application_timestamp"
        }
    }
...

"Построчный" разбор

Общие для всех логов шаги (не только access.log

Добавление полей

Добавить в лог 2 новых поля
Этот шаг нужен для того что бы потом добавлять в эти поля соответствующие логи

    mutate {
        add_field => { "nginx" => "nginx_object" }
        add_field => { "ruby" => "ruby_object" }
    }
Переименование полей

Переименовать поля. По какой-то причине создать сразу поля с вложениями не получается

    mutate {
        rename => { "nginx" => "[application_data][nginx][object]" }
        rename => { "ruby"  => "[application_data][ruby][object]" }
    }

В результате получим

{
  "input": {
    "type": "log"
  "application_data": {
    "ruby": {
      "object": "ruby_object"
    },
    "nginx": {
...

Глобально это сделано для того что бы логи от бекенда и от nginx отличались на уровне application_data а поля верхнего уровня совпадали (но я до сих пор не уверен имеет ли это смысл или может быть лучше писать их в разные индексы)

Преобразования специфичные для access.log


Условие надожения фильтров - далее специфичные для access logs фильтры.

    if ([type] == "nginx_access_log_json") {
Добавление поля с уровнем сообщения

Модификация лога - для всех сообщений из access.log добавляем уровень INFO
TODO(?) Выставлять другой уровень в зависмости от кода ответа?

        mutate {
            add_field => { "log_level" => "INFO" }
        }
Разбор сообщения (т.е. поля "message")

Этот фильтр разбирает строку как JSON.

        json {
            source => "message"
            target => "[application_data][nginx]"
        }

например если Filebeat пришлет строку лога (экранированный JSON)

'{ \"key\": \"value\"} '

то без модификации в логе будет

{
"message": '{ \"key\": \"value\"} '
}

т.е. в поле message сообщение попадет "как есть"
С фильтром же результат будет

{
"application_data": {
  "nginx": {
     "key": "value"
  }
},
"message": '{ \"key\": \"value\"} '
}

"application_data" --> "nginx" описаны в target => "[application_data][nginx]"
Исходный message никуда не девается и удаляить его если не нужен нужно отдельно

Удаление лишнего поля

Разобраное на предыдущем шаге сообшение больше не нужно.
Важно: тут не рассматривается случай еслли вместо JSON в логе прийдет что-то другое, в таком случае это сообшение будет утрачено. Но так-как формат сообщения контролируется со стороны конфигурации nginx то это не проблема.

        mutate {
            remove_field => [ "message" ]
        }
Разбор поля [application_data][nginx][nginx_request_body]
        json {
            source => "[application_data][nginx][nginx_request_body]"
            target => "[application_data][nginx][nginx_request_body_json]"
            tag_on_failure => "request_body_is_not_json"
        }

Тут в случае успеха (т.е. если тело запроса это JSON) - разобранный результат сохранить в поле nginx_request_body_json
Исходное поле не удаляется (на тот случай если в запросе был Не JSON), при ошибке разбора добавляется тег request_body_is_not_json

Разбор поля [application_data][nginx][nginx_response_body_json]

Аналогично - но разбор ответа котрый мог быть JSON или нет

        json {
            source => "[application_data][nginx][nginx_response_body]"
            target => "[application_data][nginx][nginx_response_body_json]"
            tag_on_failure => "response_body_is_not_json"
        }
Разбор поля с заголовками

Поля с заголовками пишуться LUA-кодом:

 rowtext = string.format(" %s='%s' ", k, v)

Это сделано специально для простоты разбора. До разбора заголовки выглядят так:

"nginx_request_headers": " host='domain.tld'  accept-language='en-gb'  accept-encoding='gzip, deflate, br'  connection='keep-alive'  content-type='application/json'  user-agent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15'  authorization='Basic ...='  kbn-system-request='true' ...

Фильтр разбирает такие строки используя разделитель"=" по-умолчанию

        kv {
            source => "[application_data][nginx][nginx_request_headers]"
            target => "[application_data][nginx][nginx_request_headers]"
        }

Этот фильтр берет строку из поля nginx_request_headers и складывает результат работра туде же:

      "nginx_request_headers": {
        "content-type": "application/x-www-form-urlencoded",
        "content-length": "7",
        "host": "...",
        "user-agent": "curl/7.68.0",
        "accept": "*/*",
        "authorization": "Basic ...="

TODO: тут можно подумать как разбирать тело запроса в зависимости от content-type

Разбор поля с датой

Для того что бы использовать время генерации лога, а не время поступления на логстеш.
Настройка максимально проста - поле из которого брать и как "пробовать разобрать".

        date {
            match => [
                "[application_data][nginx][nginx_time_iso8601]",
                "MMM dd yyyy HH:mm:ss",
                "MMM  d yyyy HH:mm:ss",
                "ISO8601"
            ]
            target => "@application_timestamp"
        }

Output

Минимально-возможный конфиг - отправлять данные в Elasticsearch и кроме этого в файл (нужно для отладки) output {

   elasticsearch {
       hosts => ["localhost:9200"]
       index => "application-logs-%{[host][name]}-%{+YYYY.MM.dd}"
       document_type => "nginx_logs"
   }
   if ([type] == "nginx_access_log_json") {
       file {
           flush_interval => 5
               gzip => false
               path => "/var/log/logstash/nginx-%{[host][name]}-%{+YYYY.MM.dd}.log"
       }
   }

...

}

Logstash для nginx error logs

Input

Один input для всех видов логов - тот же Filebeat (port 5400)

Filter

Конфигурация

    if ([type] == "nginx_error_log") {
        grok {
            match => [ "message" , "(?<nginx_time>%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}) \[%{LOGLEVEL:log_level}\] %{GREEDYDATA:[application_data][nginx][message]}" ]
        }
        mutate {
            remove_field => [ "message" ]
        }
        date {
            match => [
                "[nginx_time]",
                "yyyy/dd/mm HH:mm:ss"
            ]
            target => "@application_timestamp"
        }
        mutate {
            remove_field => [ "nginx_time" ]
        }

    }

Построчный разбор

Фильтр что бы применять

    if ([type] == "nginx_error_log") {

Разбор с помощью grok

        grok {
            match => [ "message" , "(?<nginx_time>%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}) \[%{LOGLEVEL:log_level}\] %{GREEDYDATA:[application_data][nginx][message]}" ]
        }

Входные данные выглядят так:

2021/07/06 08:16:18 [notice] 1#1: start worker process 22

Другими словами эьл

  • Дата и время
  • Уровень сообщения
  • Текст сообщения

Конструкция

(?<nginx_time>%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}) 

означает что в результирующее поле nginx_time> записать то что попадет под выражение %{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME}
Далее - стандартная конструкция grok вида %{имя готового выражения:имя поля куда записать}

  • LOGLEVEL --> log_level
  • GREEDYDATA (остаток сообщения) --> [application_data][nginx][message] (вложенное поле)

Удаление оригинальных полей

        mutate {
            remove_field => [ "message" ]
        }

Разбор даты

        date {
            match => [
                "[nginx_time]",
                "yyyy/dd/mm HH:mm:ss"
            ]
            target => "@application_timestamp"
        }

Удаление ненужного поля

        mutate {
            remove_field => [ "nginx_time" ]
        }

Output

Output

Общий получатель логов - Elasticsearch

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "application-logs-%{[host][name]}-%{+YYYY.MM.dd}"
        document_type => "nginx_logs"
    }

Для дебага

#    if ([type] == "nginx_error_log") {
#        file {
#            flush_interval => 5
#                gzip => false
#                path => "/var/log/logstash/nginx-errors-%{[host][name]}-%{+YYYY.MM.dd}.log"
#        }

Logstash для Ruby Application logs

Input

Один input для всех видов логов - тот же Filebeat (port 5400)

Filter

Конфигурация

    if ([ruby_application_logs] == "true") {
        mutate {
            id => "[Meaningful label for your project, not repeated in any other config files] remove ANSI color codes"
            gsub => ["message", "\x1B\[([0-9]{1,2}(;[0-9]{1,2})?)?[m|K]", ""]
        }

        grok {
            match => [ "message" , "%{WORD:[application_data][ruby][log_level_short]}, \[%{TIMESTAMP_ISO8601:[application_data][ruby][timestamp]} \#%{NUMBER}\] %{WORD:log_level} -- %{GREEDYDATA:[application_data][ruby][log_message]}" ]
        }

        mutate {
            remove_field => [ "message" ]
        }

        date {
            match => [
                "[application_data][ruby][timestamp]",
                "MMM dd yyyy HH:mm:ss",
                "MMM  d yyyy HH:mm:ss",
                "ISO8601"
            ]
            target => "@application_timestamp"
        }

    }

    mutate {
        uppercase => [ "log_level" ]
    }

}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "application-logs-%{[host][name]}-%{+YYYY.MM.dd}"
        document_type => "nginx_logs"
    }
    if ([type] == "nginx_access_log_json") {
        file {
            flush_interval => 5
                gzip => false
                path => "/var/log/logstash/nginx-%{[host][name]}-%{+YYYY.MM.dd}.log"
        }
    }
#
#    if ([ruby_application_logs] == "true") {
#        file {
#            flush_interval => 5
#                gzip => false
#                path => "/var/log/logstash/ruby-%{[host][name]}-%{+YYYY.MM.dd}.log"
#        }
#    }
#
#    if ([type] == "nginx_error_log") {
#        file {
#            flush_interval => 5
#                gzip => false
#                path => "/var/log/logstash/nginx-errors-%{[host][name]}-%{+YYYY.MM.dd}.log"
#        }

Построчный разбор

=Фильтрация

    if ([ruby_application_logs] == "true") {

Удаление цветных кодов

        mutate {
            id => "[Meaningful label for your project, not repeated in any other config files] remove ANSI color codes"
            gsub => ["message", "\x1B\[([0-9]{1,2}(;[0-9]{1,2})?)?[m|K]", ""]
        }

1

        grok {
            match => [ "message" , "%{WORD:[application_data][ruby][log_level_short]}, \[%{TIMESTAMP_ISO8601:[application_data][ruby][timestamp]} \#%{NUMBER}\] %{WORD:log_level} -- %{GREEDYDATA:[application_data][ruby][log_message]}" ]
        }
        mutate {
            remove_field => [ "message" ]
        }
        date {
            match => [
                "[application_data][ruby][timestamp]",
                "MMM dd yyyy HH:mm:ss",
                "MMM  d yyyy HH:mm:ss",
                "ISO8601"
            ]
            target => "@application_timestamp"
        }
    mutate {
        uppercase => [ "log_level" ]
    }

Output

Общий получатель логов - Elasticsearch

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "application-logs-%{[host][name]}-%{+YYYY.MM.dd}"
        document_type => "nginx_logs"
    }

Для дебага

#
#    if ([ruby_application_logs] == "true") {
#        file {
#            flush_interval => 5
#                gzip => false
#                path => "/var/log/logstash/ruby-%{[host][name]}-%{+YYYY.MM.dd}.log"
#        }
#    }