+2

Logstash cho bộ lọc dựa trên dữ liệu log request và response

Dưới đây là cấu hình Logstash cho bộ lọc dựa trên dữ liệu log request và response mà bạn đã cung cấp. Cấu hình này sẽ sử dụng plugin grok để phân tích cú pháp các log và plugin mutate để xử lý các trường cần thiết.

Cấu Hình Logstash Cho request và response

input {
  kafka {
    bootstrap_servers => "10.100.30.32:9092"
    topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"]
    group_id => "log_consumer_group"
    auto_offset_reset => "earliest"
  }
}

filter {
  # Sử dụng mutate để lọc đi ký tự không mong muốn
  mutate {
    gsub => ["message", "\u001b|\n", ""]
  }

  # Phân tích log response
  if "RESPONSE" in [message] {
    grok {
      match => {
        "message" => """
          {"instant":{"epochSecond":%{NUMBER:epoch_sec}," +
          "\"nanoOfSecond\":%{NUMBER:nano_sec}," +
          "\"thread\":\"%{DATA:thread}\"," +
          "\"level\":\"%{WORD:level}\"," +
          "\"loggerName\":\"%{DATA:logger_name}\"," +
          "\"message\":\"%{GREEDYDATA:log_message}\"," +
          "\"endOfBatch\":%{GREEDYDATA:end_of_batch}," +
          "\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," +
          "\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," +
          "\"spanId\":\"%{DATA:span_id}\"," +
          "\"className\":\"%{DATA:class_name}\"," +
          "\"clientIp\":\"%{IP:client_ip}\"," +
          "\"clientMessageId\":\"%{DATA:client_message_id}\"," +
          "\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," +
          "\"duration\":%{NUMBER:duration}," +
          "\"methodName\":\"%{DATA:method_name}\"," +
          "\"path\":\"%{DATA:path}\"," +
          "\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," +
          "\"threadId\":%{NUMBER:thread_id}," +
          "\"threadPriority\":%{NUMBER:thread_priority}," +
          "\"logType\":\"%{WORD:log_type}\"," +
          "\"application\":\"%{DATA:application}\"," +
          "\"localIp\":\"%{IP:local_ip}\"}
        """
      }
    }
  }

  # Phân tích log request
  if "REQUEST" in [message] {
    grok {
      match => {
        "message" => """
          {"instant":{"epochSecond":%{NUMBER:epoch_sec}," +
          "\"nanoOfSecond\":%{NUMBER:nano_sec}," +
          "\"thread\":\"%{DATA:thread}\"," +
          "\"level\":\"%{WORD:level}\"," +
          "\"loggerName\":\"%{DATA:logger_name}\"," +
          "\"message\":\"%{GREEDYDATA:log_message}\"," +
          "\"endOfBatch\":%{GREEDYDATA:end_of_batch}," +
          "\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," +
          "\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," +
          "\"spanId\":\"%{DATA:span_id}\"," +
          "\"className\":\"%{DATA:class_name}\"," +
          "\"clientIp\":\"%{IP:client_ip}\"," +
          "\"clientMessageId\":\"%{DATA:client_message_id}\"," +
          "\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," +
          "\"duration\":%{NUMBER:duration}," +
          "\"methodName\":\"%{DATA:method_name}\"," +
          "\"path\":\"%{DATA:path}\"," +
          "\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," +
          "\"threadId\":%{NUMBER:thread_id}," +
          "\"threadPriority\":%{NUMBER:thread_priority}," +
          "\"logType\":\"%{WORD:log_type}\"," +
          "\"application\":\"%{DATA:application}\"," +
          "\"localIp\":\"%{IP:local_ip}\"}
        """
      }
    }
  }

  # Tạo các trường mới
  mutate {
    add_field => {
      "ts" => "%{epoch_sec}"
      "ip" => "%{client_ip}"
      "msgId" => "%{client_message_id}"
      "duration" => "%{duration}"
      "method" => "%{method_name}"
      "path" => "%{path}"
      "app" => "%{application}"
      "localIp" => "%{local_ip}"
      "logType" => "%{log_type}"
      "type" => "%{+log_type}"  # Đánh dấu loại log (REQUEST/RESPONSE)
    }
    remove_field => [
      "instant", "thread", "level", "logger_name", "message", 
      "endOfBatch", "logger_fqcn", "contextMap", 
      "threadId", "threadPriority"
    ]
  }

  # Chuyển đổi timestamp
  date {
    match => [ "client_time", "ISO8601" ]
    timezone => "+07:00"
    target => "@timestamp"
  }
  
  ruby {
    code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))"
  }
}

output {
  elasticsearch {
    hosts => ["http://10.152.183.57:9200"]
    template => "/usr/share/logstash/templates/logstash_template.json"
    template_name => "logstash"
    template_overwrite => true
    index => "logstash-%{indexDay}"
    document_type => "_doc"
  }
  stdout {
    codec => rubydebug
  }
}

All rights reserved

Bình luận

Đăng nhập để bình luận
Avatar
@4rever
thg 9 27, 2024 1:14 SA

"dữ liệu log request và response mà bạn đã cung cấp", cái này b để ở đâu thế, cho mình xem log example

Avatar
@duongdtpromax
thg 9 27, 2024 3:06 SA

Cái này mình dựa vào template trong log4j2 nhé sóp, tuỳ vào log mà mình cấu hình, kia là mình ví dụ ạ

Avatar
+2
Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí