FileBeat 是一个文本日志收集器,ElasticStack 中 Beats 数据采集产品中子产品之一。基于 Go 开发。其最大优势为配置简单、资源占用低。
https://www.elastic.co/docs/reference/beats/filebeat
https://www.elastic.co/downloads/past-releases/filebeat-8-19-3
https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.19.3-linux-x86_64.tar.gz
cd /usr/local
sudo wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-8.19.3-linux-x86_64.tar.gz
sudo tar -zxvf filebeat-8.19.3-linux-x86_64.tar.gz
sudo mv filebeat-8.19.3-linux-x86_64 filebeat
sudo chown -R ubuntu:ubuntu /usr/local/filebeat
xxx_service.py
"""
xxx_service.py
模拟日志生成
"""
import logging.handlers
import random
import time
import uuid
# 配置日志格式
log_format = (
'[%(asctime)s] [%(name)s] [%(levelname)s] '
'[%(traceid)s] [%(funcName)s] [%(lineno)d] '
'[%(process)d] [%(thread)d]: %(message)s'
)
# 配置日志记录器
logger = logging.getLogger('web_logger')
logger.setLevel(logging.INFO)
# 为每条日志生成唯一的 TraceID
def generate_traceid():
return str(uuid.uuid4())
# 自定义日志记录器的过滤器,用于添加 TraceID
class TraceIDFilter(logging.Filter):
def filter(self, record):
record.traceid = generate_traceid()
return True
# 添加过滤器到日志记录器
logger.addFilter(TraceIDFilter())
# 配置日志文件处理器
file_handler = logging.handlers.TimedRotatingFileHandler(
'biz.log', when='midnight', interval=1, backupCount=7, encoding='utf-8'
)
file_handler.suffix = '%Y-%m-%d'
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter(log_format))
# 配置标准输出处理器
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(logging.Formatter(log_format))
# 添加处理器到日志记录器
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
# 定义多个日志生成函数
def log_function_1():
log_message = f"GET / HTTP/1.1\" 200 612"
logger.info(log_message)
def log_function_2():
log_message = f"数据(id: 9213152)不存在"
logger.warning(log_message)
def log_function_3():
log_message = ""
try:
1 / 0
except Exception as e:
logger.error(e, exc_info=True)
# 模拟定期产生日志
def generate_log():
functions = [log_function_1, log_function_2, log_function_3]
while True:
# 随机选择一个函数调用
random_function = random.choice(functions)
random_function()
time.sleep(0.2)
if __name__ == "__main__":
generate_log()
filebeat:
inputs:
- type: log
enabled: true
paths:
- //home/ubuntu/biz.log # 替换为你的日志文件路径
fields:
service: xxx_service # 自定义字段
env: prod # 自定义字段
output:
console:
pretty: true
# kafka:
# # initial brokers for reading cluster metadata
# hosts: [ "kafka1:9092", "kafka2:9092", "kafka3:9092" ]
#
# # message topic selection + partitioning
# topic: '%{[fields.log_topic]}'
# partition.round_robin:
# reachable_only: false
#
# required_acks: 1
# compression: gzip
# max_message_bytes: 1000000
# logstash:
# hosts: [ "logstash-server:5044" ] # 替换为 Logstash 服务器的地址和端口
# 启动 filebeat 可以看到结构化后的日志信息
./filebeat -e -c filebeat.yml