Omid Timestamp Oracle 组件实现原理
作用
生成全局单调递增的时间戳,支持获取操作和崩溃恢复。
功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| 1.生成全局单调递增的时间戳(支持崩溃恢复) api next 返回下一个时间戳 getLast 返回最后一个分配的时间戳(当前时间戳) 实现方式 TimestampOracleImpl 单调递增的时间戳 在分配时间戳时,如果当前的最大时间戳已经用完,它会触发一个后台任务来更新最大时间戳 next: 分配下一个时间戳。如果当前的最大时间戳已经用完,它会等待后台任务分配新的时间戳 WorldClockOracleImpl 基于世界时间(System.timestamp)生成全局单调递增的时间戳 2.存储时间戳 存储方式 in memory zk hbase api updateMaxTimestamp 更新最大时间戳 getMaxTimestamp 获取最大时间戳
|
类
TimestampStorage
存储时间戳
方法:
updateMaxTimestamp 更新最大时间戳
getMaxTimestamp 获取最大时间戳
实现类:
1 2 3 4 5
| TimestampStorage InMemoryTimestampStorage in TimestampOracleImpl (org.apache.omid.tso) 使用内存存储 ZKTimestampStorage (org.apache.omid.timestamp.storage) 使用 zk 存储 HBaseTimestampStorage (org.apache.omid.timestamp.storage) 使用 hbase 存储 InMemoryTimestampStorage in WorldClockOracleImpl (org.apache.omid.tso) 使用内存存储
|
ZKTimestampStorage
通过 zk curator DistributedAtomicLong 实现 DistributedAtomicLong timestamp
方法:
updateMaxTimestamp():
执行 timestamp.compareAndSet(previousMaxTimestamp, newMaxTimestamp)
getMaxTimestamp():
返回 timestamp.get()
TimestampOracle
TimestampOracle 提供了一个服务,用于生成单调递增的时间戳
。
方法:
initialize()
用于初始化时间戳服务。
next()
返回下一个时间戳。
getLast()
返回最后一个分配的时间戳(当前时间戳)
实现类
1 2 3 4
| TimestampOracle (org.apache.omid.tso) TimestampOracleImpl (org.apache.omid.tso) PausableTimestampOracle (org.apache.omid.tso) WorldClockOracleImpl (org.apache.omid.tso)
|
TimestampOracleImpl
initialize()
1.从存储中获取 maxTimestamp;
2.初始化 AllocateTimestampBatchTask 用于分配时间戳,并执行一次.
next()
生成下一个时间戳。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Override public long next() { lastTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN; if (lastTimestamp >= nextAllocationThreshold) { nextAllocationThreshold = Long.MAX_VALUE; executor.execute(allocateTimestampsBatchTask); } if (lastTimestamp >= maxTimestamp) { assert (maxTimestamp <= maxAllocatedTimestamp); while (maxAllocatedTimestamp == maxTimestamp) { } assert (maxAllocatedTimestamp > maxTimestamp); maxTimestamp = maxAllocatedTimestamp; nextAllocationThreshold = maxTimestamp - TIMESTAMP_REMAINING_THRESHOLD; assert (nextAllocationThreshold > lastTimestamp && nextAllocationThreshold < maxTimestamp); assert (lastTimestamp < maxTimestamp); } return lastTimestamp; }
|
AllocateTimestampBatchTask
定时任务批量更新存储系统中最大的时间戳,每次预分配 TIMESTAMP_BATCH 数量的时间戳。
1 2
| static final long TIMESTAMP_BATCH = 10_000_000 * CommitTable.MAX_CHECKPOINTS_PER_TXN; long newMaxTimestamp = previousMaxTimestamp + TIMESTAMP_BATCH;
|
WorldClockOracleImpl
基于世界时间的单调递增时间戳。
initialize
从 TimestampStorage 中获取最大的时间戳,并启动定时任务来定期更新最大时间戳。
next
用于获取下一个时间戳。
如果当前时间戳可用,则直接返回;
否则,等待定时任务分配新的时间戳。
getLast
方法返回最后一个时间戳。
AllocateTimestampBatchTask#run
定时任务预分配时间戳。
1 2 3
|
long newMaxTime = (System.currentTimeMillis() + TIMESTAMP_INTERVAL_MS) * MAX_TX_PER_MS;
|
next()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public long next() { long currentMsFirstTimestamp = System.currentTimeMillis() * MAX_TX_PER_MS; lastTimestamp += CommitTable.MAX_CHECKPOINTS_PER_TXN; if (lastTimestamp >= currentMsFirstTimestamp) { return lastTimestamp; } if (currentMsFirstTimestamp >= maxTimestamp) { while (maxAllocatedTime <= currentMsFirstTimestamp) { try { Thread.sleep(1000); } catch (InterruptedException e) { continue; } } assert (maxAllocatedTime > maxTimestamp); maxTimestamp = maxAllocatedTime; } lastTimestamp = currentMsFirstTimestamp; return lastTimestamp; }
|