0

Xử lý log từ Kafka

Dưới đây là một ví dụ cấu hình cho tệp logstash.conf của bạn để xử lý log từ Kafka, sử dụng grok để phân tích cú pháp và thêm một số cấu hình mutate để loại bỏ các trường không cần thiết. Bạn cũng có thể thêm bước gsub để lọc các ký tự không hợp lệ nếu cần:

input {
  kafka {
    bootstrap_servers => "10.100.30.32:9092"
    topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"]
    group_id => "log_consumer_group"
    auto_offset_reset => "earliest"
  }
}

filter {  
  # Sử dụng gsub để loại bỏ các ký tự không hợp lệ (nếu cần)
  mutate {
    gsub => ["message", "\u001b|\n", ""]
  }

  # Phân tích cú pháp log với grok
  grok {
    match => {
      "message" => """
          {"instant":{"epochSecond":%{NUMBER:epoch_sec}," +
          "\"nanoOfSecond\":%{NUMBER:nano_sec}," +
          "\"thread\":\"%{DATA:thread}\"," +
          "\"level\":\"%{WORD:level}\"," +
          "\"loggerName\":\"%{DATA:logger_name}\"," +
          "\"message\":\"%{GREEDYDATA:log_message}\"," +
          "\"endOfBatch\":%{GREEDYDATA:end_of_batch}," +
          "\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," +
          "\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," +
          "\"spanId\":\"%{DATA:span_id}\"," +
          "\"className\":\"%{DATA:class_name}\"," +
          "\"clientIp\":\"%{IP:client_ip}\"," +
          "\"clientMessageId\":\"%{DATA:client_message_id}\"," +
          "\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," +
          "\"duration\":%{NUMBER:duration}," +
          "\"methodName\":\"%{DATA:method_name}\"," +
          "\"path\":\"%{DATA:path}\"," +
          "\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," +
          "\"threadId\":%{NUMBER:thread_id}," +
          "\"threadPriority\":%{NUMBER:thread_priority}," +
          "\"logType\":\"%{WORD:log_type}\"," +
          "\"application\":\"%{DATA:application}\"," +
          "\"localIp\":\"%{IP:local_ip}\"}
      """
    }
  }
  
  # Thêm các trường cần thiết và loại bỏ các trường không cần thiết
  mutate {
    add_field => {
      "ts" => "%{epoch_sec}"
      "ip" => "%{client_ip}"
      "msgId" => "%{client_message_id}"
      "duration" => "%{duration}"
      "method" => "%{method_name}"
      "path" => "%{path}"
      "app" => "%{application}"
      "localIp" => "%{local_ip}"
      "logType" => "%{log_type}"
    }
    remove_field => ["instant", "thread", "level", "logger_name", "message", "endOfBatch", "logger_fqcn", "contextMap", "threadId", "threadPriority"]
  }

  # Chuyển đổi timestamp
  date {
    match => [ "client_time", "ISO8601" ]
    timezone => "+07:00"
    target => "@timestamp"
  }

  # Thêm trường ngày tháng
  ruby {
    code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))"
  }
}

output {
  elasticsearch {
    hosts => ["http://10.152.183.57:9200"]
    template => "/usr/share/logstash/templates/logstash_template.json"
    template_name => "logstash"
    template_overwrite => true
    index => "logstash-%{indexDay}"
    document_type => "_doc"
    codec => json
  }
  stdout {
    codec => rubydebug
  }
}


All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.