Apache Omid Client 组件实现原理

作用

通过 TransactionManager 开启/提交/回滚事务,提供事务内快照隔离级别的读写操作。

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 1.创建 transaction manager
TransactionManager tm = HBaseTransactionManager.newInstance();
// 2.通过 transactin manger 开启事务
Transaction tx0 = tm.begin();
// 3.事务内插入数据
byte[] rowId = rowIdGenerator.getRowId();
Put initialPut = new Put(rowId);
initialPut.addColumn(family, qualifier, initialData);
txTable.put(tx0, initialPut);
// 4.事务内读取数据
Get tx2Get = new Get(rowId);
tx2Get.addColumn(family, qualifier);
Result tx2GetResult = txTable.get(tx2, tx2Get);
// 5.提交事务
tm.commit(tx0);

流程

开启事务

1
2
3
tso 申请开始时间戳
创建 hbase transaction 对象
初始化 start timestamp 和 read timestamp 和 write timestamp

提交事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1.调用 tso 逻辑
1.1.写冲突检测
1.2.分配 commit timestamp
1.3.写入 OMID_COMMIT_TABLE
start timestamp 为 key
start timestamp + commit timstamp 为 value
设置事务状态 committed 和 commit timestamp
添加 shadow cell 到 MY_TX_TABLE
遍历 write set,每一个 cell 对应一个 shadow cell
创建 shadow cell
timestamp 属性是 writeTimestamp,其实就是 startTimestamp
value 是当前事务的 commitTimestamp
更新 shadow cell 到测试表 MY_TX_TABLE
根据当前事务的 start timestamp 删除 commit table 记录

回滚事务

1
2
3
4
1.tso 判断发生写冲突 abort 当前事务
2.回滚当前事务 writeSet 里所有行的提交
删除 write cells
flush

事务内读操作

1
2
3
4
5
6
7
8
9
10
11
12
13
获取当前事务的 read timestamp(默认就是 start timestamp)
设置查询时间戳范围 0 ~ readTimestamp+1,只读取当前事务和事务启动之前的最新快照版本
添加 shadow cell 查询条件
过滤查询返回的快照结果
构建 commitCache,存储 shadow cell 的 timestamp -> commit timestamp 映射
遍历非 shadow cells
判断当前 cell 是否在当前事务内
判断当前 cell 是否在当前事务可以读取的快照内
根据当前 cell 的 startTimestamp
查找 shadow cell 的 commitCache
查找 commitTable
若查询不到 commitTimstamp,说明当前 cell 尚未提交
如果上面没找到,则查询之前的版本重复上面的流程进行查找

SnapshotIsolationExample 事务内读操作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
tx0 已经提交;
tx1 开启事务后修改数据,但是没有提交;
tx2 开启事务后读取数据,按道理应该读取到 tx0 已经提交的版本数据;

tx0:
startTimestamp = 1721780321605000000
readTimestamp = 1721780321605000000
writeTimestamp = 1721780321605000000
commitTimestamp = 1721780325508000000

tx1:
startTimestamp = 1721780328808000000
readTimestamp = 1721780328808000000
writeTimestamp = 1721780328808000000
commitTimestamp = 0 // 尚未提交

tx2:
startTimestamp = 1721780328825000000
readTimestamp = 1721780328825000000
writeTimestamp = 1721780328825000000
commitTimestamp = 0 // 尚未提交

tx2 Get:
查询条件
timeRange=0~1721780328825000000+1

Get result:
// tx0 shadow cells
0 = {NoTagsKeyValue@5564} "EXAMPLE_ROW/MY_CF:\x00\xC2\x80MY_Q\xC2\x80/1721780321605000000/Put/vlen=8/seqid=0"
// tx1 update
1 = {NoTagsKeyValue@5565} "EXAMPLE_ROW/MY_CF:MY_Q/1721780328808000000/Put/vlen=4/seqid=0"

commitCache:
// tx0 的 shadow cell
{Long@5602} 1721780321605000000 -> {Long@5603} 1721780325508000000

snapshotFilter:
根据当前 cell 的 startTimestamp 查找 commitTimestamp
// (tx1的当前cell) EXAMPLE_ROW/MY_CF:MY_Q/1721780328808000000/Put/vlen=4/seqid=0
判断当前cell有没有提交
没有 snapshot commitCache
commitTable 查询不到
说明当前 tx1 cell 没有提交

----------------- 查询2
oldestCell:
// tx1 update
EXAMPLE_ROW/MY_CF:MY_Q/1721780328808000000/Put/vlen=4/seqid=0

tx2 Get:
查询历史快照,查询条件:
timeRange=0~1721780328808000000 // tx1 的快照版本
snapshotCount=2 // 查询两个历史版本

Get result:
// tx0 shadow cell
0 = {NoTagsKeyValue@5617} "EXAMPLE_ROW/MY_CF:\x00\xC2\x80MY_Q\xC2\x80/1721780321605000000/Put/vlen=8/seqid=0"
// other
1 = {NoTagsKeyValue@5618} "EXAMPLE_ROW/MY_CF:\x00\xC2\x80MY_Q\xC2\x80/1721731810073000000/Put/vlen=8/seqid=0"
// tx1 cell
2 = {NoTagsKeyValue@5619} "EXAMPLE_ROW/MY_CF:MY_Q/1721780321605000000/Put/vlen=10/seqid=0"
// other2
3 = {NoTagsKeyValue@5620} "EXAMPLE_ROW/MY_CF:MY_Q/1721731810501000000/Put/vlen=4/seqid=0"

commitCache:
{Long@5870} 1721731810073000000 -> {Long@5871} 1721731810450000000
// tx0 shadow cells
{Long@5872} 1721780321605000000 -> {Long@5873} 1721780325508000000

遍历 filter current cells:
// tx0
0 = {NoTagsKeyValue@5883} "EXAMPLE_ROW/MY_CF:MY_Q/1721780321605000000/Put/vlen=10/seqid=0"
// other
1 = {NoTagsKeyValue@5884} "EXAMPLE_ROW/MY_CF:MY_Q/1721731810501000000/Put/vlen=4/seqid=0"

snapshotFilter:
在 commitCache 里查询到了 1721780321605000000 对应有 commitTimestamp
说明 tx0 已经提交

keyValuesInSnapshot
返回 tx0 对应的 cell
// tx0
EXAMPLE_ROW/MY_CF:MY_Q/1721780321605000000/Put/vlen=10/seqid=0

事务内写操作

1
2
3
4
5
6
7
获取当前事务的 write timestamp
row 添加 write timestamp
直接更新当前行记录
比如 value=initialVal -> value=val1
transaction.addWriteSetElement(cellId) 添加到 write set
addMutation
rows 写入刷到 hbase

模块

1
2
omid-client
omid-hbase-client

Transaction

1
2
3
Transaction (org.apache.omid.transaction)
AbstractTransaction (org.apache.omid.transaction)
HBaseTransaction (org.apache.omid.transaction)

TransactionManager

1
2
3
TransactionManager (org.apache.omid.transaction)
AbstractTransactionManager (org.apache.omid.transaction)
HBaseTransactionManager (org.apache.omid.transaction)