Kubernetes 审计日志

某公司发生了数据泄露事件。调查显示,攻击者在两周前就获得了集群的访问权限,并在事故发生前进行了多次侦察。但直到数据已经被拖走,安全团队才收到告警。

问题在于没有审计日志。他们不知道谁在什么时间访问了什么资源,做了什么操作。没有日志,安全事件就无法被追溯和调查。

审计日志是安全运营的基础,也是合规要求的必备项。

K8s 审计日志的作用

Kubernetes 审计日志记录所有对 API Server 的请求,包括谁(Who)什么时候(When)从哪里(From)做了什么(What)操作。

安全合规:满足 SOC 2、ISO 27001、PCI DSS 等合规要求的必要条件。

事件追溯:发生安全事件后,通过审计日志还原攻击链和影响范围。

异常检测:识别可疑的操作模式,如非工作时间的访问、异常的 API 调用量。

操作审计:了解资源变更历史,支持故障排查和变更管理。

审计策略配置

审计策略定义了记录哪些事件、记录多少信息。

策略结构

审计策略配置
apiVersion: audit.k8s.io/v1
kind: AuditPolicy
metadata:
  name: production-policy
rules:
  # 规则按顺序匹配,第一个匹配的规则生效
  - level: NoResources
    users: ["system:kube-proxy"]
    verbs: ["get"]
    resources:
      - group: ""
        resources: ["endpoints"]
  
  - level: Metadata
    resources:
      - group: ""
        resources: ["pods"]
      - group: ""
        resources: ["secrets"]
        namespaces: ["production"]
  
  - level: RequestResponse
    resources:
      - group: ""
        resources: ["secrets"]
  
  - level: None
    users: ["system:apiserver"]
    verbs: ["get"]

审计级别

级别记录内容适用场景
None不记录大量低价值请求
Metadata用户、时间、URI、响应状态大多数资源
RequestMetadata + 请求体需要请求内容的场景
RequestResponseMetadata + 请求体 + 响应体关键资源(如 Secret)

API Server 配置

API
apiVersion: apiserver.config.k8s.io/v1
kind: AuditConfiguration
spec:
  level: Metadata
  stages:
    - RequestReceived
    - ResponseStarted
    - ResponseComplete
  
  rules:
    - level: RequestResponse
      resources:
        - group: ""
          resources: ["secrets"]
        - group: ""
          resources: ["serviceaccounts/token"]
  
  backend:
    - type: Webhook
      webhook:
        server: https://audit-backend.example.com
        tlsConfig:
          caBundle: <base64-encoded-ca>

审计日志记录的字段

每个审计日志条目包含以下字段:

审计日志条目示例
{
  "kind": "Event",
  "apiVersion": "audit.k8s.io/v1",
  "level": "RequestResponse",
  "timestamp": "2024-01-15T10:30:00Z",
  "auditID": "abc123-def456",
  "stage": "ResponseComplete",
  "requestURI": "/api/v1/namespaces/production/secrets/db-credentials",
  "verb": "get",
  "user": {
    "username": "alice@example.com",
    "uid": "ldap-user-001",
    "groups": ["developers", "production-team"]
  },
  "sourceIPs": ["10.0.0.50"],
  "userAgent": "kubectl/v1.28.0",
  "objectRef": {
    "resource": "secrets",
    "namespace": "production",
    "name": "db-credentials",
    "apiVersion": "v1"
  },
  "responseStatus": {
    "code": 200,
    "message": "OK"
  },
  "requestObject": null,
  "responseObject": {
    "apiVersion": "v1",
    "kind": "Secret"
  },
  "requestReceivedTimestamp": "2024-01-15T10:30:00Z",
  "stageTimestamp": "2024-01-15T10:30:01Z"
}

关键字段说明

字段说明安全意义
user.username执行操作的用户追踪操作者
sourceIPs请求来源 IP定位攻击源
objectRef操作的资源对象了解受影响范围
responseStatus.code响应状态码检测失败的攻击尝试
verb操作类型(get/list/create/delete)识别异常操作模式

敏感操作的监控

高风险操作列表

敏感操作监控规则
# 需要重点监控的操作
sensitive_operations:
  - verb: delete
    resource: secrets
  
  - verb: create
    resource: pods
  
  - verb: create
    resource: deployments
  
  - verb: patch
    resource: rolebindings
  
  - verb: create
    subresource: exec
  
  - verb: create
    subresource: attach

Secret 访问监控

Secret
# Falco 规则 - Secret 访问检测
- rule: Secret Access from Production
  desc: Detect secret access in production namespace
  condition: >
    kaudit and
    kaudit.type == "RequestReceived" and
    kaudit.annotation.kubernetes.io/审计操作 in (get,list) and
    kaudit.objectRef.resource == "secrets" and
    kaudit.objectRef.namespace == "production" and
    not kaudit.user.username startswith "system:"
  output: >
    Secret accessed in production
    (user=%kaudit.user.username source=%kaudit.sourceIps[0]
    secret=%kaudit.objectRef.name)
  priority: HIGH

异常时间访问检测

异常时间访问检测
import pandas as pd
from datetime import datetime

def detect_after_hours_access(audit_logs: list) -> list:
    """检测非工作时间的异常访问"""
    alerts = []
    
    for log in audit_logs:
        timestamp = datetime.fromisoformat(log['timestamp'])
        
        # 定义工作时间(假设 UTC+8)
        is_work_hours = 9 <= timestamp.hour <= 18
        is_workday = timestamp.weekday() < 5
        
        if not (is_work_hours and is_workday):
            # 非工作时间访问敏感资源
            if log.get('objectRef', {}).get('resource') == 'secrets':
                alerts.append({
                    'timestamp': timestamp,
                    'user': log['user']['username'],
                    'resource': log['objectRef'],
                    'type': 'after_hours_secret_access'
                })
    
    return alerts

审计日志的存储与分析

后端配置

Kubernetes 支持多种审计日志后端:

Webhook
# 发送到外部日志系统
apiVersion: audit.k8s.io/v1
kind: AuditSink
metadata:
  name: elasticsearch
spec:
  backend:
    webhook:
      server: https://elasticsearch:9200
      clientConfig:
        caBundle: <ca>

Elasticsearch 集成

Filebeat
apiVersion: beat.k8s.elastic.co/v1beta1
kind: Beat
metadata:
  name: kube-audit-beat
spec:
  type: filebeat
  version: 8.10.0
  elasticsearchRef:
    clusterID: production
  config:
    filebeat.inputs:
      - type: log
        paths:
          - /var/log/kubernetes/audit.log
        json.keys_under_root: true
    processors:
      - add_kubernetes_metadata:
          host: ${NODE_NAME}
          matchers:
            - logs_path:
                logs_path: /var/log/kubernetes/

Kibana 仪表板

通过 Kibana 可以可视化审计日志:

  • 资源访问频率热力图
  • 用户操作分布
  • 异常访问模式检测
  • 合规报告生成

Falco 配置审计日志

Falco 通过动态类型(kaudit)支持审计日志分析。

Falco
# 检测特权容器创建
- rule: Privileged Container Created
  desc: A privileged container was created in kube-system
  condition: >
    kaudit and
    kaudit.type == "RequestReceived" and
    kaudit.verb == "create" and
    kaudit.objectRef.resource == "pods" and
    kaudit.objectRef.namespace == "kube-system" and
    json_contains(kaudit.requestObject.spec, "privileged": true)
  output: >
    Privileged container created
    (user=%kaudit.user.username namespace=%kaudit.objectRef.namespace
    pod=%kaudit.objectRef.name)
  priority: CRITICAL

# 检测 RBAC 变更
- rule: RBAC Changed
  desc: ClusterRole or Role was modified
  condition: >
    kaudit and
    kaudit.type == "RequestReceived" and
    kaudit.verb in (create, update, patch) and
    kaudit.objectRef.resource in (clusterroles, roles, clusterrolebindings, rolebindings)
  output: >
    RBAC resource modified
    (user=%kaudit.user.username resource=%kaudit.objectRef.resource
    name=%kaudit.objectRef.name)
  priority: HIGH

异常行为检测

检测模式

暴力破解检测:短时间内多次登录失败。

暴力破解检测
def detect_brute_force(audit_logs: list, threshold: int = 5, window_minutes: int = 10) -> list:
    """检测暴力破解尝试"""
    login_failures = {}
    alerts = []
    
    for log in audit_logs:
        if log.get('responseStatus', {}).get('code') == 401:
            user = log['user']['username']
            timestamp = datetime.fromisoformat(log['timestamp'])
            
            login_failures.setdefault(user, []).append(timestamp)
            
            # 检查是否超过阈值
            recent_failures = [
                t for t in login_failures[user]
                if (timestamp - t).total_seconds() < window_minutes * 60
            ]
            
            if len(recent_failures) >= threshold:
                alerts.append({
                    'user': user,
                    'attempts': len(recent_failures),
                    'first_attempt': min(recent_failures),
                    'last_attempt': timestamp,
                    'type': 'brute_force'
                })
                login_failures[user] = []  # 重置计数
    
    return alerts

权限提升检测:用户开始访问之前未使用的资源。

权限提升检测
def detect_privilege_escalation(audit_logs: list) -> list:
    """检测权限提升"""
    user_resources = {}
    alerts = []
    
    for log in audit_logs:
        user = log['user']['username']
        resource = f"{log['objectRef']['resource']}"
        
        user_resources.setdefault(user, set()).add(resource)
    
    # 检测对敏感资源的新访问
    for user, resources in user_resources.items():
        if 'secrets' in resources and 'clusterroles' in resources:
            alerts.append({
                'user': user,
                'resources': list(resources),
                'type': 'privilege_escalation_suspicious'
            })
    
    return alerts

审计日志的合规要求

合规标准审计要求
SOC 2记录所有数据访问操作
ISO 27001记录所有安全相关事件
PCI DSS记录所有系统组件的访问
HIPAA记录 PHI 数据访问
合规建议

建议将 Kubernetes 审计日志与现有的 SIEM 系统集成,实现统一的日志管理和告警。审计日志的保留期限应满足合规要求,通常至少 90 天。

总结与延伸思考

Kubernetes 审计日志是安全运营的基础设施。没有审计日志,安全事件无法追溯,合规无法证明。

配置审计日志时需要平衡两个因素:记录范围(太多日志难以分析)和安全覆盖(太少日志可能有盲区)。

建议从关键资源(Secret、RBAC、ServiceAccount)开始,逐步扩大覆盖范围。

思考题

问题 1:为什么说「审计日志只记录失败的操作就够」是一个危险假设?

参考答案

因为成功的操作同样需要审计。攻击者获得合法凭证后,成功的操作才是真正危险的。审计日志的核心目的是:1)记录正常操作基线,用于异常检测;2)追溯已发生的事件;3)证明合规。失败的操作是异常检测的辅助信号,不是审计的核心。

问题 2:如何设计一个既满足合规要求又不产生过多日志的审计策略?

参考答案

采用分层策略:1)敏感资源(Secret、RBAC)使用 RequestResponse 级别;2)一般资源(Pods、Services)使用 Metadata 级别;3)只读操作使用 None 级别或 Metadata 级别;4)高风险操作(delete、patch、exec)无论资源类型都提升级别。同时设置日志保留策略,将详细日志发送到冷存储,超过 30 天后删除原始数据,只保留聚合统计信息。