在单机的 PostgreSQL 数据库中,我们使用 xmin、xmax 以及 xip 来判断当前获取到的元组是否对我们可见。但是对于 Greenplum 这一分布式数据库而言,数据分散在不同节点上,同一个分布式事务所插入、更新的数据在不同的节点上会有不同的版本,那么此时又该如何判断元组的可见性呢?
1. 快照的基本概念
在单机 PostgreSQL 中,我们会使用本地快照来判断一个 tuple 对我们是否可见,事务快照是一个数据集合,保存了某个事务在某个特定时间点所看到的事务状态信息,包括哪些事务已经结束,哪些事务正在进行,以及哪些事务还未开始,具体内容可参考往期文章:
PostgreSQL 中的 MVCC (01)——基本可见性判断
而在分布式环境下,由于每个数据库实例都有自己的本地事务概念,因此,我们需要引入分布式事务以及分布式事务 ID 来统一构建全局的事务快照。这个快照和本地快照并没有本质的区别,仍然记录了某个事务在某个时间点所看到的的事务状态信息,只不过所记录的事务 ID 是分布式事务 ID 而已。
举个简单的例子,比如说现在有一个分布式事务 A 正在运行,它所获得的分布式快照如下吐所示:
其中的 xmin/xmax/xip 均为分布式事务 ID。那么在判断分布式事务的可见性时,依然按照 PostgreSQL 的那套规则来判断可见性。假设所有数据都没有被删除,那么就有:
- 所有分布式事务 ID 小于 xmin 的元组都可见;
- 所有分布式事务 ID 大于等于 xmax 的元组都不可见;
- 所有分布式事务 ID 在 xip 列表,并且不是当前事务所插入的元组,都不可见。
MVCC 的本质上其实就这三条规则,但是由于存在数据的删除、游标以及同一个事务对同一个 tuple 进行多次修改的情况存在,才会使得 tuple 可见性的判断变得复杂。
2. 分布式快照
Greenplum 是一个典型的 Coordinator/Segment 架构,所有的 Query 必须由协调者节点进行处理,例如对查询语句进行解析、重写和优化,再将这些信息发送给对应的 Segment 进行执行。为了保证所有 Segment 对数据具有统一的可见性,Greenplum 在在下发查询计划之前,还会额外生成分布式快照。分布式快照对应于分布式事务,用于判断元组的可见性。
分布式快照由结构体 DistributedSnapshot
定义,其定义与各字段含义如下:
typedef struct DistributedSnapshot
{
/* 所有正在运行的事务中的最小 xminDistributedSnapshot 值 */
DistributedTransactionId xminAllDistributedSnapshots;
/* 分布式快照 ID,由 QD 中的共享变量 shmNextSnapshotId 原子性的自增得来 */
DistributedSnapshotId distribSnapshotId;
/* 下面的字段和普通快照一样,xmax/xmin/xip */
DistributedTransactionId xmin; /* XID < xmin are visible to me */
DistributedTransactionId xmax; /* XID >= xmax are invisible to me */
/* inProgressXidArray 数组长度,即目前有多少个分布式事务正在运行 */
int32 count;
/* 正在运行的分布式事务数组 */
DistributedTransactionId *inProgressXidArray;
} DistributedSnapshot;
可以看到,分布式快照和本地快照几乎没什么区别,最核心的字段仍然是 xmin/xmax/xip 这 3 个值。当我们不考虑数据的删除时,Segment 判断一个元组对当前分布式快照的可见性流程如下:
- 如果元组的 xmin 所对应的事务未提交,并且不由当前快照所对应事务创建,则直接返回不可见。
- 否则根据创建元组的分布式事务 ID (distributedTransactionId) 与分布式快照对对比:
- 如果 distributedTransactionId < distributedSnapshot->xmin,则对应元组可见;
- 如果 distributedTransactionId >= distributedSnapshot->xmax,则元组不可见;
- 如果 distributedTransactionId 在 distributedSnapshot->inProgressXidArray 数组中,则元组不可见。
以 XidInMVCCSnapshot()
函数为例,给出分布式快照如何在 QE 上发挥作用。从这个函数名称上我们就可以看出来是用于判断当前 tuple 的 xid 是否在给定的快照中,这里既包括分布式快照,也包括本地快照。
XidInMVCCSnapshotCheckResult
XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot,
bool distributedSnapshotIgnore, bool *setDistributedSnapshotIgnore) {
*setDistributedSnapshotIgnore = false;
/* 使用分布式快照进行判断 */
if (snapshot->haveDistribSnapshot && !distributedSnapshotIgnore && !IS_QUERY_DISPATCHER()) {
DistributedSnapshotCommitted distributedSnapshotCommitted;
/* 无需对特殊的 xid 进行判断,例如 FrozenXid, BootstrapXid 等 */
if (!TransactionIdIsNormal(xid))
return XID_NOT_IN_SNAPSHOT;
if (TransactionIdFollowsOrEquals(xid, snapshot->xmax))
return XID_IN_SNAPSHOT;
/* DistributedSnapshotWithLocalMapping_CommittedTest 是一个关键函数 */
distributedSnapshotCommitted =
DistributedSnapshotWithLocalMapping_CommittedTest(
&snapshot->distribSnapshotWithLocalMapping, xid, false);
switch (distributedSnapshotCommitted) {
case DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS:
return XID_IN_SNAPSHOT;
case DISTRIBUTEDSNAPSHOT_COMMITTED_VISIBLE:
return XID_SURELY_COMMITTED;
case DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE:
/*
* We can safely skip both of these in the future for distributed
* snapshots.
*/
*setDistributedSnapshotIgnore = true;
break;
case DISTRIBUTEDSNAPSHOT_COMMITTED_UNKNOWN:
/*
* The distributed log doesn't know anything about this XID. It may
* be a local-only transaction, or still in-progress. Proceed to
* perform a local visibility check.
*/
break;
default:
elog(FATAL, "Unrecognized distributed committed test result: %d",
(int) distributedSnapshotCommitted);
break;
}
}
/* 使用本地快照进行判断 */
return XidInMVCCSnapshot_Local(xid, snapshot) ? XID_IN_SNAPSHOT : XID_NOT_IN_SNAPSHOT;
}
对于分布式快照而言,XidInMVCCSnapshot()
中的核心函数便是 DistributedSnapshotWithLocalMapping_CommittedTest
,用于判断 tuple 对分布式快照的可见性:
DistributedSnapshotCommitted
DistributedSnapshotWithLocalMapping_CommittedTest( DistributedSnapshotWithLocalMapping *dslm,
TransactionId localXid, bool isVacuumCheck) {
DistributedSnapshot *ds = &dslm->ds;
uint32 i;
DistributedTransactionId distribXid = InvalidDistributedTransactionId;
Assert(!IS_QUERY_DISPATCHER());
/* 当本地事务 ID < FirstNormalTransactionId 时,无需分布式快照 */
if (!TransactionIdIsNormal(localXid))
return DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE;
/* 首先尝试在 inProgressMappedLocalXids 数组中查找 */
if (dslm->currentLocalXidsCount > 0) {
/* 快速判断 */
if (TransactionIdEquals(localXid, dslm->minCachedLocalXid) ||
TransactionIdEquals(localXid, dslm->maxCachedLocalXid))
return DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS;
/* 否则遍历 inProgressMappedLocalXids 数组判断 localXid 是否在其中 */
if (TransactionIdFollows(localXid, dslm->minCachedLocalXid) &&
TransactionIdPrecedes(localXid, dslm->maxCachedLocalXid)) {
for (i = 0; i < dslm->currentLocalXidsCount; i++) {
if (TransactionIdEquals(localXid, dslm->inProgressMappedLocalXids[i]))
return DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS;
}
}
}
/* localXid 能够在本地缓存中找到对应的 distributedXid,这里的本地缓存为 hashmap */
if (LocalDistribXactCache_CommittedFind(localXid, &distribXid)) {
if (distribXid == InvalidDistributedTransactionId)
return DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE;
}
else {
/* 在本地映射文件中查找 distributedXid */
if (DistributedLog_CommittedCheck(localXid, &distribXid)) {
/* 添加到本地的 hashmap 缓存中 */
LocalDistribXactCache_AddCommitted(localXid, distribXid);
}
else {
/* 本地映射文件没找着对应的 distributedXid,我们也不知道发生了什么,返回 unknown */
return DISTRIBUTEDSNAPSHOT_COMMITTED_UNKNOWN;
}
}
/* distribXid 小于快照的最小 xmin */
if (distribXid < ds->xminAllDistributedSnapshots)
return DISTRIBUTEDSNAPSHOT_COMMITTED_IGNORE;
if (isVacuumCheck)
return DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS;
/* distribXid < xmin,可见 */
if (distribXid < ds->xmin)
return DISTRIBUTEDSNAPSHOT_COMMITTED_VISIBLE;
/* distribXid >= xmax,不可见 */
if (distribXid >= ds->xmax)
return DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS;
/* 遍历 inProgressXidArray,看看 distribXid 在不在里头儿 */
for (i = 0; i < ds->count; i++) {
if (distribXid == ds->inProgressXidArray[i]) {
return DISTRIBUTEDSNAPSHOT_COMMITTED_INPROGRESS;
}
if (distribXid < ds->inProgressXidArray[i])
break;
}
/* Not in-progress, therefore visible. */
return DISTRIBUTEDSNAPSHOT_COMMITTED_VISIBLE;
}
函数 DistributedSnapshotWithLocalMapping_CommittedTest
的整体结构非常清晰,故不再赘述,剩下的一个问题就是我们知道分布式快照是由 QD 创建的,那么 QD 是在哪个函数中创建的呢?
答案便是 DtxContextInfo_CreateOnMaster()
函数,该函数实际上是创建的 DtxContextInfo
,也就是分布式事务的 Context Info,而分布式快照 Distributed Snapshot 的信息将会作为 DtxContextInfo
的一部分保存在内存中,并在需要的时候进行序列化,从而发送给 QE,如下图所示:
可以看到,DtxContextInfo
除了获取快照信息以外,还会获取当前分布式事务的分布式事务 ID,我们简单来看一下这个函数的具体内容:
void
DtxContextInfo_CreateOnMaster(DtxContextInfo *dtxContextInfo, bool inCursor,
int txnOptions, Snapshot snapshot) {
int i;
CommandId curcid = 0;
if (snapshot)
curcid = snapshot->curcid;
/* reset dtxContextInfo */
DtxContextInfo_Reset(dtxContextInfo);
/* 获取当前事务的分布式事务 ID */
dtxContextInfo->distributedXid = getDistributedTransactionId();
if (dtxContextInfo->distributedXid != InvalidDistributedTransactionId)
dtxContextInfo->curcid = curcid;
/* 当前 session 已经执行了多少次命令 */
dtxContextInfo->segmateSync = inCursor ? syncCount : ++syncCount;
if (dtxContextInfo->segmateSync == (~(uint32)0))
ereport(FATAL,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("cannot have more than 2^32-2 commands in a session")));
dtxContextInfo->cursorContext = inCursor;
dtxContextInfo->nestingLevel = GetCurrentTransactionNestLevel();
dtxContextInfo->haveDistributedSnapshot = false;
if (snapshot && snapshot->haveDistribSnapshot)
{
/* 填充分布式快照信息 */
DistributedSnapshot_Copy(&dtxContextInfo->distributedSnapshot,
&snapshot->distribSnapshotWithLocalMapping.ds);
dtxContextInfo->haveDistributedSnapshot = true;
}
dtxContextInfo->distributedTxnOptions = txnOptions;
}
DtxContextInfo_CreateOnMaster()
这个函数实际上就是一个“组装车间”,将来自共享内存的分布式事务 ID、分布式快照 ID 以及分布式快照拼接起来,最终得到完整的 DtxContextInfo
。序列化的工作则交给函数 qdSerializeDtxContextInfo()
完成,感兴趣的读者可自行阅读其实现。