ELK Stack 架构深度解析

ELK 是 Elasticsearch + Logstash + Kibana 的组合,是日志系统领域最经典的开源方案。Elasticsearch 负责存储和搜索,Logstash 负责数据处理和转换,Kibana 负责可视化。

但 ELK 并不是唯一的选择——对于中小规模的日志系统,Loki 可能是更轻量的替代。对于大规模系统,直接使用 Elasticsearch + Beats(替代 Logstash)可能更高效。理解 ELK 的架构,才能判断什么时候该用它。

ELK 架构全景

flowchart TB
    subgraph Collection["日志采集"]
        App["应用日志<br/>(JSON 格式)"]
        Filebeat["Filebeat<br/>(轻量采集)"]
        Fluentd["Fluentd"]
    end

    subgraph Pipeline["日志处理"]
        Logstash["Logstash<br/>(可选,用于复杂处理)"]
        Kafka["Kafka<br/>(可选,作为缓冲)"]
    end

    subgraph Storage["存储与搜索"]
        ES["Elasticsearch<br/>(分布式搜索引擎)"]
    end

    subgraph Query["查询与可视化"]
        Kibana["Kibana<br/>(可视化界面)"]
        API["REST API"]
    end

    App --> Filebeat
    Filebeat --> Kafka
    Kafka --> Logstash
    Logstash --> ES
    App --> Fluentd
    Fluentd --> ES
    ES --> Kibana
    ES --> API

Elasticsearch:分布式搜索引擎

核心概念

概念对应关系型数据库说明
IndexDatabase索引,逻辑上的数据库
DocumentRow文档,一条日志
FieldColumn字段,日志的属性
MappingSchema映射,字段的类型定义
Shard-分片,数据分片
Replica-副本,数据副本

日志存储设计

日志索引映射
{
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "level": { "type": "keyword" },
      "service": { "type": "keyword" },
      "traceId": { "type": "keyword" },
      "spanId": { "type": "keyword" },
      "message": { "type": "text" },
      "logger": { "type": "keyword" },
      "thread": { "type": "keyword" },
      "host": { "type": "keyword" },
      "error": {
        "type": "object",
        "properties": {
          "message": { "type": "text" },
          "stack_trace": { "type": "text" }
        }
      },
      "context": {
        "type": "object",
        "dynamic": true
      }
    }
  },
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "index.lifecycle.name": "log-retention-policy"
  }
}

关键字段类型选择

{
  "mappings": {
    "properties": {
      // keyword:精确匹配,不需要分词,用于过滤和聚合
      "service": { "type": "keyword" },
      "level": { "type": "keyword" },
      "traceId": { "type": "keyword" },

      // text:全文搜索,用于 message 字段的内容搜索
      "message": { "type": "text" },

      // date:时间类型,用于时间范围查询
      "@timestamp": { "type": "date" },

      // long/double:数值类型,用于范围查询和统计
      "duration_ms": { "type": "long" },

      // object:嵌套对象
      "context": { "type": "object", "dynamic": true }
    }
  }
}

Filebeat:轻量采集

Filebeat 是 Beats 家族的日志采集器,比 Logstash 更轻量,适合将日志直接发送到 Elasticsearch 或 Logstash。

filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/containers/*.log
    json:
      keys_under_root: true
      add_error_key: true
      message_key: message

    # 字段添加
    fields:
      environment: production
      cluster: main
    fields_under_root: true

  # 过滤容器日志(只采集应用日志)
  - type: docker
    containers.ids:
      - '*'
    processors:
      - add_kubernetes_metadata:
          host: ${NODE_NAME}
          matchers:
            - logs_path:
                logs_path: "/var/log/containers/"
    json:
      keys_under_root: true
      add_error_key: true

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "logs-%{+yyyy.MM.dd}"

# 启用索引生命周期管理
setup.ilm.enabled: true
setup.ilm.rollover_alias: "logs"
setup.ilm.pattern: "{now/d}-000001"
setup.ilm.policy_name: "log-retention-policy"

Logstash:数据处理管道

Logstash 适合需要对日志做复杂转换的场景。

logstash-pipeline.conf
input {
  beats {
    port => 5044
  }

  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["app-logs"]
    group_id => "logstash"
    codec => json
  }
}

filter {
  # 时间解析
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }

  # JSON 解析
  if [message] =~ /^\{/ {
    json {
      source => "message"
      target => "parsed"
    }

    # 将解析后的字段提升到顶层
    mutate {
      rename => {
        "[parsed][traceId]" => "traceId"
        "[parsed][service]" => "service"
        "[parsed][level]" => "level"
        "[parsed][message]" => "message"
      }
    }
  }

  # Grok 解析非结构化日志
  if [level] != "INFO" or [message] =~ /ERROR|WARN/ {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} \[%{DATA:logger}\] %{GREEDYDATA:message}"
      }
      overwrite => ["message"]
    }
  }

  # 过滤敏感信息
  mutate {
    gsub => [
      "message", "password=[^&\s]+", "password=***",
      "token=[^&\s]+", "token=***"
    ]
  }

  # 添加地理信息
  geoip {
    source => "client_ip"
    target => "geoip"
  }

  # 用户代理解析
  user_agent {
    source => "user_agent"
    target => "ua"
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    document_type => "_doc"

    # ILM 集成
    ilm_enabled => true
    ilm_rollover_alias => "logs"
    ilm_pattern => "{now/d}-000001"
    ilm_policy => "log-retention-policy"
  }
}

Kibana:可视化

日志查询

# 基础查询
level: ERROR AND service: order-service

# 时间范围 + 错误
@timestamp: ["2026-04-08T00:00:00Z" TO "2026-04-08T23:59:59Z"]
AND level: ERROR

# TraceID 关联
traceId: "d3f8a2c1-e4b7-4f92"

# JSON 字段查询
context.orderId: 884321

# 聚合分析
# 统计每小时的错误数量
GET /logs/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        { "range": { "@timestamp": { "gte": "now-24h" } } },
        { "term": { "level": "ERROR" } }
      ]
    }
  },
  "aggs": {
    "errors_over_time": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "1h"
      }
    }
  }
}

索引生命周期管理(ILM)

ILM
PUT /_ilm/policy/log-retention-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_age": "1d",
            "max_primary_shard_size": "50gb"
          },
          "set_priority": 100
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": {
            "number_of_shards": 1
          },
          "forcemerge": {
            "max_num_segments": 1
          },
          "set_priority": 50
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "set_priority": 0,
          "allocate": {
            "require": {
              "data": "cold"
            }
          }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

性能调优

常见性能问题

# 问题:GC 导致写入阻塞
# 解决:增加 ES 堆内存或减少批量写入大小

# 问题:查询慢
# 原因:缺少 Filter Cache、Mapping 不合理
# 解决:使用 keyword 而非 text 做精确过滤

# 问题:存储成本高
# 解决:启用 ILM + 冷热分层

质量判断标准

读完本节后,你应该能够回答:

  1. ELK Stack 中,Elasticsearch、Logstash、Filebeat 各自承担什么职责?什么场景下可以省略 Logstash?
  2. Elasticsearch 的 keywordtext 字段类型有什么区别?为什么 TraceID 应该用 keyword 而非 text
  3. 索引生命周期管理(ILM)的热/温/冷/删四阶段分别是什么?为什么这样设计?
  4. Filebeat 和 Logstash 的取舍标准是什么?在什么规模下需要引入 Logstash?
  5. ELK 的查询性能瓶颈通常来自哪里?应该如何从索引设计和查询层面进行优化?