Scatter-Gather 散聚模式
你在做一个商品比价系统。用户搜索「iPhone 15」,需要同时查询天猫、京东、拼多多、抖音电商等多个平台的价格,然后把结果聚合成一个列表返回给用户。每个平台的 API 响应时间是 50-200ms 不等,如果串行调用,总耗时可能超过 800ms。
但如果并行调用呢?理论上只需要 200ms——也就是最慢那个平台的响应时间。
这就是 Scatter-Gather 散聚模式的核心价值:将一个请求拆分并行到多个节点,最后将结果聚合返回。
散聚模式的流程
散聚模式包含四个核心步骤:
- 分发(Scatter):将请求拆分为多个子请求
- 并行执行(Parallel Execute):将子请求发送到对应的节点
- 收集(Collect):收集子请求的响应
- 聚合(Aggregate):将结果合并、排序、去重后返回
flowchart TB
subgraph Request["请求入口"]
Q["查询请求\niPhone 15"]
end
subgraph Scatter["分发阶段"]
D["分发器"]
end
subgraph Parallel["并行执行"]
T["天猫 API\n50ms"]
J["京东 API\n100ms"]
P["拼多多 API\n80ms"]
D["抖音 API\n200ms"]
end
subgraph Gather["收集与聚合"]
A["聚合器"]
end
subgraph Response["响应"]
R["比价结果列表"]
end
Q --> D
D -->|"查询天猫"| T
D -->|"查询京东"| J
D -->|"查询拼多多"| P
D -->|"查询抖音"| D
T --> A
J --> A
P --> A
D --> A
A --> R
广播查询:搜索引擎并行查询
在分布式搜索引擎(如 Elasticsearch)中,散聚模式用于并行查询多个索引分片。
public class ElasticsearchScatterGather {
private final RestHighLevelClient esClient;
public SearchResponse searchProducts(String query, int page, int size) {
// 1. 获取集群状态,确定分片分布
ClusterHealthResponse health = esClient.cluster().health();
int totalShards = health.getNumberOfDataNodes();
// 2. 构建分散查询请求
SearchRequest searchRequest = new SearchRequest("products_*");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.multiMatchQuery(query, "name", "description"));
sourceBuilder.from(page * size);
sourceBuilder.size(size);
searchRequest.source(sourceBuilder);
// 3. 分散查询:广播到所有相关分片
// ES 自动将请求分发到各分片并行执行
CompletableFuture<SearchResponse> future = new CompletableFuture<>();
esClient.searchAsync(searchRequest, RequestOptions.DEFAULT,
new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse response) {
future.complete(response);
}
@Override
public void onFailure(Exception e) {
future.completeExceptionally(e);
}
});
try {
// 4. 等待最慢的分片响应(默认超时 30s)
SearchResponse response = future.get(30, TimeUnit.SECONDS);
// 5. ES 自动在协调节点聚合各分片结果
// 包括全局相关性评分、去重、分页等
return response;
} catch (Exception e) {
// 处理超时或部分失败
throw new SearchException("搜索失败", e);
}
}
}
Elasticsearch 的内部散聚机制
sequenceDiagram
participant Client as 客户端
participant Coordinator as 协调节点
participant Shard1 as 分片 1
participant Shard2 as 分片 2
participant Shard3 as 分片 3
Client->>Coordinator: 搜索 "iPhone"
par 并行查询
Coordinator->>Shard1: 查询分片 1
Coordinator->>Shard2: 查询分片 2
Coordinator->>Shard3: 查询分片 3
end
Shard1-->>Coordinator: 返回 Top 100 (score: 0.9)
Shard2-->>Coordinator: 返回 Top 100 (score: 0.85)
Shard3-->>Coordinator: 返回 Top 100 (score: 0.78)
Note over Coordinator: 全局排序、去重
Coordinator-->>Client: 返回聚合结果 Top 20
结果归并
超时处理
散聚模式中最关键的设计是超时处理。当部分节点响应超时或失败时,系统需要决定:是完全失败,还是返回部分结果。
public class ScatterGatherExecutor {
private final ExecutorService executor;
private final Duration timeout;
public <T, R> ScatterGatherResult<R> execute(
List<T> inputs,
Function<T, R> callable,
BiFunction<List<R>, List<Failure>, R> aggregator) {
// 为每个输入创建异步任务
List<CompletableFuture<R>> futures = inputs.stream()
.map(input -> CompletableFuture
.supplyAsync(() -> callable.apply(input), executor)
.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
// 记录失败,但不影响其他任务
log.warn("Scatter 任务失败: input={}, error={}",
input, ex.getMessage());
return null;
}))
.collect(Collectors.toList());
// 等待所有任务完成(或超时)
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 收集结果和失败信息
List<R> results = new ArrayList<>();
List<Failure> failures = new ArrayList<>();
for (int i = 0; i < inputs.size(); i++) {
R result = futures.get(i).getNow(null);
if (result != null) {
results.add(result);
} else {
failures.add(new Failure(inputs.get(i), "超时或异常"));
}
}
// 使用聚合器合并结果
R aggregated = aggregator.apply(results, failures);
return ScatterGatherResult.<R>builder()
.result(aggregated)
.totalCount(inputs.size())
.successCount(results.size())
.failureCount(failures.size())
.failures(failures)
.build();
}
}
部分失败处理
public class PriceAggregator {
public List<PriceResult> aggregatePrices(List<PlatformPrice> prices, List<Failure> failures) {
// 记录失败但继续处理
if (!failures.isEmpty()) {
log.warn("部分平台查询失败: {}", failures);
}
// 按价格排序
List<PriceResult> results = prices.stream()
.map(p -> PriceResult.builder()
.platform(p.getPlatform())
.price(p.getPrice())
.url(p.getProductUrl())
.stockStatus(p.getStockStatus())
.build())
.sorted(Comparator.comparing(PriceResult::getPrice))
.collect(Collectors.toList());
// 标记不可用的平台
failures.forEach(f -> {
results.add(PriceResult.builder()
.platform("平台-" + f.getInput())
.price(null)
.status("查询失败: " + f.getReason())
.build());
});
return results;
}
}
与 MapReduce 的关系
散聚模式和 MapReduce 有相似之处,但也有明显区别:
MapReduce 可以看作是散聚模式的「批量版」:Map 阶段相当于 Scatter,Reduce 阶段相当于 Gather。区别在于 MapReduce 的数据流动是批量化的,而散聚模式是请求级的。
典型应用场景
多渠道价格比较
public class PriceComparisonService {
private final ScatterGatherExecutor executor;
public List<PriceResult> comparePrices(String productId) {
List<String> platforms = List.of("TMALL", "JD", "PDD", "DOUYIN");
return executor.execute(
platforms,
platform -> queryPlatformPrice(platform, productId),
this::aggregatePrices
).getResult();
}
private PlatformPrice queryPlatformPrice(String platform, String productId) {
// 模拟平台 API 调用
return switch (platform) {
case "TMALL" -> tmallClient.getPrice(productId);
case "JD" -> jdClient.getPrice(productId);
case "PDD" -> pddClient.getPrice(productId);
case "DOUYIN" -> douyinClient.getPrice(productId);
default -> throw new UnsupportedOperationException();
};
}
}
并行压测
public class LoadTestScatterGather {
public LoadTestReport runLoadTest(LoadTestConfig config) {
// 生成测试用例
List<EndpointTestCase> testCases = generateTestCases(config);
// 并行发送到所有压测节点
List<CompletableFuture<NodeReport>> futures = testCases.stream()
.map(this::executeOnNodes)
.flatMap(List::stream)
.collect(Collectors.toList());
// 收集所有节点报告
List<NodeReport> reports = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
// 聚合报告
return aggregateReports(reports);
}
}
性能优化技巧
减少节点数:不是越多越好。节点数增加意味着更多网络开销和协调成本。当节点数超过一定阈值后,收益递减。
预热和缓存:对于热门查询,可以预先将结果缓存起来,避免每次都散聚到所有节点。
优雅降级:当部分节点失败时,返回缓存结果或「部分成功」响应,而不是直接失败。
并发控制:设置最大并发数,避免瞬时请求过大压垮下游服务。
思考题
问题 1:散聚模式中,如果某个节点的响应特别慢,会拖累整体响应时间吗?
参考答案
会的。这是散聚模式的一个核心问题——整体响应时间取决于最慢的节点。解决方案包括:1)设置合理的超时时间,超时后放弃该节点,返回部分结果;2)使用 hedged requests 策略,即向多个节点发送相同请求,取先到达的结果;3)使用投机执行(speculative execution),超时后向其他节点发送相同请求;4)结果分层返回,先返回快速节点的结果,再异步更新慢节点的结果。
问题 2:散聚模式的聚合器设计有哪些常见模式?
参考答案
常见的聚合器模式包括:1)排序合并:将所有结果按某个维度排序后取 Top N;2)去重合并:使用 Set 或 Bloom Filter 去重;3)求和/统计:对数值结果求和、计数、平均;4)版本合并:类似 Git 的三路合并,处理冲突;5)分层聚合:先在每个节点本地聚合,再在协调节点聚合全局结果。选择哪种聚合器取决于业务需求和数据特点。
问题 3:什么时候不适合用散聚模式?
参考答案
不适合散聚模式的场景:1)数据量极大,超过聚合能力(如 TB 级数据);2)对一致性要求极高,需要跨节点事务;3)下游节点不稳定,失败率很高(散聚会放大失败影响);4)业务逻辑强依赖某个节点的结果。在这些场景下,可能需要考虑同步串行调用、本地缓存、或分区处理等其他方案。