浅析 Greenplum 中的 Squelch 机制

Greenplum Squelch 机制用于提前终止执行节点的执行,和 PostgreSQL 单机节点不同的是,Greenplum 无法通过停止调用函数停止节点的执行,因为 Greenplum 涉及到不同节点之间的数据传输。

1. 基于 UDP 的 Greenplum Interconnect 控制流程

在 Greenplum 中,由于数据需要跨节点传输,同时为了保证数据传输的效率和可靠性,以及 N 对 N 的数据传输特性,Interconnect 应运而生。

Interconnect 是建立的 UDP 协议之上的应用层数据传输协议,在原本不可靠 UDP 之上自行实现了可靠传输、流量控制等功能。那么为什么不直接使用 TCP 协议,而是大费周章地再去实现一个可靠的 UDP 协议呢?原因在于 TCP 协议是点对点的有状态传输协议,如果我们 N 个节点执行 RedistributeMotion 的话,那么就会产生 $N^2$ 个连接。假如当前集群有 500 个 segment 的话,那么一个 RedistributeMotion 就会有 500*500 = 250,000 个 TCP 连接,一次性建立这么多个连接是相当低效的。

一个 Motion 可以划分成 MotionSender 和 MotionReceiver,为了在 UDP 上实现可靠传输协议,那么就必然需要一个队列,来暂存那些还没有发送以及已经发送了但没有收到对方 ACK 的数据,在超时时需要对这些数据进行重传。

Alt text

如上所示,MotionSender 会维护两个队列,一个是发送队列(Send Queue),用来保存已经组装完成,即将发送的数据。另一个是未确认队列(Un ACK Queue),用来保存已经发送,但还没有收到接收者 ACK 的数据。发送端的发送流程如下:

  1. 序列化 Tuple,将其组装为数据包,并添加至发送队列中
  2. 遍历发送队列,发送数据包,并把数据包从发送队列中删除,插入到未确认队列中
  3. 读取接收端的 ACK 数据包,从未确认队列中删除已收到 ACK 的数据包

2. Greenplum Squelch (静噪) 机制

查询计划执行的过程中我们会遇到一些不需要所有的节点都执行完,计划需要提前终止的情况。 比如 Limit 节点,以及 AntiJoin 或者 SemiJoin,在到达一定条件后会终止运行,那么这个节点所在子树的所有节点都需要停止执行。Postgres 查询计划的执行由函数调用驱动,上层节点结束后不再调用下层节点的迭代器函数,下层节点自然结束。但是 在Greenplum 中,某个工作进程中间的执行过程符合函数调用驱动模型的特征,在上层节点结束执行后下层节点自然结束。但是 Greenplum 不同的 Gang 之间由 Interconnect 连接,MotionSender 是以尽力而为的方式服务 MotionReceiver 的,所以需要一种机制来通过 Interconnect 停止下层节点的运行。

所以我们加入了一个新的机制 Squelch,每个节点在中途退出时需要调用 Squelch 函数。Squelch 函数沿查询计划树向下传递,每个子节点调用自己的 Squelch 函数,并继续向下传递。大部分节点的 Squelch 函数什么都不做,少数节点的的 Squelch 函数需要简单更新状态。我们这里重点介绍 Motion 节点的 Squelch 函数。当一个节点不再需要输入元组时,Squelch 是必要而非可选的动作。及时的调用 Squelch 可以防止死锁的发生。节点的子树中可能有一个 Motion 节点,Motion 节点的接收方必须及时通知发送方不再继续读取数据。本节点在其他 Segment 上的兄弟节点可能在持续读取数据,如果这是应该发送向本节点的数据填满了发送缓冲区,则会引发死锁。通知发送方本节点不再需要数据可以让发送方跳过发送向本节点的数据而避免缓冲区阻塞。

当 Motion 发送方调用 Squelch 函数时,会发送一条标记为 UDPIC_FLAGS_STOP 的 ACK。当接收方收到这个 ACK,则向发送方返回一条带有 UDPIC_FLAGS_EOS 标记的数据包,并同时把同接收方的连接标记为不活跃。这样当发送方遇到应该发往标记为不活跃的接收方的元组时,会抛弃这个元组,这样就不会占用发送缓冲区导致死锁。同时当所有的连接全部被置为不活跃时,Motion 发送方就会停止发送元组并释放所有资源,节点结束。

3. 历史中关于 Squelch 机制的 BUG

1. 提前终止 SharedInputScan 节点导致 Hang 住

首先来看一个例子:

CREATE TABLE foo (a int, b int);
CREATE TABLE bar (c int, d int);
CREATE TABLE jazz(e int, f int);

INSERT INTO bar  VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO jazz VALUES (2, 2), (3, 3);

ANALYZE foo;
ANALYZE bar;
ANALYZE jazz;

EXPLAIN SELECT * FROM
(
    WITH cte AS (SELECT * FROM foo)
    SELECT * FROM (SELECT * FROM cte UNION ALL SELECT * FROM cte)
    AS X JOIN bar ON b = c
) AS XY JOIN jazz on c = e AND b = f;

将会得到如下查询计划:

                                                 QUERY PLAN
------------------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice2; segments: 3)  (cost=0.00..2155.00 rows=1 width=24)
   ->  Hash Join  (cost=0.00..2155.00 rows=1 width=24)
         Hash Cond: bar.c = jazz.e AND share0_ref2.b = jazz.f AND share0_ref2.b = jazz.e AND bar.c = jazz.f
         ->  Sequence  (cost=0.00..1724.00 rows=1 width=16)
               ->  Shared Scan (share slice:id 2:0)  (cost=0.00..431.00 rows=1 width=1)
                     ->  Materialize  (cost=0.00..431.00 rows=1 width=1)
                           ->  Table Scan on foo  (cost=0.00..431.00 rows=1 width=8)
               ->  Hash Join  (cost=0.00..1293.00 rows=1 width=16)
                     Hash Cond: share0_ref2.b = bar.c
                     ->  Redistribute Motion 3:3  (slice1; segments: 3)  (cost=0.00..862.00 rows=1 width=8)
                           Hash Key: share0_ref2.b
                           ->  Append  (cost=0.00..862.00 rows=1 width=8)
                                 ->  Shared Scan (share slice:id 1:0)  (cost=0.00..431.00 rows=1 width=8)
                                 ->  Shared Scan (share slice:id 1:0)  (cost=0.00..431.00 rows=1 width=8)
                     ->  Hash  (cost=431.00..431.00 rows=1 width=8)
                           ->  Table Scan on bar  (cost=0.00..431.00 rows=1 width=8)
         ->  Hash  (cost=431.00..431.00 rows=1 width=8)
               ->  Table Scan on jazz  (cost=0.00..431.00 rows=1 width=8)
                     Filter: e = f
 Optimizer status: PQO version 2.39.1
(20 rows)

当某个 segment 上表 jazz 没有任何数据时,HashJoin 在建立内表的哈希表时就会发现哈希表为空,那么将会直接结束整个哈希连接的执行,不再需要扫描外表,也就是会 Squelch 掉整个 Sequence 节点以及其子节点。

但是对于 Shared Scan 节点来说,会包括 writer 和 reader,其中 writer 将数据物化到文件中,reader 则只需要到指定的文件中读取即可。物化节点保证了一次扫描,多次读取的特性。基于 Greenplum 分布式的特点,reader 需要等待 writer 的完成。

那么在 Sequench 一个节点时,就有可能将 Shared Scan writer 取消掉,从而导致 reader 无限等待。

Alt text

具体修复可见 Ensure Execution of Shared Scan Writer On Squelch,修复方式其实很简单,就是在 ExecSquelchNode 碰到了跨 Slice 的物化节点时,执行该节点,而不是取消该节点:

switch (share_type)
{
	case SHARE_MATERIAL:
	case SHARE_SORT:
		return CdbVisit_Skip;

	case SHARE_MATERIAL_XSLICE:
	case SHARE_SORT_XSLICE:
		if (isWriter && !tuplestoreInitialized)
			ExecProcNode(node);     /* 继续执行该节点,而不是取消 */
		break;
	case SHARE_NOTSHARED:
		break;
}

2. 嵌套的 SharedInputScan

create table t1(a int, b int);
create table t2(a int, b int);
create table t3(a int, b int);
create table t4(a int, b int);
create table t5(a int, b int);

insert into t1 select i,i from generate_series(1, 10000)i;
insert into t2 select i,i from generate_series(1, 10000)i;

analyze;

explain (costs off) select from
(
  with cte1(a,b) as (select t1.a, t2.a from t1, t2 where t1.a = t2.b),
       cte2(a,b) as (select xx.a+2, xx.b-5 from cte1 xx, cte1 yy where xx.a = yy.a)
       select xx.a, count(1) from cte2 xx, cte2 yy where xx.a = yy.a group by xx.a union all select 1, 2 from cte1 x, cte2 y where x.a > y.b
) x (a, b)  join  t3
on  x.a = t3.a;

此时我们将会得到如下查询计划:

                                             QUERY PLAN                                              
-----------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice6; segments: 3)
   ->  Hash Join
         Hash Cond: (share1_ref3."?column?" = t3.a)
         ->  Sequence
               ->  Shared Scan (share slice:id 6:0)
                     ->  Materialize
                           ->  Hash Join
                                 Hash Cond: (t2.b = t1.a)
                                 ->  Redistribute Motion 3:3  (slice4; segments: 3)
                                       Hash Key: t2.b
                                       ->  Seq Scan on t2
                                 ->  Hash
                                       ->  Seq Scan on t1
               ->  Sequence
                     ->  Shared Scan (share slice:id 6:1)
                           ->  Materialize
                                 ->  Result
                                       ->  Hash Join
                                             Hash Cond: (share0_ref3.a = share0_ref2.a)
                                             ->  Shared Scan (share slice:id 6:0)
                                             ->  Hash
                                                   ->  Shared Scan (share slice:id 6:0)
                     ->  Append
                           ->  HashAggregate
                                 Group Key: share1_ref3."?column?"
                                 ->  Hash Join
                                       Hash Cond: (share1_ref3."?column?" = share1_ref2."?column?")
                                       ->  Redistribute Motion 3:3  (slice1; segments: 3)
                                             Hash Key: share1_ref3."?column?"
                                             ->  Result
                                                   ->  Shared Scan (share slice:id 1:1)
                                       ->  Hash
                                             ->  Redistribute Motion 3:3  (slice2; segments: 3)
                                                   Hash Key: share1_ref2."?column?"
                                                   ->  Result
                                                         ->  Shared Scan (share slice:id 2:1)
                           ->  Result
                                 ->  Result
                                       ->  Nested Loop
                                             Join Filter: (share0_ref4.a > share1_ref4."?column?_1")
                                             ->  Shared Scan (share slice:id 6:1)
                                             ->  Materialize
                                                   ->  Broadcast Motion 3:3  (slice3; segments: 3)
                                                         ->  Result
                                                               ->  Shared Scan (share slice:id 3:0)
         ->  Hash
               ->  Broadcast Motion 3:3  (slice5; segments: 3)
                     ->  Seq Scan on t3
 Optimizer: Pivotal Optimizer (GPORCA)

但是当我们执行该 SQL 时,将会得到错误:

ERROR:  cannot execute squelched plan node of type: 232 (execProcnode.c:947)  (seg0 slice6 127.0.1.1:6002 pid=92140) (execProcnode.c:947)

这个问题实际上是由前面修复物化节点所引起的,由于 Ensure Execution of Shared Scan Writer On Squelch 在碰到跨 Slice 的物化节点时将会调用 ExecProcNode(node) 执行该物化节点,那么如果在我们的查询计划中存在 SharedInputScan 的嵌套的话,也就是一个 Shared Scan Writer 的子节点中存在已经被 squelch 的 Shared Scan Reader 节点的话,就会产生如上问题。

4. Reference

smartkeyerror

日拱一卒,功不唐捐