Narayana enlistResource流程

XA 事务内每个物理连接执行第一条物理 sql 之前,需要执行 xa start 操作开启每个RM分支上的 xa 事务,并将 xaResource 注册到 TransactionManager 里。

整体流程

1
2
3
4
5
6
7
8
9
10
transaction.enlistResource(xaResource)
执行 XA start xid
遍历 resource 判断当前是 isNewRM
创建 xid
createRecord
theTransaction.add(abstractRecord)
record 添加到 BasicAction.RecordList 里
执行 xa start 语句,若失败重试
MysqlXAConnection#start
把 XAResource 缓存到 map

使用

1
2
3
4
5
6
7
// 获取 XADataSource
XADataSource xaDatasource = MySQLUtil.getXADatasource(dataSourceName);
// 通过 XADataSource 创建 XAConnection
XAConnection xaConnection = xaDatasource.getXAConnection();
// 通过 xaConnection 获取 xaResource
// 通过 enlistResource 将 xaResource 注册到全局事务里
transaction.enlistResource(xaConnection.getXAResource());

调用链路

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
internal.jta.transaction.arjunacore.TransactionImple#enlistResource
招募资源
internal.jta.transaction.arjunacore.TransactionImple#getStatus
获取事务状态必须是 STATUS_ACTIVE
internal.jta.transaction.arjunacore.TransactionImple#isNewRM
判断是新的RM(resource缓存里没有),需要进行资源注册
internal.jta.transaction.arjunacore.TransactionImple#createXid
创建 xid,由 formatID + gtrid_length + bqual_length + data 组成
arjuna.StateManager#get_uid
jta.xa.XATxConverter#getXid
jta.xa.XidImple#getHash
internal.jta.transaction.arjunacore.TransactionImple#createRecord
创建 record
jta.xa.XidImple#toString
internal.jta.resources.arjunacore.XAResourceRecord#order
arjuna.coordinator.AbstractRecord#order
com.zc.transaction.xa.SingleXAResource#start
执行 xa start
jta.xa.XidImple#toString
jta.xa.XidImple#getGlobalTransactionId
jta.xa.XidImple#getBranchQualifier
jta.xa.XidImple#getFormatId
arjuna.coordinator.BasicAction#add
arjuna.coordinator.BasicAction#criticalStart
arjuna.coordinator.RecordList#insert
插入到 RecordList
arjuna.coordinator.RecordList#insert
arjuna.coordinator.BasicAction#criticalEnd
internal.jta.xa.TxInfo#setState

代码流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImp#enlistResource() 方法
// 1.创建 xid
Xid xid = createXid(branchRequired, theModifier, xaRes);
TxInfo existingRM = isNewRM(xaRes);
AbstractRecord abstractRecord = createRecord(xaRes, params, xid);
if(abstractRecord != null) {
// 2.执行 xa start 语句
xaRes.start(xid, xaStartNormal);
// 3.将 abstractRecord 添加到 theTransaction 里
if(_theTransaction.add(abstractRecord) == AddOutcome.AR_ADDED) {
// 4.缓存 xaResource(xaConnection) 到 resources map 里
_resources.put(xaRes, new TxInfo(xid));
return true;
} else {
abstractRecord.topLevelAbort();
}
}

xa start 会调用 RM 的实现,比如 com.mysql.jdbc.jdbc2.optional.MysqlXAConnection 或 com.mysql.cj.jdbc.MysqlXAConnection#start 里的 xa start 方法.