-
步骤1 规范日志格式
在 Nginx 中定义清晰、可解析的日志格式,并尽量只记录必要字段;必要时为静态资源关闭访问日志以减少噪声。示例:
log_format main ‘$remote_addr - $remote_user [$time_local] “$request” $status $body_bytes_sent “$http_referer” “$http_user_agent” $request_time’;
access_log /var/log/nginx/access.log main;
location ~* .(jpg|css|js|ico)$ { access_log off; }
说明:combined 与 main 是常用格式,后者便于加入**$request_time**等性能指标。
-
步骤2 命令行快速清洗与统计
示例(按常见 combined 格式字段位置):
- 过滤非法 IP:awk ‘$1 ~ /^[0-9]+.[0-9]+.[0-9]+.[0-9]+$/ {print}’ access.log
- Top 10 IP:awk ‘{print $1}’ access.log | sort | uniq -c | sort -nr | head
- 状态码分布:awk ‘{print $9}’ access.log | sort | uniq -c | sort -nr
- 实时可视化:goaccess access.log --log-format=COMBINED
说明:字段位置会随 log_format 变化,需与实际格式对齐。
-
步骤3 使用 Vector 进行结构化清洗与落库
思路:file source → remap transform(VRL 正则解析)→ ClickHouse sink。示例要点:
- 源配置:读取 access.log,支持从文件尾部增量读取。
- 解析示例(VRL):
parsed = parse_regex(.message, r’^(?P\S+) - \S+ [(?P<time_local>[^]]+)] “(?P\S+) (?P\S+) HTTP/\d.\d” (?P\d+) (?P\d+) “(?P[^”])" “(?P[^”])"')
.time = parse_timestamp!(parsed.time_local, format: “%d/%b/%Y:%H:%M:%S %z”)
.status = to_uint!(parsed.status)
.size = to_uint!(parsed.size)
del(.message, .time_local)
- 目的端:ClickHouse 建表(如 ip String、time DateTime、url String、status UInt16、size UInt32、ua String 等),Vector 直接写入。
说明:Vector 的VRL适合做字段抽取、类型转换与轻量清洗,便于快速落地。
-
步骤4 使用 PySpark 做离线大数据清洗
思路:读取文本 → 正则抽取 → 类型转换与校验 → 写出 Parquet/ORC。示例要点:
- 抽取:
df = spark.read.text(“access.log”)
log_pattern = r’(\S+) - \S+ [(.?)] "(.?)" (\d+) (\d+) “(.?)" "(.?)”’
cleaned = df.select(
regexp_extract(“value”, log_pattern, 1).alias(“ip”),
regexp_extract(“value”, log_pattern, 2).alias(“time_local”),
regexp_extract(“value”, log_pattern, 3).alias(“request”),
regexp_extract(“value”, log_pattern, 4).alias(“status”),
regexp_extract(“value”, log_pattern, 5).alias(“size”),
regexp_extract(“value”, log_pattern, 6).alias(“referer”),
regexp_extract(“value”, log_pattern, 7).alias(“ua”)
)
- 清洗与转换:
cleaned = cleaned.filter((col(“status”).isNotNull()) & (col(“status”) >= 200) & (col(“status”) <= 599))
cleaned = cleaned.withColumn(“time”, to_timestamp(col(“time_local”), “dd/MMM/yyyy:HH:mm:ss Z”))
- 写出:cleaned.write.parquet(“hdfs://…/nginx-cleaned/”, mode=“overwrite”)
说明:适合TB级历史数据治理、复杂转换与分区存储。