Apache Omid TSO 组件实现原理

作用

独立进程,处理全局事务之间的并发冲突。

流程

TSOChannelHandler#channelRead -> AbstractRequestProcessor -> PersistenceProcessorHandler

总体流程

1
2
3
4
5
6
7
8
9
thread1
TSOChannelHandler#channelRead
AbstractRequestProcessor#timestampRequest 接收 client 请求,创建 RequestEvent 并 publish
thread2
AbstractRequestProcessor#onEvent 处理 RequestEvent 请求
AbstractRequestProcessor#handleRequest
PersistenceProcessorImpl#addTimestampToBatch 创建 PersistEvent,当 batch 满了发送事件
thread3
PersistenceProcessorHandler#onEvent 持久化事件处理

TSOChannelHandler

继承自 Netty 的 ChannelInboundHandlerAdapter,用于处理 TSO 的入站请求

channelRead

委托 requestProcessor 创建 timestampRequest 和 commitRequest 请求事件。

AbstractRequestProcessor

处理 timestamp 和 commit 事件。

onEvent

处理 RequestEvent 事件,按照事件类型派发给 handleTimestamp 和 handleCommit 方法进行处理。

handleTimestamp

1.通过 timestampOracle 获取下一个时间戳;
2.PersistenceProcessorImpl#addBatch 事件添加到 batch,但是后续对 timestamp 请求不会额外处理。

handleCommit

主要通过 hasConflictsWithCommittedTransactions 判断 writeSet 和 CommitHashMap 里是否有事务写冲突,如果没有则可以提交事务,分配 commitTimestamp。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void handleCommit(RequestEvent event) throws Exception {
long startTimestamp = event.getStartTimestamp(); // startTimestamp
Iterable<Long> writeSet = event.writeSet(); // 写入集,存储的是 cellIds
Collection<Long> tableIdSet = event.getTableIdSet();
boolean isCommitRetry = event.isCommitRetry();

boolean nonEmptyWriteSet = writeSet.iterator().hasNext(); // 检查写集合是否为空,即事务是否有写操作
if (startTimestamp > lowWatermark &&
!hasConflictsWithFences(startTimestamp, tableIdSet) &&
!hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) { // 检查事务是否满足提交条件,通过 hasConflictsWithCommittedTransactions 判断是否有事务写冲突
// 可以进行事务提交
long commitTimestamp = timestampOracle.next(); // 获取提交时间戳
Optional<Long> forwardNewWaterMark = Optional.absent();
if (nonEmptyWriteSet) { // 写集合非空
long newLowWatermark = lowWatermark;

for (long r : writeSet) { // 遍历写集合中的每个元素,更新其最新的写入时间戳,并计算新的低水位线
long removed = hashmap.putLatestWriteForCell(r, commitTimestamp); // 更新 cellId 对应的 commitTimestamp, 返回之前的 oldest commitTimestamp
newLowWatermark = Math.max(removed, newLowWatermark); // 更新低水位线
}

if (newLowWatermark != lowWatermark) { // 更新低水位线
lowWatermark = newLowWatermark;
forwardNewWaterMark = Optional.of(lowWatermark);
}
}
forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx(), forwardNewWaterMark); // 持久化 commit 请求
} else { // 事务不满足提交条件
if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
forwardCommitRetry(startTimestamp, c, event.getMonCtx()); // 若是提交重试,再次检查是否已提交以避免因响应延迟导致的重复提交
} else {
forwardAbort(startTimestamp, c, event.getMonCtx()); // 否则,中止事务
}
}
}

CommitHashMap

通过 LongCache 缓存 cellId -> lastCommittedTimestamp 的映射。

getLatestWriteForCell 方法:
根据 cellId 获取 lastCommittedTimestamp。

putLatestWriteForCell 方法:
更新 cellId 对应的 lastCommittedTimestamp。

LongCache

缓存 cellId -> lastCommittedTimestamp 的映射。

get 和 set 操作都是先将原始 cellId 进行 hash 操作找到位置,所以可能存在冲突。

set

更新 cellId 对应的 lastCommittedTimestamp。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public long set(long key, long value) {
final int index = index(key); // cellId 取模返回下标,可能会冲突
int oldestIndex = 0;
long oldestValue = Long.MAX_VALUE;
for (int i = 0; i < associativity; ++i) {
int currIndex = 2 * (index + i); // 计算 key 下标
if (cache[currIndex] == key) { // 相同事务 cellId, 替换场景
oldestValue = 0;
oldestIndex = currIndex;
break;
}
if (cache[currIndex + 1] <= oldestValue) { // 没找到相同的key.通过和 oldestValue 比较会将最小的 timestamp 剔除
oldestValue = cache[currIndex + 1];
oldestIndex = currIndex;
}
}
// 替换最旧的键值对,将其更新为新的键值对
cache[oldestIndex] = key;
cache[oldestIndex + 1] = value;
return oldestValue;
}

get

获取 cellId 对应的 lastCommittedTimestamp,找不到则返回 0.

1
2
3
4
5
6
7
8
9
10
public long get(long key) {
final int index = index(key);
for (int i = 0; i < associativity; ++i) { // associativity 里存储的元素key应该是相同的
int currIndex = 2 * (index + i); // 计算 key 的下标
if (cache[currIndex] == key) { // 找到 cache key
return cache[currIndex + 1]; // 返回对应的 value
}
}
return 0;
}

PersistenceProcessorImpl

将 startTimestamp 和 commitTimestamp 放入 batch.

addCommitToBatch

1
2
3
创建 event,添加到 current batch
如果 current batch is full
triggerCurrentBatchFlush

triggerCurrentBatchFlush

创建 PersistBatchEvent 并发送事件

PersistenceProcessorHandler

处理上面 PersistenceProcessorImpl 发送过来的事件,进行持久化处理。

onEvent

实际上只处理 commit 事件,会创建 put 对象将事务信息持久化到 hbase 的 commitTable (OMID_COMMIT_TABLE).

HBaseCommitTable

构造方法: 根据 HBaseCommitTableConfig 配置初始化

参考