生产实践:etcd/Consul/ZooKeeper

理论讲完了,该落地了。

共识算法的论文再优美,如果不落地,就是空中楼阁。这篇文章,我们来看生产中最常用的三种共识系统:etcdConsulZooKeeper——它们的实现差异、性能特点、适用场景,以及真实的踩坑经验。

系统概览

系统语言共识协议最早版本维护方典型用户
etcdGoRaft2013CNCF / Red HatKubernetes、CloudFoundry
ConsulGoRaft2014HashiCorpHashiCorp 全家桶
ZooKeeperJavaZAB2008ApacheHadoop、Kafka、HBase

etcd:Kubernetes 的存储后端

etcd 起源于 CoreOS(后被 Red Hat 收购),最初设计用于服务发现和配置管理。2015 年,Kubernetes 将 etcd 作为默认存储后端,一举奠定了 etcd 在云原生领域的地位。

核心特性

# etcd 配置示例
# /etc/etcd/etcd.conf
ETCD_NAME="etcd-1"
ETCD_DATA_DIR="/var/lib/etcd"
ETCD_LISTEN_PEER_URLS="http://0.0.0.0:2380"
ETCD_LISTEN_CLIENT_URLS="http://0.0.0.0:2379"

# 集群节点
ETCD_INITIAL_CLUSTER="etcd-1=http://10.0.0.1:2380,etcd-2=http://10.0.0.2:2380,etcd-3=http://10.0.0.3:2380"
ETCD_INITIAL_CLUSTER_STATE="new"

# 安全配置
ETCD_CERT_FILE="/path/to/cert.pem"
ETCD_KEY_FILE="/path/to/key.pem"
ETCD_TRUSTED_CA_FILE="/path/to/ca.pem"
ETCD_CLIENT_CERT_AUTH="true"

数据模型

etcd 使用分层 key-value 模型,支持:

  • 普通 key:简单的键值存储
  • 目录 key:以 / 结尾,表示目录
  • TTL:每个 key 可以设置过期时间
# etcdctl 命令示例
# 写入
etcdctl put /config/database '{"host":"localhost","port":5432}'

# 读取
etcdctl get /config/database

# 监视(Watch)
etcdctl watch /config --prefix

# 租约(Lease)
etcdctl lease grant 60
etcdctl put /config/temp "value" --lease=xxxxx

性能数据

指标3 节点集群5 节点集群
写 QPS~1500~1000
读 QPS(线性一致性)~5000~3000
读 QPS(过期读取)~10000+~8000+
延迟 p99(写)20~50ms30~80ms
延迟 p99(读)5~15ms10~20ms
Info

线性一致性读 vs 过期读取:etcd 默认提供线性一致性读(所有读经过 Leader 验证),但可以通过 --consistency=serializable 开启过期读取(可从 Follower 读,性能更高但不保证最新)。

Java 客户端示例

import io.etcd.jetcd.*;
import io.etcd.jetcd.kv.*;
import io.etcd.jetcd.lock.*;
import io.etcd.jetcd.lease.Lease;

import java.util.concurrent.TimeUnit;

/**
 * etcd Java 客户端使用示例
 * 使用 jetcd 库
 */
public class EtcdClientExample {

    private final Client client;
    private final KV kv;
    private final LockClient lockClient;

    public EtcdClientExample(String endpoints) {
        // 连接 etcd 集群
        this.client = Client.builder()
            .endpoints(endpoints.split(","))
            .build();
        this.kv = client.getKVClient();
        this.lockClient = client.getLockClient();
    }

    /**
     * 分布式锁实现
     */
    public LockResponse tryLock(String lockName, long ttlSeconds) throws Exception {
        // 创建租约
        Lease leaseClient = client.getLeaseClient();
        long leaseId = leaseClient.grant(ttlSeconds).get().getID();

        // 尝试获取锁
        LockOut lockOut = lockClient.lock(ByteSequence.from(lockName, StandardCharsets.UTF_8), leaseId).get();

        return new LockResponse(lockOut.getKey(), leaseId);
    }

    /**
     * 配置管理:Watch + 动态更新
     */
    public void watchConfig(String configPath, Consumer<String> onChange) {
        CountDownLatch latch = new CountDownLatch(1);

        Watch watchClient = client.getWatchClient();
        WatchListener listener = response -> {
            for (WatchEvent event : response.getEvents()) {
                switch (event.getEventType()) {
                    case PUT:
                        String newValue = event.getKeyValue().getValue().toString(StandardCharsets.UTF_8);
                        onChange.accept(newValue);
                        break;
                    case DELETE:
                        onChange.accept(null); // 配置被删除
                        break;
                }
            }
        };

        watchClient.watch(
            ByteSequence.from(configPath, StandardCharsets.UTF_8),
            listener
        );
    }

    /**
     * 分布式计数器(乐观锁)
     */
    public long incrementCounter(String counterPath) throws Exception {
        while (true) {
            // 获取当前值
            TxnResponse txnResponse = kv.txn()
                .If(new Version(ByteSequence.from(counterPath, StandardCharsets.UTF_8)).isNotExist()
                    .or(new Version(ByteSequence.from(counterPath, StandardCharsets.UTF_8)).gt(leaseId -> leaseId, TxnCompare.greaterThan, -1)))
                .Then(Op.put(ByteSequence.from(counterPath, StandardCharsets.UTF_8),
                           ByteSequence.from("1", StandardCharsets.UTF_8)))
                .Else(Op.put(ByteSequence.from(counterPath, StandardCharsets.UTF_8),
                            getValue(counterPath) + 1)))
                .commit();

            if (txnResponse.isSucceeded()) {
                return 1;
            } else {
                // 失败,重试
                long current = Long.parseLong(getValue(counterPath));
                kv.put(ByteSequence.from(counterPath, StandardCharsets.UTF_8),
                       ByteSequence.from(String.valueOf(current + 1), StandardCharsets.UTF_8)).get();
                return current + 1;
            }
        }
    }

    public void close() {
        client.close();
    }
}

踩坑经验

Warning

MVCC 导致的存储膨胀:etcd 使用 MVCC(多版本并发控制),每次更新不会覆盖旧数据,而是生成新版本。这意味着频繁更新的 key 会占用大量磁盘空间。解决方案:定期compact 历史版本。

# 手动压缩
etcdctl compact 12345

# 设置自动压缩
etcd --auto-compaction=revision --compaction-interval=5m
Danger

大 Value 性能急剧下降:etcd 设计目标是存储小配置文件(如 1~10KB)。如果存储大 Value(> 1MB),性能会严重下降。解决方案:拆分为多个小 key,或使用外部对象存储(如 S3)存储大文件,只在 etcd 中存引用。

Consul:服务发现 + 配置中心

Consul 是 HashiCorp 的产品,除了 KV 存储,还提供服务发现健康检查——开箱即用,适合微服务架构。

核心特性

# Consul 配置示例
# /etc/consul.d/consul.hcl
datacenter = "dc1"
data_dir = "/opt/consul/data"
bind_addr = "{{ GetInterfaceIP \"eth0\" }}"
retry_join = ["consul-1.internal", "consul-2.internal", "consul-3.internal"]

# 性能配置
performance {
  raft_multiplier = 1  # 默认 5,降低可提升 Raft 性能
}

# 快照配置
snapshot_archive {
  path_prefix = "/opt/consul/snapshots"
}

# 自动快照(防止数据丢失)
autopilot {
  cleanup_dead_servers = true
  last_contact_threshold = "200ms"
  max_trailing_logs = 250
}

Consul vs etcd

维度Consuletcd
服务发现原生支持需额外实现
健康检查原生支持需额外实现
DNS 接口支持不支持
多数据中心原生支持需要额外配置
一致性模型线性一致线性一致
Web UI无(需第三方)
K8s 集成一般原生(官方 CSI)

Consul 典型使用场景

# 注册服务
consul services register \
  -id=web-1 \
  -name=web \
  -address=10.0.0.1 \
  -port=8080 \
  -check=http://10.0.0.1:8080/health \
  -interval=10s

# 服务发现(DNS)
dig @127.0.0.1 -p 8600 web.service.dc1.consul

# KV 存储
consul kv put config/database '{"host":"localhost"}'
consul kv get config/database

ZooKeeper:老牌分布式协调

ZooKeeper 诞生于 2008 年,是 Hadoop 生态的基石——HDFS NameNode、HBase Master、Kafka Controller 都依赖 ZooKeeper 做协调。

尽管新项目更倾向于 etcd/Consul,ZooKeeper 在遗留系统特定场景(如 Kafka 2.8 之前版本)仍有广泛应用。

数据模型

ZooKeeper 的数据模型是树形结构(类似文件系统),每个节点叫 ZNode

// ZNode 类型
public enum ZNodeType {
    PERSISTENT,       // 持久节点:创建后一直存在,直到被删除
    PERSISTENT_SEQUENTIAL,  // 持久顺序节点:节点名会自动追加序号
    EPHEMERAL,        // 临时节点:创建它的会话断开后自动删除
    EPHEMERAL_SEQUENTIAL   // 临时顺序节点
}
import org.apache.zookeeper.*;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * ZooKeeper Java 客户端示例
 */
public class ZooKeeperExample implements Watcher {

    private ZooKeeper zk;
    private final CountDownLatch connectedSignal = new CountDownLatch(1);

    public void connect(String hosts) throws Exception {
        zk = new ZooKeeper(hosts, 5000, this);
        connectedSignal.await(); // 等待连接建立
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            connectedSignal.countDown();
        }
    }

    /**
     * 分布式锁实现
     */
    public boolean acquireLock(String lockPath, int timeoutMs) throws Exception {
        String lockNode = zk.create(
            lockPath + "/lock-",
            new byte[0],
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.EPHEMERAL_SEQUENTIAL
        );

        // 获取所有子节点
        List<String> children = zk.getChildren(lockPath, false);
        children.sort(String::compareTo);

        int myIndex = children.indexOf(lockNode.substring(lockNode.lastIndexOf('/') + 1));

        if (myIndex == 0) {
            // 我是最小的,获得锁
            return true;
        } else {
            // 监听前一个节点
            String previousNode = children.get(myIndex - 1);

            CountDownLatch latch = new CountDownLatch(1);

            // 一次性 Watch
            Stat stat = zk.exists(lockPath + "/" + previousNode, event -> {
                if (event.getType() == Event.EventType.NodeDeleted) {
                    latch.countDown();
                }
            });

            if (stat == null) {
                // 前一个节点已消失,直接获得锁
                return true;
            }

            // 等待前一个节点删除
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * 分布式计数器
     */
    public long incrementCounter(String path) throws Exception {
        Stat stat = zk.exists(path, false);
        if (stat == null) {
            // 不存在,创建并设为 1
            zk.create(path, "1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            return 1;
        } else {
            // 存在,乐观锁更新
            while (true) {
                int currentVersion = stat.getVersion();
                String value = new String(zk.getData(path, false, stat));
                long newValue = Long.parseLong(value) + 1;

                try {
                    zk.setData(path, String.valueOf(newValue).getBytes(), currentVersion);
                    return newValue;
                } catch (BadVersionException e) {
                    // 版本冲突,重试
                    stat = zk.exists(path, false);
                }
            }
        }
    }

    /**
     * Master 选举
     */
    public boolean tryBecomeMaster(String masterPath) throws Exception {
        try {
            zk.create(
                masterPath,
                ("master-" + zk.getSessionId()).getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL   // 临时节点
            );
            return true; // 创建成功,成为 Master
        } catch (NodeExistsException e) {
            return false; // 已被其他节点抢占
        }
    }

    public void watchMaster(String masterPath, Consumer<String> onMasterChange) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);

        Stat stat = zk.exists(masterPath, event -> {
            if (event.getType() == Event.EventType.NodeDeleted) {
                onMasterChange.accept(null);
            }
        });

        if (stat != null) {
            byte[] data = zk.getData(masterPath, false, stat);
            onMasterChange.accept(new String(data));
        }
    }

    public void close() throws InterruptedException {
        zk.close();
    }
}

ZooKeeper 的 Watch 机制

ZooKeeper 的 Watch 是一次性触发的,触发后需要重新注册。这是正确的设计——避免大量 Watch 导致的内存压力。

// Watch 使用模式
public class ConfigWatcher {

    private byte[] currentConfig;

    public void watchConfig(String configPath) throws Exception {
        // 第一次读取 + 注册 Watch
        readAndWatch(configPath);
    }

    private void readAndWatch(String path) throws Exception {
        // Watch 注册在读取操作中
        currentConfig = zk.getData(path, event -> {
            if (event.getType() == Event.EventType.NodeDataChanged) {
                // 数据变更,重新读取并注册新 Watch
                try {
                    readAndWatch(path);
                } catch (Exception e) {
                    // 处理异常
                }
            }
        }, null);
    }
}

踩坑经验

Warning

ZAB 恢复时间不可控:当 ZooKeeper Leader 故障后,新 Leader 需要同步所有 Follower 的日志。在日志很长的情况下,恢复时间可能达到分钟级别。这期间集群不可写!解决方案:定期清理快照和日志,设置 autopurge.purgeInterval

# ZooKeeper 配置
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
Danger

Session 过期导致的临时节点问题:如果 ZooKeeper 客户端与集群的网络出现短暂抖动,Session 可能过期——即使客户端实际上还活着。这会导致所有临时节点被删除。解决方案:心跳 + Session 超时设置保守值(如 30s 以上)。

选型决策矩阵

场景推荐选择备选原因
Kubernetes 存储后端etcd-官方标准
微服务配置中心ConsuletcdConsul 支持服务发现
通用 KV 存储etcdConsuletcd API 更简洁
Kafka 元数据ZooKeeperKRaft(Kafka 内置)历史原因,Kafka 3.3+ 支持内置 Raft
Hadoop 生态ZooKeeper-Hadoop 生态深度集成
新项目etcdConsul社区活跃,文档丰富
需要健康检查ConsulZooKeeperConsul 原生支持
需要 DNS 接口Consul-Consul 原生支持 DNS
多数据中心ConsuletcdConsul 多数据中心支持更成熟

运维最佳实践

监控指标

无论选择哪个系统,以下指标必须监控:

指标说明告警阈值
集群可用节点数低于多数派则不可写< 3(对于 5 节点集群)
Leader 选举频率频繁选举说明有问题> 1次/小时
写延迟 p99共识写入的端到端延迟> 100ms
磁盘使用率快照+日志会占用大量磁盘> 70%
网络带宽节点间同步的流量持续 > 80% 带宽
Follower 落后Follower 与 Leader 的日志差距> 1000 条

容量规划

# 3 节点 etcd 集群容量估算

假设:
- 每秒 1000 次写入
- 平均 Value 大小 1KB
- 保留 1 小时数据

存储需求:
- 每秒写入量 = 1000 × 1KB = 1MB
- 每小时 = 1MB × 3600 = 3.6GB
- 加上压缩开销,预留 5GB

网络需求:
- 每秒同步 = 1000 × 1KB × 2(每个写入同步 2 个 Follower)
- 节点间带宽 = 2MB/s = 16Mbps

术语表

术语英文解释
MVCCMulti-Version Concurrency Control多版本并发控制,etcd 使用
LeaseLease租约,过期后自动删除关联数据
ZNodeZooKeeper NodeZooKeeper 中的数据节点
临时节点Ephemeral Node会话断开后自动删除的节点
WatchWatchZooKeeper 的事件监听机制
TTLTime To Live过期时间
快照Snapshot状态机的完整快照
压缩Compaction清理历史版本,释放存储空间
线性一致读Linearizable Read强一致的读操作
过期读取Stale Read允许返回旧数据的读操作

延伸思考

三个系统,三种哲学。

etcd 是云原生的标准存储,它的每一行代码都在为 Kubernetes 服务。如果你在做云原生相关的工作,etcd 是默认选择。

Consul 更像瑞士军刀——除了 KV,还有服务发现、健康检查、DNS。适合微服务架构,但复杂度也更高。

ZooKeeper 是老兵,它的 API 简洁但功能有限。在新项目中,ZooKeeper 的优先级应该低于 etcd/Consul——除非你需要兼容现有系统。

最后,无论选择哪个系统,记住一个原则:共识系统的性能瓶颈往往不在共识协议本身,而在网络和磁盘 I/O。优化网络延迟、使用 SSD、合理的快照策略——这些基础设施投资,比换共识算法更有价值。