H2 Database 事务隔离性实现原理

使用

测试 cases:

1
2
3
4
5
6
7
8
9
10
11
12
13
隔离级别 RC
A 开启事务
B 开启事务
B 插入数据并提交
A 可以读取到
隔离级别 RR
A 开启事务
B 开启事务
A 查询
B 插入数据并提交
A 读取不到
A 自己插入数据
A 可以读取到自己插入的数据

功能

H2 支持的事务隔离级别,定义在 IsolationLevel 里。
包括 READ_UNCOMMITTED, READ_COMMITTED, REPEATABLE_READ, SNAPSHOT, SERIALIZABLE。
其中 REPEATABLE_READ 不允许出现 脏读 和 不可重复读,但是允许出现幻读。
SNAPSHOT 不允许出现 脏读 和 不可重复读 和 幻读。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Level of isolation.
*/
public enum IsolationLevel {

/**
* Dirty reads, non-repeatable reads and phantom reads are allowed.
*/
READ_UNCOMMITTED(Connection.TRANSACTION_READ_UNCOMMITTED, Constants.LOCK_MODE_OFF),

/**
* Dirty reads aren't allowed; non-repeatable reads and phantom reads are
* allowed.
*/
READ_COMMITTED(Connection.TRANSACTION_READ_COMMITTED, Constants.LOCK_MODE_READ_COMMITTED),

/**
* Dirty reads and non-repeatable reads aren't allowed; phantom reads are
* allowed.
*/
REPEATABLE_READ(Connection.TRANSACTION_REPEATABLE_READ, Constants.LOCK_MODE_TABLE),

/**
* Dirty reads, non-repeatable reads and phantom reads are'n allowed.
*/
SNAPSHOT(Constants.TRANSACTION_SNAPSHOT, Constants.LOCK_MODE_TABLE),

/**
* Dirty reads, non-repeatable reads and phantom reads are'n allowed.
* Concurrent and serial execution of transactions with this isolation level
* should have the same effect.
*/
SERIALIZABLE(Connection.TRANSACTION_SERIALIZABLE, Constants.LOCK_MODE_TABLE);
}

TMIterator

1
2
3
4
TMIterator
RepeatableIterator
CommittedIterator
UncommittedIterator

Snapshot

1
2
3
4
5
6
7
8
9
10
11
12
13
// 快照.包含 map 的根节点引用 和 正在提交的事务集合.
final class Snapshot<K,V> {

/**
* mvMap 根节点引用.
*/
final RootReference<K,V> root;

/**
* 提交中的事务.对应创建快照当时的 TransactionStore.committingTransactions
*/
final BitSet committingTransactions;
}

Transaction & TransactionMap

1
2
3
一个物理连接对应一个 Transaction.
一个 mvMap 对应一个 TransactionMap.
一个 Transaction 对应多个 TransactionMap,因为一个事务可能涉及到多张表.

总体流程

创建物理连接

1
2
3
4
5
第一次获取 Transaction 对象时创建 Transaction
原子性生成唯一的事务 id
根据 undo log id(事务内从0开始,此时为0) 和 事务隔离级别创建 Transaction 对象。
根据事务 id 创建 undo log mvMap。
Transaction 包含 mvMap id -> transaction map 的映射。此时为空,因为还未涉及到表操作。

获取连接里的 databaseName,依次创建 Database -> MVTable -> MVPrimaryIndex -> TransactionMap.
即一个连接的一个 MVPrimaryIndex 对应一个 TransactionMap.

TransactionMap 构成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public final class TransactionMap<K, V> extends AbstractMap<K,V> {

/**
* 用于写的 map
* The map used for writing (the latest version).
* <p>
* Key: key the key of the data.
* Value: { transactionId, oldVersion, value }
*/
public final MVMap<K, VersionedValue<V>> map;

/**
* 当前 map 关联的事务
* The transaction which is used for this map.
*/
private final Transaction transaction;

/**
* 根据隔离级别可能是事务开启时的快照,或者事务执行 statement 时候的快照.
*
* Snapshot of this map as of beginning of transaction or
* first usage within transaction or
* beginning of the statement, depending on isolation level
*/
private Snapshot<K,VersionedValue<V>> snapshot;

/**
* 开始执行 statement 时候的快照.
*
* Snapshot of this map as of beginning of beginning of the statement
*/
private Snapshot<K,VersionedValue<V>> statementSnapshot;

/**
* map数据是否被当前事务修改过
* Indicates whether underlying map was modified from within related transaction
*/
private boolean hasChanges;

private final TxDecisionMaker<K,V> txDecisionMaker;
private final TxDecisionMaker<K,V> ifAbsentDecisionMaker;
private final TxDecisionMaker<K,V> lockDecisionMaker;
}

一个 session 对应一个 Transaction,连接内第一次获取 Transaction 对象时,会创建 transaction 实例,并缓存到 SessionLocal 里.

1
transaction = store.getTransactionStore().begin(this, this.lockTimeout, id, isolationLevel); // 开启事务.传入 锁超时时间, session id, 隔离级别

此时会原子性生成唯一的事务id.
根据 undo log id(事务内从0开始,此时为0) 和 事务隔离级别创建 Transaction 对象.
根据事务id创建 undo log mvMap.

Transaction 结构如下:
一个 Transaction 是会包含多个 TransactionMap,比如对应该事务内涉及的多张表,每张表的 mvMap 都会有对应的 TransactionMap.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final class Transaction {

public static final int STATUS_CLOSED = 0;
public static final int STATUS_OPEN = 1;
public static final int STATUS_PREPARED = 2;
public static final int STATUS_COMMITTED = 3;
private static final int STATUS_ROLLING_BACK = 4;
private static final int STATUS_ROLLED_BACK = 5;

// 在事务中存储的“operation id”中有多少位属于日志id(其余的属于事务id)。
static final int LOG_ID_BITS = 40;
private static final int STATUS_BITS = 4;

final TransactionStore store;
final TransactionStore.RollbackListener listener;

// 事务id
final int transactionId;

/* 事务状态是一个原子复合字段:1.创建事务时初始化;2.事务内操作会递增logId;2.事务提交/回滚时修改事务状态 status
* Transaction state is an atomic composite field:
* bit 45 : flag whether transaction had rollback(s) 位 45:标记事务是否有回滚
* bits 44-41 : status 位 44-41:状态位
* bits 40 : overflow control bit, 1 indicates overflow 40:溢出控制位,1 表示溢出
* bits 39-0 : log id of the last entry in the undo log map 位 39-0:undo log 日志 map 中最后一个条目的日志 ID
*/
private final AtomicLong statusAndLogId;

private MVStore.TxCounter txCounter;

int timeoutMillis;

// 创建当前事务的 session id
private final int ownerId;

/**
* mvMap id -> transaction map.
*/
private final Map<Integer, TransactionMap<?,?>> transactionMaps = new HashMap<>();

final IsolationLevel isolationLevel;
}

开启事务

设置 autoCommit 标记开启事务。

插入/修改数据

事务内插入的数据是未提交状态,在事务提交操作时变更成已提交状态。

TransactionMap 重写了 Map 的 putIfAbsent 方法,进行事务性插入操作到 btree。
插入数据通过 TxDecisionMaker.PutIfAbsentDecisionMaker#decide 进行事务插入操作决定,会根据 当前值、目标值 和 游标位置 决定操作类型。
然后会调用 TransactionMap#getFromSnapshot 根据事务隔离级别读取对应的值。

TransactionMap#getFromSnapshot
1
2
3
4
5
6
7
8
9
10
11
对于 REPEATABLE_READ
如果当前事务有数据变更
获取 statement 级别快照,获取快照根节点,判断操作数据的事务id是否是当前事务,如果是返回当前值(读自己可见).
如果当前事务没有数据变更
获取事务级别快照 getSnapshot
根据快照的根节点 和 创建快照时候的 committingTransactions 决定可见性
获取当前数据对应的事务id
操作当前数据的事务不是当前事务 & 事务不在提交中
返回已提交的值 CommittedValue
操作当前数据的事务是当前事务 或者 正在提交中
返回当前值 CurrentValue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* Get the value for the given key, or null if value does not exist in accordance with transactional rules.
* Value is taken from a snapshot, appropriate for an isolation level of the related transaction
*
* @param key the key
* @return the value, or null if not found
*/
public V getFromSnapshot(K key) {
switch (transaction.isolationLevel) {
case READ_UNCOMMITTED: { // 直接从语句快照中获取值
Snapshot<K,VersionedValue<V>> snapshot = getStatementSnapshot();
VersionedValue<V> data = map.get(snapshot.root.root, key);
if (data != null) {
return data.getCurrentValue();
}
return null;
}
case REPEATABLE_READ:
case SNAPSHOT:
case SERIALIZABLE:
if (transaction.hasChanges()) { // RR 检查当前事务是否有更改.如果有更改,从语句快照中获取值
Snapshot<K,VersionedValue<V>> snapshot = getStatementSnapshot();
VersionedValue<V> data = map.get(snapshot.root.root, key);
if (data != null) {
long id = data.getOperationId();
if (id != 0L && transaction.transactionId == TransactionStore.getTransactionId(id)) {
return data.getCurrentValue();
}
}
}
//$FALL-THROUGH$
case READ_COMMITTED:
default:
Snapshot<K,VersionedValue<V>> snapshot = getSnapshot(); // 获取 snapshot
return getFromSnapshot(snapshot.root, snapshot.committingTransactions, key); // 获取 snapshot 中的数据
}
}

private V getFromSnapshot(RootReference<K, VersionedValue<V>> rootRef, BitSet committingTransactions, K key) {
VersionedValue<V> data = map.get(rootRef.root, key);
if (data == null) {
// doesn't exist
return null;
}
long id = data.getOperationId(); // 获取操作 id
if (id != 0) { // 如果操作 id 不为 0
int tx = TransactionStore.getTransactionId(id); // 获取事务 id
if (tx != transaction.transactionId && !committingTransactions.get(tx)) { // 如果事务 id 不等于当前事务 id 并且 committingTransactions 不包含这个事务(不是当前事务&事务未提交)
// added/modified/removed by uncommitted transaction, change should not be visible
return data.getCommittedValue(); // 返回已提交的值
}
}
// added/modified/removed by this transaction or another transaction which is committed by now
return data.getCurrentValue(); // (当前事务 或 已提交事务),返回当前值
}

提交事务

CommitDecisionMaker

org.h2.mvstore.tx.CommitDecisionMaker#selectValue
获取 VersionedValueUncommitted 里的 DefaultRow 返回(VersionedValue)。
执行 btree 更新操作,p.setValue(index, value); 将 VersionedValueUncommitted 替换成了 DefaultRow。

回滚事务

查询数据

查询数据时会获取当前 SQL 里涉及到的所有表,清理当前事务对象里缓存的这些表对应的缓存(清理 transactionMaps)。
然后遍历这些表,根据 mvMap 的根节点和 committingTransactions 创建语句级别快照 statement snapshot。
通过 snapshot 和不同的隔离级别创建数据迭代器。最终决定数据的可见性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1.创建快照
startStatementWithinTransaction
获取依赖的表 dependencies mvTables
对于 REPEATABLE_READ 级别
添加表对应的 mvMap 到 `HashSet<MVMap<Object,VersionedValue<Object>>> maps` 里(MVPrimaryIndex->transactionMap->versionedValueMap)
markStatementStart
markStatementEnd
遍历 transactionMaps(每张表对应一个 transactionMap),清理每个 transactionMap 对应的 snapshot(后面重新创建)
通过每个 MVMap<Object,VersionedValue<Object>> map 获取 or 创建对应的 transactionMap
创建 decision maker
根据不同隔离级别,允许不可重复读 ? lock : repeatable read lock
为每个 mvMap 创建 snapshot 快照,设置到对应的 transactionMap 里,传入 committingTransactions
2.迭代器迭代数据
对于 RR 和以上隔离级别,当前事务没有变更数据,创建 CommittedIterator。
否则根据隔离级别创建对应的迭代器。
通过 transactionMap.snapshot 快照的根节点创建 cursor
根据快照迭代数据
3.执行完毕查询清理快照
session.endStatement()
transaction.markStatementEnd()
遍历 transactionMaps,清空 statementSnapshot

1.创建快照

获取查询涉及的所有表,根据表对应的 mvMap 创建对应的 transactionMap。
根据 transactionMap 的当前 root 节点 和 committingTransactions,创建对应的语句级别快照 statement snapshot。
事务内第一次创建 statement snapshot, 会将 statement snapshot 设置给事务级别快照 snapshot。
根据不同隔离级别创建对应的迭代器 TMIterator 的具体实现。
对于 REPEATABLE_READ 及其以上隔离级别的事务
如果当前事务有数据修改,创建 RepeatableIterator,否则创建 CommittedIterator
READ_COMMITTED 隔离级别事务创建 CommittedIterator
READ_UNCOMMITTED 隔离级别事务创建 UncommittedIterator

2.迭代器迭代数据

1
2
3
4
5
6
7
8
9
// 创建迭代器
TMIterator(TransactionMap<K, V> transactionMap, K from, K to, Snapshot<K, VersionedValue<V>> snapshot,
boolean reverse, boolean forEntries) { // 创建迭代器
Transaction transaction = transactionMap.getTransaction();
this.transactionId = transaction.transactionId;
this.forEntries = forEntries;
this.cursor = transactionMap.map.cursor(snapshot.root, from, to, reverse); // 通过快照的根节点创建 cursor
this.committingTransactions = snapshot.committingTransactions;
}
不同隔离级别迭代器实现类
1
2
3
4
5
6
7
TMIterator
RepeatableIterator
适用于 REPEATABLE_READ 及其以上隔离级别的事务
CommittedIterator
适用于 READ_COMMITTED 隔离级别的事务
UncommittedIterator
适用于 READ_UNCOMMITTED 隔离级别事务
1.RepeatableIterator

适用于 REPEATABLE_READ 及其以上隔离级别的事务的迭代器。

1
2
3
4
5
6
7
8
9
10
11
12
13
构造方法
通过事务级别快照创建 cursor
通过语句级别快照创建 uncommittedCursor,用于迭代当前事务没有提交的事务
fetchNext 迭代数据
先从事务级别 snapshot 的 cursor 迭代 next 数据
如果有数据判断可见性
若事务ID等于当前事务ID 或 事务正在提交,获取当前值 current value
否则使用 committed value
再从语句级别快照(statement snapshot) 的 uncommittedCursor 里迭代数据
如果有数据判断可见性
若操作数据的事务ID等于当前事务ID,获取当前值 current value
否则使用 committed value
然后对比决定返回 snapshot 还是 uncommitted 值
2.CommittedIterator

适用于 READ_COMMITTED 隔离级别的事务的迭代器。
高于 READ_COMMITTED 隔离级别的事务如果自身没有修改过数据,也可以使用。

1
2
3
4
5
6
7
8
9
构造方法
使用事务级别快照创建 cursor
fetchNext 迭代数据
从 cursor 里获取下一行数据
获取数据 operation id 里的 transaction id
如果操作当前行数据是 其他事务尚未提交的操作
判断条件: transaction id 不是当前事务 并且 不在提交列表里
则获取已提交的值(committed value)
否则获取当前值(current value)
3.UncommittedIterator

适用于 READ_UNCOMMITTED 隔离级别事务的迭代器。

1
2
3
4
5
构造方法
每次创建新的快照 snapshot,创建迭代器
fetchNext 迭代数据
从 cursor 里获取下一行数据
每次都获取当前值 current value

3.执行完毕查询清理快照