+2

Logstash cho bộ lọc dựa trên dữ liệu log request và response

Dưới đây là cấu hình Logstash cho bộ lọc dựa trên dữ liệu log request và response mà bạn đã cung cấp. Cấu hình này sẽ sử dụng plugin grok để phân tích cú pháp các log và plugin mutate để xử lý các trường cần thiết.

Cấu Hình Logstash Cho request và response

input {
  kafka {
    bootstrap_servers => "10.100.30.32:9092"
    topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"]
    group_id => "log_consumer_group"
    auto_offset_reset => "earliest"
  }
}

filter {
  # Sử dụng mutate để lọc đi ký tự không mong muốn
  mutate {
    gsub => ["message", "\u001b|\n", ""]
  }

  # Phân tích log response
  if "RESPONSE" in [message] {
    grok {
      match => {
        "message" => """
          {"instant":{"epochSecond":%{NUMBER:epoch_sec}," +
          "\"nanoOfSecond\":%{NUMBER:nano_sec}," +
          "\"thread\":\"%{DATA:thread}\"," +
          "\"level\":\"%{WORD:level}\"," +
          "\"loggerName\":\"%{DATA:logger_name}\"," +
          "\"message\":\"%{GREEDYDATA:log_message}\"," +
          "\"endOfBatch\":%{GREEDYDATA:end_of_batch}," +
          "\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," +
          "\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," +
          "\"spanId\":\"%{DATA:span_id}\"," +
          "\"className\":\"%{DATA:class_name}\"," +
          "\"clientIp\":\"%{IP:client_ip}\"," +
          "\"clientMessageId\":\"%{DATA:client_message_id}\"," +
          "\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," +
          "\"duration\":%{NUMBER:duration}," +
          "\"methodName\":\"%{DATA:method_name}\"," +
          "\"path\":\"%{DATA:path}\"," +
          "\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," +
          "\"threadId\":%{NUMBER:thread_id}," +
          "\"threadPriority\":%{NUMBER:thread_priority}," +
          "\"logType\":\"%{WORD:log_type}\"," +
          "\"application\":\"%{DATA:application}\"," +
          "\"localIp\":\"%{IP:local_ip}\"}
        """
      }
    }
  }

  # Phân tích log request
  if "REQUEST" in [message] {
    grok {
      match => {
        "message" => """
          {"instant":{"epochSecond":%{NUMBER:epoch_sec}," +
          "\"nanoOfSecond\":%{NUMBER:nano_sec}," +
          "\"thread\":\"%{DATA:thread}\"," +
          "\"level\":\"%{WORD:level}\"," +
          "\"loggerName\":\"%{DATA:logger_name}\"," +
          "\"message\":\"%{GREEDYDATA:log_message}\"," +
          "\"endOfBatch\":%{GREEDYDATA:end_of_batch}," +
          "\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," +
          "\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," +
          "\"spanId\":\"%{DATA:span_id}\"," +
          "\"className\":\"%{DATA:class_name}\"," +
          "\"clientIp\":\"%{IP:client_ip}\"," +
          "\"clientMessageId\":\"%{DATA:client_message_id}\"," +
          "\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," +
          "\"duration\":%{NUMBER:duration}," +
          "\"methodName\":\"%{DATA:method_name}\"," +
          "\"path\":\"%{DATA:path}\"," +
          "\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," +
          "\"threadId\":%{NUMBER:thread_id}," +
          "\"threadPriority\":%{NUMBER:thread_priority}," +
          "\"logType\":\"%{WORD:log_type}\"," +
          "\"application\":\"%{DATA:application}\"," +
          "\"localIp\":\"%{IP:local_ip}\"}
        """
      }
    }
  }

  # Tạo các trường mới
  mutate {
    add_field => {
      "ts" => "%{epoch_sec}"
      "ip" => "%{client_ip}"
      "msgId" => "%{client_message_id}"
      "duration" => "%{duration}"
      "method" => "%{method_name}"
      "path" => "%{path}"
      "app" => "%{application}"
      "localIp" => "%{local_ip}"
      "logType" => "%{log_type}"
      "type" => "%{+log_type}"  # Đánh dấu loại log (REQUEST/RESPONSE)
    }
    remove_field => [
      "instant", "thread", "level", "logger_name", "message", 
      "endOfBatch", "logger_fqcn", "contextMap", 
      "threadId", "threadPriority"
    ]
  }

  # Chuyển đổi timestamp
  date {
    match => [ "client_time", "ISO8601" ]
    timezone => "+07:00"
    target => "@timestamp"
  }
  
  ruby {
    code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))"
  }
}

output {
  elasticsearch {
    hosts => ["http://10.152.183.57:9200"]
    template => "/usr/share/logstash/templates/logstash_template.json"
    template_name => "logstash"
    template_overwrite => true
    index => "logstash-%{indexDay}"
    document_type => "_doc"
  }
  stdout {
    codec => rubydebug
  }
}

All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.