Apache Omid Client 组件实现原理
作用
通过 TransactionManager 开启/提交/回滚事务,提供事务内快照隔离级别的读写操作。
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| TransactionManager tm = HBaseTransactionManager.newInstance();
Transaction tx0 = tm.begin();
byte[] rowId = rowIdGenerator.getRowId(); Put initialPut = new Put(rowId); initialPut.addColumn(family, qualifier, initialData); txTable.put(tx0, initialPut);
Get tx2Get = new Get(rowId); tx2Get.addColumn(family, qualifier); Result tx2GetResult = txTable.get(tx2, tx2Get);
tm.commit(tx0);
|
流程
开启事务
1 2 3
| tso 申请开始时间戳 创建 hbase transaction 对象 初始化 start timestamp 和 read timestamp 和 write timestamp
|
提交事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 1.调用 tso 逻辑 1.1.写冲突检测 1.2.分配 commit timestamp 1.3.写入 OMID_COMMIT_TABLE start timestamp 为 key start timestamp + commit timstamp 为 value 设置事务状态 committed 和 commit timestamp 添加 shadow cell 到 MY_TX_TABLE 遍历 write set,每一个 cell 对应一个 shadow cell 创建 shadow cell timestamp 属性是 writeTimestamp,其实就是 startTimestamp value 是当前事务的 commitTimestamp 更新 shadow cell 到测试表 MY_TX_TABLE 根据当前事务的 start timestamp 删除 commit table 记录
|
回滚事务
1 2 3 4
| 1.tso 判断发生写冲突 abort 当前事务 2.回滚当前事务 writeSet 里所有行的提交 删除 write cells flush
|
事务内读操作
1 2 3 4 5 6 7 8 9 10 11 12 13
| 获取当前事务的 read timestamp(默认就是 start timestamp) 设置查询时间戳范围 0 ~ readTimestamp+1,只读取当前事务和事务启动之前的最新快照版本 添加 shadow cell 查询条件 过滤查询返回的快照结果 构建 commitCache,存储 shadow cell 的 timestamp -> commit timestamp 映射 遍历非 shadow cells 判断当前 cell 是否在当前事务内 判断当前 cell 是否在当前事务可以读取的快照内 根据当前 cell 的 startTimestamp 查找 shadow cell 的 commitCache 查找 commitTable 若查询不到 commitTimstamp,说明当前 cell 尚未提交 如果上面没找到,则查询之前的版本重复上面的流程进行查找
|
SnapshotIsolationExample 事务内读操作流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
| tx0 已经提交; tx1 开启事务后修改数据,但是没有提交; tx2 开启事务后读取数据,按道理应该读取到 tx0 已经提交的版本数据;
tx0: startTimestamp = 1721780321605000000 readTimestamp = 1721780321605000000 writeTimestamp = 1721780321605000000 commitTimestamp = 1721780325508000000
tx1: startTimestamp = 1721780328808000000 readTimestamp = 1721780328808000000 writeTimestamp = 1721780328808000000 commitTimestamp = 0 // 尚未提交
tx2: startTimestamp = 1721780328825000000 readTimestamp = 1721780328825000000 writeTimestamp = 1721780328825000000 commitTimestamp = 0 // 尚未提交
tx2 Get: 查询条件 timeRange=0~1721780328825000000+1
Get result: // tx0 shadow cells 0 = {NoTagsKeyValue@5564} "EXAMPLE_ROW/MY_CF:\x00\xC2\x80MY_Q\xC2\x80/1721780321605000000/Put/vlen=8/seqid=0" // tx1 update 1 = {NoTagsKeyValue@5565} "EXAMPLE_ROW/MY_CF:MY_Q/1721780328808000000/Put/vlen=4/seqid=0"
commitCache: // tx0 的 shadow cell {Long@5602} 1721780321605000000 -> {Long@5603} 1721780325508000000
snapshotFilter: 根据当前 cell 的 startTimestamp 查找 commitTimestamp // (tx1的当前cell) EXAMPLE_ROW/MY_CF:MY_Q/1721780328808000000/Put/vlen=4/seqid=0 判断当前cell有没有提交 没有 snapshot commitCache commitTable 查询不到 说明当前 tx1 cell 没有提交
----------------- 查询2 oldestCell: // tx1 update EXAMPLE_ROW/MY_CF:MY_Q/1721780328808000000/Put/vlen=4/seqid=0
tx2 Get: 查询历史快照,查询条件: timeRange=0~1721780328808000000 // tx1 的快照版本 snapshotCount=2 // 查询两个历史版本
Get result: // tx0 shadow cell 0 = {NoTagsKeyValue@5617} "EXAMPLE_ROW/MY_CF:\x00\xC2\x80MY_Q\xC2\x80/1721780321605000000/Put/vlen=8/seqid=0" // other 1 = {NoTagsKeyValue@5618} "EXAMPLE_ROW/MY_CF:\x00\xC2\x80MY_Q\xC2\x80/1721731810073000000/Put/vlen=8/seqid=0" // tx1 cell 2 = {NoTagsKeyValue@5619} "EXAMPLE_ROW/MY_CF:MY_Q/1721780321605000000/Put/vlen=10/seqid=0" // other2 3 = {NoTagsKeyValue@5620} "EXAMPLE_ROW/MY_CF:MY_Q/1721731810501000000/Put/vlen=4/seqid=0"
commitCache: {Long@5870} 1721731810073000000 -> {Long@5871} 1721731810450000000 // tx0 shadow cells {Long@5872} 1721780321605000000 -> {Long@5873} 1721780325508000000
遍历 filter current cells: // tx0 0 = {NoTagsKeyValue@5883} "EXAMPLE_ROW/MY_CF:MY_Q/1721780321605000000/Put/vlen=10/seqid=0" // other 1 = {NoTagsKeyValue@5884} "EXAMPLE_ROW/MY_CF:MY_Q/1721731810501000000/Put/vlen=4/seqid=0"
snapshotFilter: 在 commitCache 里查询到了 1721780321605000000 对应有 commitTimestamp 说明 tx0 已经提交
keyValuesInSnapshot 返回 tx0 对应的 cell // tx0 EXAMPLE_ROW/MY_CF:MY_Q/1721780321605000000/Put/vlen=10/seqid=0
|
事务内写操作
1 2 3 4 5 6 7
| 获取当前事务的 write timestamp row 添加 write timestamp 直接更新当前行记录 比如 value=initialVal -> value=val1 transaction.addWriteSetElement(cellId) 添加到 write set addMutation rows 写入刷到 hbase
|
模块
1 2
| omid-client omid-hbase-client
|
类
Transaction
1 2 3
| Transaction (org.apache.omid.transaction) AbstractTransaction (org.apache.omid.transaction) HBaseTransaction (org.apache.omid.transaction)
|
TransactionManager
1 2 3
| TransactionManager (org.apache.omid.transaction) AbstractTransactionManager (org.apache.omid.transaction) HBaseTransactionManager (org.apache.omid.transaction)
|