Apache Omid TSO 组件实现原理
作用
独立进程,处理全局事务之间的并发冲突。
流程
TSOChannelHandler#channelRead -> AbstractRequestProcessor -> PersistenceProcessorHandler
总体流程
1 2 3 4 5 6 7 8 9
| thread1 TSOChannelHandler#channelRead AbstractRequestProcessor#timestampRequest 接收 client 请求,创建 RequestEvent 并 publish thread2 AbstractRequestProcessor#onEvent 处理 RequestEvent 请求 AbstractRequestProcessor#handleRequest PersistenceProcessorImpl#addTimestampToBatch 创建 PersistEvent,当 batch 满了发送事件 thread3 PersistenceProcessorHandler#onEvent 持久化事件处理
|
类
TSOChannelHandler
继承自 Netty 的 ChannelInboundHandlerAdapter,用于处理 TSO 的入站请求
。
channelRead
委托 requestProcessor 创建 timestampRequest 和 commitRequest 请求事件。
AbstractRequestProcessor
处理 timestamp 和 commit 事件。
onEvent
处理 RequestEvent 事件,按照事件类型派发给 handleTimestamp 和 handleCommit 方法进行处理。
handleTimestamp
1.通过 timestampOracle 获取下一个时间戳;
2.PersistenceProcessorImpl#addBatch 事件添加到 batch,但是后续对 timestamp 请求不会额外处理。
handleCommit
主要通过 hasConflictsWithCommittedTransactions 判断 writeSet 和 CommitHashMap 里是否有事务写冲突,如果没有则可以提交事务,分配 commitTimestamp。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| private void handleCommit(RequestEvent event) throws Exception { long startTimestamp = event.getStartTimestamp(); Iterable<Long> writeSet = event.writeSet(); Collection<Long> tableIdSet = event.getTableIdSet(); boolean isCommitRetry = event.isCommitRetry();
boolean nonEmptyWriteSet = writeSet.iterator().hasNext(); if (startTimestamp > lowWatermark && !hasConflictsWithFences(startTimestamp, tableIdSet) && !hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) { long commitTimestamp = timestampOracle.next(); Optional<Long> forwardNewWaterMark = Optional.absent(); if (nonEmptyWriteSet) { long newLowWatermark = lowWatermark;
for (long r : writeSet) { long removed = hashmap.putLatestWriteForCell(r, commitTimestamp); newLowWatermark = Math.max(removed, newLowWatermark); }
if (newLowWatermark != lowWatermark) { lowWatermark = newLowWatermark; forwardNewWaterMark = Optional.of(lowWatermark); } } forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx(), forwardNewWaterMark); } else { if (isCommitRetry) { forwardCommitRetry(startTimestamp, c, event.getMonCtx()); } else { forwardAbort(startTimestamp, c, event.getMonCtx()); } } }
|
CommitHashMap
通过 LongCache 缓存 cellId -> lastCommittedTimestamp 的映射。
getLatestWriteForCell 方法:
根据 cellId 获取 lastCommittedTimestamp。
putLatestWriteForCell 方法:
更新 cellId 对应的 lastCommittedTimestamp。
LongCache
缓存 cellId -> lastCommittedTimestamp 的映射。
get 和 set 操作都是先将原始 cellId 进行 hash 操作找到位置,所以可能存在冲突。
set
更新 cellId 对应的 lastCommittedTimestamp。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public long set(long key, long value) { final int index = index(key); int oldestIndex = 0; long oldestValue = Long.MAX_VALUE; for (int i = 0; i < associativity; ++i) { int currIndex = 2 * (index + i); if (cache[currIndex] == key) { oldestValue = 0; oldestIndex = currIndex; break; } if (cache[currIndex + 1] <= oldestValue) { oldestValue = cache[currIndex + 1]; oldestIndex = currIndex; } } cache[oldestIndex] = key; cache[oldestIndex + 1] = value; return oldestValue; }
|
get
获取 cellId 对应的 lastCommittedTimestamp,找不到则返回 0.
1 2 3 4 5 6 7 8 9 10
| public long get(long key) { final int index = index(key); for (int i = 0; i < associativity; ++i) { int currIndex = 2 * (index + i); if (cache[currIndex] == key) { return cache[currIndex + 1]; } } return 0; }
|
PersistenceProcessorImpl
将 startTimestamp 和 commitTimestamp 放入 batch.
addCommitToBatch
1 2 3
| 创建 event,添加到 current batch 如果 current batch is full triggerCurrentBatchFlush
|
triggerCurrentBatchFlush
创建 PersistBatchEvent 并发送事件
PersistenceProcessorHandler
处理上面 PersistenceProcessorImpl 发送过来的事件,进行持久化处理。
onEvent
实际上只处理 commit 事件,会创建 put 对象将事务信息持久化到 hbase 的 commitTable (OMID_COMMIT_TABLE).
HBaseCommitTable
构造方法: 根据 HBaseCommitTableConfig 配置初始化
参考