Xử lý log từ Kafka
Dưới đây là một ví dụ cấu hình cho tệp logstash.conf của bạn để xử lý log từ Kafka, sử dụng grok để phân tích cú pháp và thêm một số cấu hình mutate để loại bỏ các trường không cần thiết. Bạn cũng có thể thêm bước gsub để lọc các ký tự không hợp lệ nếu cần:
input {
kafka {
bootstrap_servers => "10.100.30.32:9092"
topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"]
group_id => "log_consumer_group"
auto_offset_reset => "earliest"
}
}
filter {
# Sử dụng gsub để loại bỏ các ký tự không hợp lệ (nếu cần)
mutate {
gsub => ["message", "\u001b|\n", ""]
}
# Phân tích cú pháp log với grok
grok {
match => {
"message" => """
{"instant":{"epochSecond":%{NUMBER:epoch_sec}," +
"\"nanoOfSecond\":%{NUMBER:nano_sec}," +
"\"thread\":\"%{DATA:thread}\"," +
"\"level\":\"%{WORD:level}\"," +
"\"loggerName\":\"%{DATA:logger_name}\"," +
"\"message\":\"%{GREEDYDATA:log_message}\"," +
"\"endOfBatch\":%{GREEDYDATA:end_of_batch}," +
"\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," +
"\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," +
"\"spanId\":\"%{DATA:span_id}\"," +
"\"className\":\"%{DATA:class_name}\"," +
"\"clientIp\":\"%{IP:client_ip}\"," +
"\"clientMessageId\":\"%{DATA:client_message_id}\"," +
"\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," +
"\"duration\":%{NUMBER:duration}," +
"\"methodName\":\"%{DATA:method_name}\"," +
"\"path\":\"%{DATA:path}\"," +
"\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," +
"\"threadId\":%{NUMBER:thread_id}," +
"\"threadPriority\":%{NUMBER:thread_priority}," +
"\"logType\":\"%{WORD:log_type}\"," +
"\"application\":\"%{DATA:application}\"," +
"\"localIp\":\"%{IP:local_ip}\"}
"""
}
}
# Thêm các trường cần thiết và loại bỏ các trường không cần thiết
mutate {
add_field => {
"ts" => "%{epoch_sec}"
"ip" => "%{client_ip}"
"msgId" => "%{client_message_id}"
"duration" => "%{duration}"
"method" => "%{method_name}"
"path" => "%{path}"
"app" => "%{application}"
"localIp" => "%{local_ip}"
"logType" => "%{log_type}"
}
remove_field => ["instant", "thread", "level", "logger_name", "message", "endOfBatch", "logger_fqcn", "contextMap", "threadId", "threadPriority"]
}
# Chuyển đổi timestamp
date {
match => [ "client_time", "ISO8601" ]
timezone => "+07:00"
target => "@timestamp"
}
# Thêm trường ngày tháng
ruby {
code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))"
}
}
output {
elasticsearch {
hosts => ["http://10.152.183.57:9200"]
template => "/usr/share/logstash/templates/logstash_template.json"
template_name => "logstash"
template_overwrite => true
index => "logstash-%{indexDay}"
document_type => "_doc"
codec => json
}
stdout {
codec => rubydebug
}
}
All Rights Reserved