如何理解 Greenplum 中的分布式快照?

在单机的 PostgreSQL 数据库中,我们使用 xmin、xmax 以及 xip 来判断当前获取到的元组是否对我们可见。但是对于 Greenplum 这一分布式数据库而言,数据分散在不同节点上,同一个分布式事务所插入、更新的数据在不同的节点上会有不同的版本,那么此时又该如何判断元组的可见性呢?

1. 快照的基本概念

在单机 PostgreSQL 中,我们会使用本地快照来判断一个 tuple 对我们是否可见,事务快照是一个数据集合,保存了某个事务在某个特定时间点所看到的事务状态信息,包括哪些事务已经结束,哪些事务正在进行,以及哪些事务还未开始,具体内容可参考往期文章:

PostgreSQL 中的 MVCC (01)——基本可见性判断

而在分布式环境下,由于每个数据库实例都有自己的本地事务概念,因此,我们需要引入分布式事务以及分布式事务 ID 来统一构建全局的事务快照。这个快照和本地快照并没有本质的区别,仍然记录了某个事务在某个时间点所看到的的事务状态信息,只不过所记录的事务 ID 是分布式事务 ID 而已。

举个简单的例子,比如说现在有一个分布式事务 A 正在运行,它所获得的分布式快照如下吐所示:

Alt text

其中的 xmin/xmax/xip 均为分布式事务 ID。那么在判断分布式事务的可见性时,依然按照 PostgreSQL 的那套规则来判断可见性。假设所有数据都没有被删除,那么就有:

  • 所有分布式事务 ID 小于 xmin 的元组都可见;
  • 所有分布式事务 ID 大于等于 xmax 的元组都不可见;
  • 所有分布式事务 ID 在 xip 列表,并且不是当前事务所插入的元组,都不可见。

MVCC 的本质上其实就这三条规则,但是由于存在数据的删除、游标以及同一个事务对同一个 tuple 进行多次修改的情况存在,才会使得 tuple 可见性的判断变得复杂。

2. 分布式快照

Greenplum 是一个典型的 Coordinator/Segment 架构,所有的 Query 必须由协调者节点进行处理,例如对查询语句进行解析、重写和优化,再将这些信息发送给对应的 Segment 进行执行。为了保证所有 Segment 对数据具有统一的可见性,Greenplum 在在下发查询计划之前,还会额外生成分布式快照。分布式快照对应于分布式事务,用于判断元组的可见性。

分布式快照由结构体 DistributedSnapshot 定义,其定义与各字段含义如下:

typedef struct DistributedSnapshot
{
    /* 所有正在运行的事务中的最小 xminDistributedSnapshot 值 */
    DistributedTransactionId xminAllDistributedSnapshots;
    
    /* 分布式快照 ID,由 QD 中的共享变量 shmNextSnapshotId 原子性的自增得来 */
    DistributedSnapshotId distribSnapshotId;
    
    /* 下面的字段和普通快照一样,xmax/xmin/xip */
    
    DistributedTransactionId xmin;	/* XID < xmin are visible to me */
    DistributedTransactionId xmax;	/* XID >= xmax are invisible to me */
    
    /* inProgressXidArray 数组长度,即目前有多少个分布式事务正在运行 */
    int32		count;
    /* 正在运行的分布式事务数组 */
    DistributedTransactionId        *inProgressXidArray;
} DistributedSnapshot;

可以看到,分布式快照和本地快照几乎没什么区别,最核心的字段仍然是 xmin/xmax/xip 这 3 个值。当我们不考虑数据的删除时,Segment 判断一个元组对当前分布式快照的可见性流程如下:

  1. 如果元组的 xmin 所对应的事务未提交,并且不由当前快照所对应事务创建,则直接返回不可见。
  2. 否则根据创建元组的分布式事务 ID (distributedTransactionId) 与分布式快照对对比:
    • 如果 distributedTransactionId < distributedSnapshot->xmin,则对应元组可见;
    • 如果 distributedTransactionId >= distributedSnapshot->xmax,则元组不可见;
    • 如果 distributedTransactionId 在 distributedSnapshot->inProgressXidArray 数组中,则元组不可见。

XidInMVCCSnapshot() 函数为例,给出分布式快照如何在 QE 上发挥作用。从这个函数名称上我们就可以看出来是用于判断当前 tuple 的 xid 是否在给定的快照中,这里既包括分布式快照,也包括本地快照。

XidInMVCCSnapshotCheckResult
XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot, 
                  bool distributedSnapshotIgnore, bool *setDistributedSnapshotIgnore) {
    *setDistributedSnapshotIgnore = false;
    
    /* 使用分布式快照进行判断 */
    if (snapshot->haveDistribSnapshot && !distributedSnapshotIgnore && !IS_QUERY_DISPATCHER()) {
        
        DistributedSnapshotCommitted	distributedSnapshotCommitted;
    
        /* 无需对特殊的 xid 进行判断,例如 FrozenXid, BootstrapXid 等 */
        if (!TransactionIdIsNormal(xid))
            return XID_NOT_IN_SNAPSHOT;
        if (TransactionIdFollowsOrEquals(xid, snapshot->xmax))
            return XID_IN_SNAPSHOT;
    
        /* DistributedSnapshotWithLocalMapping_CommittedTest 是一个关键函数 */
        distributedSnapshotCommitted =
            DistributedSnapshotWithLocalMapping_CommittedTest(
                &snapshot->distribSnapshotWithLocalMapping, xid, false);
    
        switch (distributedSnapshotCommitted) {
            case DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS:
                return XID_IN_SNAPSHOT;
    
            case DISTRIBUTEDSNAPSHOT_COMMITTED_VISIBLE:
                return XID_SURELY_COMMITTED;
    
            case DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE:
                /*
                 * We can safely skip both of these in the future for distributed
                 * snapshots.
                 */
                *setDistributedSnapshotIgnore = true;
                break;
    
            case DISTRIBUTEDSNAPSHOT_COMMITTED_UNKNOWN:
                /*
                 * The distributed log doesn't know anything about this XID. It may
                 * be a local-only transaction, or still in-progress. Proceed to
                 * perform a local visibility check.
                 */
                break;
    
            default:
                elog(FATAL, "Unrecognized distributed committed test result: %d",
                     (int) distributedSnapshotCommitted);
                break;
        }
    }
    
    /* 使用本地快照进行判断 */
    return XidInMVCCSnapshot_Local(xid, snapshot) ? XID_IN_SNAPSHOT : XID_NOT_IN_SNAPSHOT;
}

对于分布式快照而言,XidInMVCCSnapshot() 中的核心函数便是 DistributedSnapshotWithLocalMapping_CommittedTest,用于判断 tuple 对分布式快照的可见性:

DistributedSnapshotCommitted
DistributedSnapshotWithLocalMapping_CommittedTest( DistributedSnapshotWithLocalMapping *dslm, 
                                                   TransactionId localXid, bool isVacuumCheck) {
					  
    DistributedSnapshot *ds = &dslm->ds;
    uint32		i;
    DistributedTransactionId distribXid = InvalidDistributedTransactionId;
    
    Assert(!IS_QUERY_DISPATCHER());
    
    /* 当本地事务 ID < FirstNormalTransactionId 时,无需分布式快照 */
    if (!TransactionIdIsNormal(localXid))
        return DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE;

    /* 首先尝试在 inProgressMappedLocalXids 数组中查找 */
    if (dslm->currentLocalXidsCount > 0) {
        /* 快速判断 */
        if (TransactionIdEquals(localXid, dslm->minCachedLocalXid) ||
            TransactionIdEquals(localXid, dslm->maxCachedLocalXid))
            return DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS;
        
        /* 否则遍历 inProgressMappedLocalXids 数组判断 localXid 是否在其中 */
        if (TransactionIdFollows(localXid, dslm->minCachedLocalXid) &&
            TransactionIdPrecedes(localXid, dslm->maxCachedLocalXid)) {
                for (i = 0; i < dslm->currentLocalXidsCount; i++) {
                    if (TransactionIdEquals(localXid, dslm->inProgressMappedLocalXids[i]))
                        return DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS;
                }
        }
    }

    /* localXid 能够在本地缓存中找到对应的 distributedXid,这里的本地缓存为 hashmap */
    if (LocalDistribXactCache_CommittedFind(localXid, &distribXid)) {
        if (distribXid == InvalidDistributedTransactionId)
            return DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE;
    }
    else {
        /* 在本地映射文件中查找 distributedXid */
        if (DistributedLog_CommittedCheck(localXid, &distribXid)) {
            /* 添加到本地的 hashmap 缓存中 */
            LocalDistribXactCache_AddCommitted(localXid, distribXid);
        }
        else {
            /* 本地映射文件没找着对应的 distributedXid,我们也不知道发生了什么,返回 unknown */
            return DISTRIBUTEDSNAPSHOT_COMMITTED_UNKNOWN;
        }
    }
    
    /* distribXid 小于快照的最小 xmin */
    if (distribXid < ds->xminAllDistributedSnapshots)
        return DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE;
    
    if (isVacuumCheck)
        return DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS;
    
    /* distribXid < xmin,可见 */
    if (distribXid < ds->xmin)
        return DISTRIBUTEDSNAPSHOT_COMMITTED_VISIBLE;
    
    /* distribXid >= xmax,不可见 */
    if (distribXid >= ds->xmax)
        return DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS;
    
    /* 遍历 inProgressXidArray,看看 distribXid 在不在里头儿 */
    for (i = 0; i < ds->count; i++) {
        if (distribXid == ds->inProgressXidArray[i]) {
            return DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS;
        }
    
        if (distribXid < ds->inProgressXidArray[i])
            break;
    }
    
    /* Not in-progress, therefore visible. */
    return DISTRIBUTEDSNAPSHOT_COMMITTED_VISIBLE;
}

函数 DistributedSnapshotWithLocalMapping_CommittedTest 的整体结构非常清晰,故不再赘述,剩下的一个问题就是我们知道分布式快照是由 QD 创建的,那么 QD 是在哪个函数中创建的呢?

答案便是 DtxContextInfo_CreateOnMaster() 函数,该函数实际上是创建的 DtxContextInfo,也就是分布式事务的 Context Info,而分布式快照 Distributed Snapshot 的信息将会作为 DtxContextInfo 的一部分保存在内存中,并在需要的时候进行序列化,从而发送给 QE,如下图所示:

Alt text

可以看到,DtxContextInfo 除了获取快照信息以外,还会获取当前分布式事务的分布式事务 ID,我们简单来看一下这个函数的具体内容:

void
DtxContextInfo_CreateOnMaster(DtxContextInfo *dtxContextInfo, bool inCursor, 
                              int txnOptions, Snapshot snapshot) {
    int			i;
    CommandId	curcid = 0;
    
    if (snapshot)
        curcid = snapshot->curcid;
    
    /* reset dtxContextInfo */
    DtxContextInfo_Reset(dtxContextInfo);
    
    /* 获取当前事务的分布式事务 ID */
    dtxContextInfo->distributedXid = getDistributedTransactionId();
    if (dtxContextInfo->distributedXid != InvalidDistributedTransactionId)
        dtxContextInfo->curcid = curcid;
    
    /* 当前 session 已经执行了多少次命令 */
    dtxContextInfo->segmateSync = inCursor ? syncCount : ++syncCount;
    if (dtxContextInfo->segmateSync == (~(uint32)0))
        ereport(FATAL,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("cannot have more than 2^32-2 commands in a session")));
    
    dtxContextInfo->cursorContext = inCursor;
    dtxContextInfo->nestingLevel = GetCurrentTransactionNestLevel();
    
    dtxContextInfo->haveDistributedSnapshot = false;
    if (snapshot && snapshot->haveDistribSnapshot)
    {
        /* 填充分布式快照信息 */
        DistributedSnapshot_Copy(&dtxContextInfo->distributedSnapshot,
                                 &snapshot->distribSnapshotWithLocalMapping.ds);
        dtxContextInfo->haveDistributedSnapshot = true;
    }
    
    dtxContextInfo->distributedTxnOptions = txnOptions;
}

DtxContextInfo_CreateOnMaster() 这个函数实际上就是一个“组装车间”,将来自共享内存的分布式事务 ID、分布式快照 ID 以及分布式快照拼接起来,最终得到完整的 DtxContextInfo。序列化的工作则交给函数 qdSerializeDtxContextInfo() 完成,感兴趣的读者可自行阅读其实现。

smartkeyerror

日拱一卒,功不唐捐