Logstash cho bộ lọc dựa trên dữ liệu log request và response
Dưới đây là cấu hình Logstash cho bộ lọc dựa trên dữ liệu log request và response mà bạn đã cung cấp. Cấu hình này sẽ sử dụng plugin grok để phân tích cú pháp các log và plugin mutate để xử lý các trường cần thiết.
Cấu Hình Logstash Cho request và response
input {
kafka {
bootstrap_servers => "10.100.30.32:9092"
topics => ["ESMART-CATEGORY-LOGS", "ESMART-GATEWAY-LOGS"]
group_id => "log_consumer_group"
auto_offset_reset => "earliest"
}
}
filter {
# Sử dụng mutate để lọc đi ký tự không mong muốn
mutate {
gsub => ["message", "\u001b|\n", ""]
}
# Phân tích log response
if "RESPONSE" in [message] {
grok {
match => {
"message" => """
{"instant":{"epochSecond":%{NUMBER:epoch_sec}," +
"\"nanoOfSecond\":%{NUMBER:nano_sec}," +
"\"thread\":\"%{DATA:thread}\"," +
"\"level\":\"%{WORD:level}\"," +
"\"loggerName\":\"%{DATA:logger_name}\"," +
"\"message\":\"%{GREEDYDATA:log_message}\"," +
"\"endOfBatch\":%{GREEDYDATA:end_of_batch}," +
"\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," +
"\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," +
"\"spanId\":\"%{DATA:span_id}\"," +
"\"className\":\"%{DATA:class_name}\"," +
"\"clientIp\":\"%{IP:client_ip}\"," +
"\"clientMessageId\":\"%{DATA:client_message_id}\"," +
"\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," +
"\"duration\":%{NUMBER:duration}," +
"\"methodName\":\"%{DATA:method_name}\"," +
"\"path\":\"%{DATA:path}\"," +
"\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," +
"\"threadId\":%{NUMBER:thread_id}," +
"\"threadPriority\":%{NUMBER:thread_priority}," +
"\"logType\":\"%{WORD:log_type}\"," +
"\"application\":\"%{DATA:application}\"," +
"\"localIp\":\"%{IP:local_ip}\"}
"""
}
}
}
# Phân tích log request
if "REQUEST" in [message] {
grok {
match => {
"message" => """
{"instant":{"epochSecond":%{NUMBER:epoch_sec}," +
"\"nanoOfSecond\":%{NUMBER:nano_sec}," +
"\"thread\":\"%{DATA:thread}\"," +
"\"level\":\"%{WORD:level}\"," +
"\"loggerName\":\"%{DATA:logger_name}\"," +
"\"message\":\"%{GREEDYDATA:log_message}\"," +
"\"endOfBatch\":%{GREEDYDATA:end_of_batch}," +
"\"loggerFqcn\":\"%{DATA:logger_fqcn}\"," +
"\"contextMap\":{\"traceId\":\"%{DATA:trace_id}\"," +
"\"spanId\":\"%{DATA:span_id}\"," +
"\"className\":\"%{DATA:class_name}\"," +
"\"clientIp\":\"%{IP:client_ip}\"," +
"\"clientMessageId\":\"%{DATA:client_message_id}\"," +
"\"clientTime\":\"%{TIMESTAMP_ISO8601:client_time}\"," +
"\"duration\":%{NUMBER:duration}," +
"\"methodName\":\"%{DATA:method_name}\"," +
"\"path\":\"%{DATA:path}\"," +
"\"stackTrace\":\"%{GREEDYDATA:stack_trace}\"}," +
"\"threadId\":%{NUMBER:thread_id}," +
"\"threadPriority\":%{NUMBER:thread_priority}," +
"\"logType\":\"%{WORD:log_type}\"," +
"\"application\":\"%{DATA:application}\"," +
"\"localIp\":\"%{IP:local_ip}\"}
"""
}
}
}
# Tạo các trường mới
mutate {
add_field => {
"ts" => "%{epoch_sec}"
"ip" => "%{client_ip}"
"msgId" => "%{client_message_id}"
"duration" => "%{duration}"
"method" => "%{method_name}"
"path" => "%{path}"
"app" => "%{application}"
"localIp" => "%{local_ip}"
"logType" => "%{log_type}"
"type" => "%{+log_type}" # Đánh dấu loại log (REQUEST/RESPONSE)
}
remove_field => [
"instant", "thread", "level", "logger_name", "message",
"endOfBatch", "logger_fqcn", "contextMap",
"threadId", "threadPriority"
]
}
# Chuyển đổi timestamp
date {
match => [ "client_time", "ISO8601" ]
timezone => "+07:00"
target => "@timestamp"
}
ruby {
code => "event.set('indexDay', event.get('@timestamp').time.localtime('+07:00').strftime('%Y%m%d'))"
}
}
output {
elasticsearch {
hosts => ["http://10.152.183.57:9200"]
template => "/usr/share/logstash/templates/logstash_template.json"
template_name => "logstash"
template_overwrite => true
index => "logstash-%{indexDay}"
document_type => "_doc"
}
stdout {
codec => rubydebug
}
}
All rights reserved