Omid Timestamp Oracle 组件实现原理

作用

生成全局单调递增的时间戳,支持获取操作和崩溃恢复。

功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1.生成全局单调递增的时间戳(支持崩溃恢复)
api
next
返回下一个时间戳
getLast
返回最后一个分配的时间戳(当前时间戳)
实现方式
TimestampOracleImpl
单调递增的时间戳
在分配时间戳时,如果当前的最大时间戳已经用完,它会触发一个后台任务来更新最大时间戳
next:
分配下一个时间戳。如果当前的最大时间戳已经用完,它会等待后台任务分配新的时间戳
WorldClockOracleImpl
基于世界时间(System.timestamp)生成全局单调递增的时间戳
2.存储时间戳
存储方式
in memory
zk
hbase
api
updateMaxTimestamp
更新最大时间戳
getMaxTimestamp
获取最大时间戳

TimestampStorage

存储时间戳

方法:
updateMaxTimestamp 更新最大时间戳
getMaxTimestamp 获取最大时间戳

实现类:

1
2
3
4
5
TimestampStorage
InMemoryTimestampStorage in TimestampOracleImpl (org.apache.omid.tso) 使用内存存储
ZKTimestampStorage (org.apache.omid.timestamp.storage) 使用 zk 存储
HBaseTimestampStorage (org.apache.omid.timestamp.storage) 使用 hbase 存储
InMemoryTimestampStorage in WorldClockOracleImpl (org.apache.omid.tso) 使用内存存储

ZKTimestampStorage

通过 zk curator DistributedAtomicLong 实现 DistributedAtomicLong timestamp

方法:
updateMaxTimestamp():
执行 timestamp.compareAndSet(previousMaxTimestamp, newMaxTimestamp)
getMaxTimestamp():
返回 timestamp.get()

TimestampOracle

TimestampOracle 提供了一个服务,用于生成单调递增的时间戳

方法:
initialize()
用于初始化时间戳服务。

next()
返回下一个时间戳。

getLast()
返回最后一个分配的时间戳(当前时间戳)

实现类

1
2
3
4
TimestampOracle (org.apache.omid.tso)
TimestampOracleImpl (org.apache.omid.tso)
PausableTimestampOracle (org.apache.omid.tso)
WorldClockOracleImpl (org.apache.omid.tso)

TimestampOracleImpl

initialize()

1.从存储中获取 maxTimestamp;
2.初始化 AllocateTimestampBatchTask 用于分配时间戳,并执行一次.

next()

生成下一个时间戳。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public long next() {
lastTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
// 当前时间戳超过分配阈值时,触发执行时间戳批次分配任务
if (lastTimestamp >= nextAllocationThreshold) {
// 设置下一次分配阈值为Long.MAX_VALUE,确保只有一个线程会执行分配任务
nextAllocationThreshold = Long.MAX_VALUE;
executor.execute(allocateTimestampsBatchTask);
}
// 当lastTimestamp超过当前已分配的最大时间戳时,等待新一批时间戳被分配
if (lastTimestamp >= maxTimestamp) {
assert (maxTimestamp <= maxAllocatedTimestamp);
// 自旋等待,直到已分配的最大时间戳被更新
while (maxAllocatedTimestamp == maxTimestamp) {
// 自旋
}
assert (maxAllocatedTimestamp > maxTimestamp);
// 更新当前最大时间戳并计算下次分配的阈值
maxTimestamp = maxAllocatedTimestamp;
nextAllocationThreshold = maxTimestamp - TIMESTAMP_REMAINING_THRESHOLD;
assert (nextAllocationThreshold > lastTimestamp && nextAllocationThreshold < maxTimestamp);
assert (lastTimestamp < maxTimestamp);
}
// 返回新生成的时间戳
return lastTimestamp;
}
AllocateTimestampBatchTask

定时任务批量更新存储系统中最大的时间戳,每次预分配 TIMESTAMP_BATCH 数量的时间戳。

1
2
static final long TIMESTAMP_BATCH = 10_000_000 * CommitTable.MAX_CHECKPOINTS_PER_TXN; // 10 million
long newMaxTimestamp = previousMaxTimestamp + TIMESTAMP_BATCH;

WorldClockOracleImpl

基于世界时间的单调递增时间戳。

initialize
从 TimestampStorage 中获取最大的时间戳,并启动定时任务来定期更新最大时间戳。

next
用于获取下一个时间戳。
如果当前时间戳可用,则直接返回;
否则,等待定时任务分配新的时间戳。

getLast
方法返回最后一个时间戳。

AllocateTimestampBatchTask#run

定时任务预分配时间戳。

1
2
3
// 预测一个未来的时间窗口内允许的最大事务时间戳.
// 当前时间(System.currentTimeMillis())加上一个预设的时间间隔TIMESTAMP_INTERVAL_MS,然后乘以每毫秒允许的最大事务数MAX_TX_PER_MS
long newMaxTime = (System.currentTimeMillis() + TIMESTAMP_INTERVAL_MS) * MAX_TX_PER_MS;
next()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public long next() {
long currentMsFirstTimestamp = System.currentTimeMillis() * MAX_TX_PER_MS; // 通过当前时间乘以每毫秒的最大事务数来计算当前毫秒的第一个时间戳
lastTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN;
if (lastTimestamp >= currentMsFirstTimestamp) { // 如果lastTimestamp大于等于当前毫秒的第一个时间戳,直接返回lastTimestamp
return lastTimestamp;
}
// 当前 timestamp 的第一个时间戳 >= 最大时间戳.这里sleep等待 allocate 线程进行分配
if (currentMsFirstTimestamp >= maxTimestamp) { // Intentional race to reduce synchronization overhead in every access to maxTimestamp
while (maxAllocatedTime <= currentMsFirstTimestamp) { // Waiting for the interval allocation
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
continue;
}
}
assert (maxAllocatedTime > maxTimestamp);
maxTimestamp = maxAllocatedTime;
}
lastTimestamp = currentMsFirstTimestamp;
return lastTimestamp;
}