Logstash cho bộ lọc dựa trên dữ liệu log request và response
Dưới đây là cấu hình Logstash cho bộ lọc dựa trên dữ liệu log request và response mà bạn đã cung cấp. Cấu hình này sẽ sử dụng plugin grok để phân tích cú pháp các log và plugin mutate để xử lý các trường cần thiết.
Cấu Hình Logstash Cho request và response
input {
kafka {
bootstrap_servers => "10.100.30.32:9092"
topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"]
group_id => "log_consumer_group"
auto_offset_reset => "earliest"
}
}
filter {
# Sử dụng mutate để lọc đi ký tự không mong muốn
mutate {
gsub => ["message", "\u001b|\n", ""]
}
# Phân tích log response
if "RESPONSE" in [message] {
grok {
match => {
"message" => """
{"instant":{"epochSecond":%{NUMBER:epoch_sec}," +
"\"nanoOfSecond\":%{NUMBER:nano_sec}," +
"\"thread\":\"%{DATA:thread}\"," +
"\"level\":\"%{WORD:level}\"," +
"\"loggerName\":\"%{DATA:logger_name}\"," +
"\"message\":\"%{GREEDYDATA:log_message}\"," +
"\"endOfBatch\":%{GREEDYDATA:end_of_batch}," +
"\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," +
"\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," +
"\"spanId\":\"%{DATA:span_id}\"," +
"\"className\":\"%{DATA:class_name}\"," +
"\"clientIp\":\"%{IP:client_ip}\"," +
"\"clientMessageId\":\"%{DATA:client_message_id}\"," +
"\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," +
"\"duration\":%{NUMBER:duration}," +
"\"methodName\":\"%{DATA:method_name}\"," +
"\"path\":\"%{DATA:path}\"," +
"\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," +
"\"threadId\":%{NUMBER:thread_id}," +
"\"threadPriority\":%{NUMBER:thread_priority}," +
"\"logType\":\"%{WORD:log_type}\"," +
"\"application\":\"%{DATA:application}\"," +
"\"localIp\":\"%{IP:local_ip}\"}
"""
}
}
}
# Phân tích log request
if "REQUEST" in [message] {
grok {
match => {
"message" => """
{"instant":{"epochSecond":%{NUMBER:epoch_sec}," +
"\"nanoOfSecond\":%{NUMBER:nano_sec}," +
"\"thread\":\"%{DATA:thread}\"," +
"\"level\":\"%{WORD:level}\"," +
"\"loggerName\":\"%{DATA:logger_name}\"," +
"\"message\":\"%{GREEDYDATA:log_message}\"," +
"\"endOfBatch\":%{GREEDYDATA:end_of_batch}," +
"\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," +
"\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," +
"\"spanId\":\"%{DATA:span_id}\"," +
"\"className\":\"%{DATA:class_name}\"," +
"\"clientIp\":\"%{IP:client_ip}\"," +
"\"clientMessageId\":\"%{DATA:client_message_id}\"," +
"\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," +
"\"duration\":%{NUMBER:duration}," +
"\"methodName\":\"%{DATA:method_name}\"," +
"\"path\":\"%{DATA:path}\"," +
"\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," +
"\"threadId\":%{NUMBER:thread_id}," +
"\"threadPriority\":%{NUMBER:thread_priority}," +
"\"logType\":\"%{WORD:log_type}\"," +
"\"application\":\"%{DATA:application}\"," +
"\"localIp\":\"%{IP:local_ip}\"}
"""
}
}
}
# Tạo các trường mới
mutate {
add_field => {
"ts" => "%{epoch_sec}"
"ip" => "%{client_ip}"
"msgId" => "%{client_message_id}"
"duration" => "%{duration}"
"method" => "%{method_name}"
"path" => "%{path}"
"app" => "%{application}"
"localIp" => "%{local_ip}"
"logType" => "%{log_type}"
"type" => "%{+log_type}" # Đánh dấu loại log (REQUEST/RESPONSE)
}
remove_field => [
"instant", "thread", "level", "logger_name", "message",
"endOfBatch", "logger_fqcn", "contextMap",
"threadId", "threadPriority"
]
}
# Chuyển đổi timestamp
date {
match => [ "client_time", "ISO8601" ]
timezone => "+07:00"
target => "@timestamp"
}
ruby {
code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))"
}
}
output {
elasticsearch {
hosts => ["http://10.152.183.57:9200"]
template => "/usr/share/logstash/templates/logstash_template.json"
template_name => "logstash"
template_overwrite => true
index => "logstash-%{indexDay}"
document_type => "_doc"
}
stdout {
codec => rubydebug
}
}
All rights reserved
Bình luận
"dữ liệu log request và response mà bạn đã cung cấp", cái này b để ở đâu thế, cho mình xem log example
Cái này mình dựa vào template trong log4j2 nhé sóp, tuỳ vào log mà mình cấu hình, kia là mình ví dụ ạ