ELK怎么写入日志的对应键值信息

发布时间:2022-02-18 17:22:38 作者:iii
来源:亿速云 阅读:186
# ELK怎么写入日志的对应键值信息

## 目录
- [1. ELK技术栈概述](#1-elk技术栈概述)
  - [1.1 Elasticsearch核心功能](#11-elasticsearch核心功能)
  - [1.2 Logstash数据处理流程](#12-logstash数据处理流程)
  - [1.3 Kibana可视化能力](#13-kibana可视化能力)
- [2. 日志键值对基础概念](#2-日志键值对基础概念)
  - [2.1 结构化日志与非结构化日志](#21-结构化日志与非结构化日志)
  - [2.2 常见键值对格式标准](#22-常见键值对格式标准)
- [3. Logstash输入配置详解](#3-logstash输入配置详解)
  - [3.1 File输入插件配置](#31-file输入插件配置)
  - [3.2 Beats输入配置实践](#32-beats输入配置实践)
- [4. 过滤器关键处理技术](#4-过滤器关键处理技术)
  - [4.1 Grok模式匹配实战](#41-grok模式匹配实战)
  - [4.2 KV过滤器深度解析](#42-kv过滤器深度解析)
  - [4.3 日期与字段转换处理](#43-日期与字段转换处理)
- [5. Elasticsearch写入优化](#5-elasticsearch写入优化)
  - [5.1 索引模板设计原则](#51-索引模板设计原则)
  - [5.2 批量写入性能调优](#52-批量写入性能调优)
- [6. 完整配置案例演示](#6-完整配置案例演示)
  - [6.1 Nginx日志处理实例](#61-nginx日志处理实例)
  - [6.2 Java应用日志解析](#62-java应用日志解析)
- [7. 常见问题解决方案](#7-常见问题解决方案)
  - [7.1 字段映射异常处理](#71-字段映射异常处理)
  - [7.2 性能瓶颈排查方法](#72-性能瓶颈排查方法)
- [8. 高级应用场景拓展](#8-高级应用场景拓展)
  - [8.1 自定义Grok模式库](#81-自定义grok模式库)
  - [8.2 多级日志嵌套处理](#82-多级日志嵌套处理)
- [9. 安全与权限控制](#9-安全与权限控制)
  - [9.1 字段级安全策略](#91-字段级安全策略)
  - [9.2 敏感信息过滤](#92-敏感信息过滤)
- [10. 未来发展趋势](#10-未来发展趋势)
  - [10.1 Elasticsearch新特性](#101-elasticsearch新特性)
  - [10.2 云原生环境适配](#102-云原生环境适配)

## 1. ELK技术栈概述

### 1.1 Elasticsearch核心功能
Elasticsearch作为分布式搜索分析引擎,提供近实时(NRT)的全文检索能力。其核心特性包括:
- 分布式文档存储:采用JSON格式存储数据
- 倒排索引机制:实现快速文本检索
- RESTful API:通过HTTP接口进行数据操作
- 分片与副本:支持水平扩展和高可用

文档基本结构示例:
```json
{
  "_index": "logstash-2023.08",
  "_type": "_doc",
  "_id": "1",
  "_source": {
    "timestamp": "2023-08-15T12:00:00.000Z",
    "message": "User login failed",
    "severity": "ERROR",
    "ip": "192.168.1.100"
  }
}

1.2 Logstash数据处理流程

Logstash数据处理包含三个阶段: 1. Input:支持50+数据源接入 - 文件日志(file) - 消息队列(kafka, rabbitmq) - 系统指标(beats)

  1. Filter:关键处理环节

    filter {
     grok {
       match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:msg}" }
     }
     kv {
       field_split => "&?"
     }
    }
    
  2. Output:多目标输出

1.3 Kibana可视化能力

Kibana提供的主要功能模块: - Discover:原始日志检索 - Visualize:图表制作 - Dashboard:综合看板 - Dev Tools:API调试控制台

典型查询DSL示例:

{
  "query": {
    "bool": {
      "must": [
        { "match": { "level": "ERROR" }},
        { "range": { "@timestamp": { "gte": "now-1h" }}}
      ]
    }
  }
}

2. 日志键值对基础概念

2.1 结构化日志与非结构化日志

类型 特征 处理难度 存储效率
非结构化日志 纯文本行
半结构化日志 包含部分键值对
结构化日志 完整JSON/XML格式

2.2 常见键值对格式标准

  1. JSON格式(现代应用首选)

    {"time":"2023-08-15T12:00:00Z","action":"login","status":"fail"}
    
  2. 键值对字符串(传统系统常见)

    time=2023-08-15T12:00:00 action=login status=fail
    
  3. Nginx访问日志(组合格式)

    192.168.1.1 - - [15/Aug/2023:12:00:00 +0800] "GET /api/user HTTP/1.1" 200 342
    
  4. CSV格式(数据库导出)

    "2023-08-15 12:00:00","login","fail"
    

3. Logstash输入配置详解

3.1 File输入插件配置

基础文件输入配置:

input {
  file {
    path => ["/var/log/nginx/*.log"]
    start_position => "beginning"
    sincedb_path => "/dev/null"
    tags => ["nginx"]
  }
}

高级参数说明: - exclude:排除特定文件模式 - close_older:自动关闭非活跃文件(单位秒) - delimiter:自定义行分隔符 - file_completed_action:文件读取完成后的操作

3.2 Beats输入配置实践

Filebeat对接配置示例:

input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/pki/tls/certs/logstash.crt"
    ssl_key => "/etc/pki/tls/private/logstash.key"
  }
}

性能优化参数: - worker:工作线程数(建议CPU核数的1-2倍) - queue_size:等待处理队列大小 - congestion_threshold:背压控制阈值

4. 过滤器关键处理技术

4.1 Grok模式匹配实战

基础模式匹配:

filter {
  grok {
    match => { 
      "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:thread}\] %{LOGLEVEL:level} %{DATA:class} - %{GREEDYDATA:msg}"
    }
  }
}

自定义模式库:

# patterns/custom_patterns
USER_ID [a-zA-Z0-9]{8}-[a-zA-Z0-9]{4}

使用自定义模式:

filter {
  grok {
    patterns_dir => ["./patterns"]
    match => { "message" => "%{USER_ID:user_id} %{GREEDYDATA:action}" }
  }
}

4.2 KV过滤器深度解析

基础键值对解析:

filter {
  kv {
    source => "message"
    field_split => " "
    value_split => "="
    trim_key => "\""
    trim_value => "\""
  }
}

处理URL参数示例:

原始日志:GET /search?q=elasticsearch&size=10&from=0

配置方案:

filter {
  dissect {
    mapping => {
      "message" => "%{method} %{url}?%{query}"
    }
  }
  kv {
    source => "query"
    field_split => "&"
    target => "params"
  }
}

4.3 日期与字段转换处理

日期解析示例:

filter {
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }
}

字段类型转换:

filter {
  mutate {
    convert => {
      "response_time" => "float"
      "status_code" => "integer"
    }
  }
}

5. Elasticsearch写入优化

5.1 索引模板设计原则

生命周期管理模板:

PUT _index_template/logs-template
{
  "index_patterns": ["logs-*"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1,
      "index.lifecycle.name": "logs-policy"
    },
    "mappings": {
      "properties": {
        "@timestamp": { "type": "date" },
        "message": { "type": "text" },
        "level": { "type": "keyword" }
      }
    }
  }
}

5.2 批量写入性能调优

Logstash输出优化配置:

output {
  elasticsearch {
    hosts => ["http://es-node1:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    document_id => "%{fingerprint}"
    flush_size => 5000
    idle_flush_time => 5
    pipeline => "logstash-ingest"
  }
}

关键参数说明: - flush_size:批量提交文档数(建议2000-10000) - idle_flush_time:空闲刷新间隔(秒) - pipeline:指定Ingest Pipeline预处理

6. 完整配置案例演示

6.1 Nginx日志处理实例

input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { 
      "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent}' 
    }
  }
  date {
    match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
  }
  geoip {
    source => "clientip"
  }
  useragent {
    source => "agent"
    target => "useragent"
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "nginx-%{+YYYY.MM.dd}"
  }
}

6.2 Java应用日志解析

filter {
  grok {
    match => { 
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:thread}\] %{JAVACLASS:class} - %{GREEDYDATA:logmessage}" 
    }
  }
  if [logmessage] =~ /Exception/ {
    dissect {
      mapping => {
        "logmessage" => "%{?exception}\n\tat %{stacktrace}"
      }
    }
  }
  mutate {
    remove_field => ["message"]
  }
}

7. 常见问题解决方案

7.1 字段映射异常处理

典型错误场景:

Cannot parse "192.168.1.1" as integer for field "clientip"

解决方案: 1. 检查Elasticsearch模板映射 2. 添加字段类型转换过滤器:

   filter {
     mutate {
       convert => { "response_code" => "integer" }
     }
   }
  1. 使用_mappingAPI修正已有索引:
    
    PUT my-index/_mapping
    {
     "properties": {
       "clientip": { "type": "keyword" }
     }
    }
    

7.2 性能瓶颈排查方法

性能分析矩阵:

瓶颈点 监控指标 优化方案
输入阶段 事件接收速率 增加工作线程/优化文件读取
过滤阶段 过滤器执行时间 简化Grok模式/启用缓存
输出阶段 批量提交延迟 调整flush_size和flush_interval
Elasticsearch 索引速率/bulk拒绝数 增加节点/优化分片策略

监控API端点:

GET _nodes/stats
GET _cat/thread_pool?v
GET _cluster/pending_tasks

8. 高级应用场景拓展

8.1 自定义Grok模式库

企业级模式库管理: 1. 创建共享模式目录:

   /etc/logstash/patterns/
   ├── network.patterns
   ├── application.patterns
   └── system.patterns
  1. 网络设备日志模式示例:

    CISCO_TACACS %{IP:client_ip} %{INT:src_port} -> %{IP:server_ip}/%{INT:dest_port}
    
  2. 动态加载配置:

    filter {
     grok {
       patterns_dir => ["/etc/logstash/patterns"]
       match => { "message" => "%{CISCO_TACACS:auth_attempt}" }
     }
    }
    

8.2 多级日志嵌套处理

JSON嵌套字段处理:

filter {
  json {
    source => "message"
    target => "payload"
  }
  if [payload][request] {
    json {
      source => "[payload][request]"
      target => "[payload][parsed_request]"
    }
  }
}

9. 安全与权限控制

9.1 字段级安全策略

基于角色的字段过滤:

PUT _security/role/logs_reader
{
  "indices": [
    {
      "names": ["logs-*"],
      "privileges": ["read"],
      "field_security": {
        "grant": ["@timestamp", "level", "message"],
        "except": ["password", "credit_card"]
      }
    }
  ]
}

9.2 敏感信息过滤

日志脱敏处理:

filter {
  mutate {
    gsub => [
      "message", "(\d{4})\d{8}(\d{4})", "\1********\2", # 银行卡号
      "message", "(?i)password[=:]\s*\K\S+", "[REDACTED]"
    ]
  }
  fingerprint {
    source => ["user_ip", "@timestamp"]
    target => "event_id"
    method => "SHA256"
  }
}

10. 未来发展趋势

10.1 Elasticsearch新特性

10.2 云原生环境适配

  1. 容器日志收集方案:

    input {
     docker {
       host => "unix:///var/run/docker.sock"
       type => "docker"
     }
    }
    
  2. Kubernetes场景优化:

    • 使用Filebeat DaemonSet收集节点日志
    • 通过annotations实现动态解析配置
    annotations:
     co.elastic.logs/module: "nginx"
     co.elastic.logs/fileset.stdout: "access"
    

本文详细探讨了ELK技术栈中日志键值信息的处理全流程,从基础概念到高级应用场景,涵盖了配置示例、性能优化和疑难解答等内容。实际部署时建议: 1. 先进行小规模日志样本测试 2. 逐步完善Grok模式和字段映射 3. 建立完善的监控告警机制 4. 定期评审索引生命周期策略 “`

推荐阅读:
  1. ELK日志收集demo
  2. 日志分析ELK平台部署

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

elk

上一篇:whereis命令使用实例有哪些

下一篇:Zabbix怎么安装配置

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》