Narayana enlistResource流程
XA 事务内每个物理连接执行第一条物理 sql 之前,需要执行 xa start 操作开启每个RM分支上的 xa 事务,并将 xaResource 注册到 TransactionManager 里。
整体流程
1 2 3 4 5 6 7 8 9 10
| transaction.enlistResource(xaResource) 执行 XA start xid 遍历 resource 判断当前是 isNewRM 创建 xid createRecord theTransaction.add(abstractRecord) record 添加到 BasicAction.RecordList 里 执行 xa start 语句,若失败重试 MysqlXAConnection#start 把 XAResource 缓存到 map
|
使用
1 2 3 4 5 6 7
| XADataSource xaDatasource = MySQLUtil.getXADatasource(dataSourceName);
XAConnection xaConnection = xaDatasource.getXAConnection();
transaction.enlistResource(xaConnection.getXAResource());
|
调用链路
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| internal.jta.transaction.arjunacore.TransactionImple#enlistResource 招募资源 internal.jta.transaction.arjunacore.TransactionImple#getStatus 获取事务状态必须是 STATUS_ACTIVE internal.jta.transaction.arjunacore.TransactionImple#isNewRM 判断是新的RM(resource缓存里没有),需要进行资源注册 internal.jta.transaction.arjunacore.TransactionImple#createXid 创建 xid,由 formatID + gtrid_length + bqual_length + data 组成 arjuna.StateManager#get_uid jta.xa.XATxConverter#getXid jta.xa.XidImple#getHash internal.jta.transaction.arjunacore.TransactionImple#createRecord 创建 record jta.xa.XidImple#toString internal.jta.resources.arjunacore.XAResourceRecord#order arjuna.coordinator.AbstractRecord#order com.zc.transaction.xa.SingleXAResource#start 执行 xa start jta.xa.XidImple#toString jta.xa.XidImple#getGlobalTransactionId jta.xa.XidImple#getBranchQualifier jta.xa.XidImple#getFormatId arjuna.coordinator.BasicAction#add arjuna.coordinator.BasicAction#criticalStart arjuna.coordinator.RecordList#insert 插入到 RecordList arjuna.coordinator.RecordList#insert arjuna.coordinator.BasicAction#criticalEnd internal.jta.xa.TxInfo#setState
|
代码流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
Xid xid = createXid(branchRequired, theModifier, xaRes); TxInfo existingRM = isNewRM(xaRes); AbstractRecord abstractRecord = createRecord(xaRes, params, xid); if(abstractRecord != null) { xaRes.start(xid, xaStartNormal); if(_theTransaction.add(abstractRecord) == AddOutcome.AR_ADDED) { _resources.put(xaRes, new TxInfo(xid)); return true; } else { abstractRecord.topLevelAbort(); } }
|
xa start 会调用 RM 的实现,比如 com.mysql.jdbc.jdbc2.optional.MysqlXAConnection 或 com.mysql.cj.jdbc.MysqlXAConnection#start 里的 xa start 方法.