ELFK Log Cluster

予早 2025-10-06 01:02:02

https://www.elastic.co/cn/observability/application-performance-monitoring

https://www.elastic.co/cn/observability/infrastructure-monitoring

https://www.elastic.co/cn/observability/log-monitoring

The classic stacks are ELK and EFK. Logstash is powerful but resource-hungry, while Filebeat is lightweight but less capable than Logstash. With heavy log volume, ELK's resource footprint leaves room for optimization; with complex logs, EFK cannot parse them fully. Combining the two yields ELFK, a well-rounded compromise.
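End to end, the pipeline built below is: xxx_service writes biz.log, Filebeat tails the file and ships events to Kafka, Kafka buffers them, Logstash consumes, parses, and enriches the events, Elasticsearch indexes them, and Kibana visualizes them.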

Set up xxx_service

See xxx_service.py in the Filebeat section.
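
For reference, here is a minimal sketch (an assumption, not the actual xxx_service.py) of a Python logger that emits the bracketed line format the rest of the pipeline parses:

import logging

# The format mirrors the grok pattern used in the Logstash filter below:
# [timestamp] [logger] [level] [trace_id] [function] [line] [pid] [tid]: message
formatter = logging.Formatter(
    "[%(asctime)s] [%(name)s] [%(levelname)s] [%(trace_id)s] "
    "[%(funcName)s] [%(lineno)d] [%(process)d] [%(thread)d]: %(message)s"
)
handler = logging.FileHandler("/home/ubuntu/biz.log")
handler.setFormatter(formatter)
logger = logging.getLogger("web_logger")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# trace_id is not a built-in LogRecord attribute, so it is passed per call
# via `extra` (a simplification; a logging.Filter or contextvar would be
# typical in a real service)
try:
    1 / 0
except ZeroDivisionError:
    # logger.exception logs at ERROR and appends the traceback, producing
    # the multiline event that Filebeat's multiline settings merge below
    logger.exception("division by zero", extra={"trace_id": "demo-trace-id"})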

Set up Elasticsearch

For installation and configuration, see the Elasticsearch section.

Set up Kibana

For installation and configuration, see the Kibana section.

Set up Kafka

For installation and configuration, see the DatabaseSystem/Kafka section.

Create the topic that Filebeat will publish to:

./bin/kafka-topics.sh --create --topic topic-xxx_service-log --bootstrap-server localhost:9092
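
The topic can then be inspected with the standard --describe flag to confirm it exists and to view its partition layout:

./bin/kafka-topics.sh --describe --topic topic-xxx_service-log --bootstrap-server localhost:9092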

Set up Filebeat

For installation and configuration, see the Filebeat section.

filebeat.yml

filebeat:
  inputs:
    - type: log
      enabled: true
      paths:
        - /home/ubuntu/biz.log  # replace with your own log file path
      fields:
        service: xxx_service # custom field
        env: prod # custom field
        log_topic: topic-xxx_service-log
      multiline.pattern: '^\[[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}\]'  # a new event starts at a line beginning with a bracketed timestamp
      multiline.negate: true  # lines NOT matching the pattern are continuation lines
      multiline.match: after  # append continuation lines to the preceding matching line
output:
  kafka:
    # initial brokers for reading cluster metadata
    hosts: [ "localhost:9092" ]

    # message topic selection + partitioning
    topic: '%{[fields.log_topic]}'
    partition.round_robin:
      reachable_only: false

    required_acks: 1
    compression: gzip
    max_message_bytes: 1000000
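
Before starting Filebeat, its built-in test subcommands can validate the configuration file and the connection to the Kafka output:

./filebeat test config -c filebeat.yml
./filebeat test output -c filebeat.yml

Then start Filebeat in the foreground: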
./filebeat -e -c filebeat.yml
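
To verify that events are reaching Kafka, consume a single message from the topic; the output below shows one merged multiline event: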
./bin/kafka-console-consumer.sh --topic topic-xxx_service-log --from-beginning --bootstrap-server localhost:9092 --max-messages 1
{
  "@timestamp": "2025-10-06T06:21:35.573Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.19.3"
  },
  "host": {
    "name": "ECS-HN-3121563685"
  },
  "agent": {
    "ephemeral_id": "46127670-32ce-4e8d-a944-8ba1fb02a08c",
    "id": "0e0dc387-f2f5-4d0a-99ba-7979b131f062",
    "name": "ECS-HN-3121563685",
    "type": "filebeat",
    "version": "8.19.3"
  },
  "message": "[2025-10-06 06:21:34,975] [web_logger] [ERROR] [54975ad5-5a4e-4e59-9ddb-2e8d4f8ae898] [log_function_3] [71] [1995498] [124911181021312]: division by zero\nTraceback (most recent call last):\n  File \"/home/ubuntu/xxx_service.py\", line 69, in log_function_3\n    1 / 0\n    ~~^~~\nZeroDivisionError: division by zero",
  "log": {
    "offset": 351282,
    "file": {
      "path": "/home/ubuntu/biz.log"
    },
    "flags": [
      "multiline"
    ]
  },
  "input": {
    "type": "log"
  },
  "fields": {
    "env": "prod",
    "log_topic": "topic-xxx_service-log",
    "service": "xxx_service"
  },
  "ecs": {
    "version": "8.0.0"
  }
}
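
Note the "multiline" entry in log.flags: the Python traceback lines were merged into a single message field, confirming that the multiline settings above work as intended.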

Set up Logstash

For installation and configuration, see the Logstash section.

/usr/local/logstash/config/conf.d/default.conf

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["topic-xxx_service-log"]
    codec => json
    auto_offset_reset => "earliest"
    consumer_threads => 2
    decorate_events => true
  }
}

filter {
  # parse the bracketed fields out of the raw log line
  grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:log_timestamp}\] \[%{WORD:logger}\] \[%{LOGLEVEL:loglevel}\] \[%{USERNAME:trace_id}\] \[%{USERNAME:function_name}\] \[%{NUMBER:line_number}\] \[%{NUMBER:process_id}\] \[%{NUMBER:thread_id}\]: %{GREEDYDATA:log_message}" }
  }

  # parse the log timestamp and use it as the event's @timestamp
  date {
    match => [ "log_timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
    target => "@timestamp"
    remove_field => ["log_timestamp"]
  }

  # promote the custom Filebeat fields to top-level fields
  mutate {
    add_field => {
      "service" => "%{[fields][service]}"
      "env" => "%{[fields][env]}"
      "log_topic" => "%{[fields][log_topic]}"
    }
    remove_field => ["fields"]
  }
}

output {
  elasticsearch {
    index => "logstash-%{service}-%{+YYYY-MM}"
    # Elasticsearch address
    hosts => ["https://localhost:9200"]
    # username and password are required when security is enabled
    user => "elastic"
    password => "eS123456_"

    ### Note: only PEM certificates are supported; convert other formats first
    # certificates are required when client-side SSL is enabled
    ssl_enabled => true
    # CA certificate path
    ssl_certificate_authorities => ["/usr/local/elasticsearch-cluster/elasticsearch/config/certs/elastic-stack-ca.pem"]
    # client certificate path
    ssl_certificate => "/usr/local/elasticsearch-cluster/elasticsearch/config/certs/http/elasticsearch/http.pem"
    # client key path
    ssl_key => "/usr/local/elasticsearch-cluster/elasticsearch/config/certs/http/elasticsearch/key.pem"
    # verify the server certificate
    ssl_verification_mode => "full"
  }
}
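
Before launching, the pipeline syntax can be checked with Logstash's standard --config.test_and_exit flag (the relative config path assumes the working directory is /usr/local/logstash):

./bin/logstash -f config/conf.d/default.conf --config.test_and_exit

Then start Logstash: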
./bin/logstash

View the logs

In Kibana, go to Analytics/Discover and create a Data View named logstash-xxx_service, with index pattern logstash-xxx_service-* and @timestamp as the timestamp field.
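
With the Data View in place, the fields extracted by grok can be queried directly in Discover. For example, this KQL filter (field names come from the pipeline above) isolates the sample service's error events:

service : "xxx_service" and loglevel : "ERROR"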