MySQL 驱动 Add Batch 优化实现

MySQL 驱动会在 JDBC URL 添加 rewriteBatchedStatements 参数时,对 batch 操作进行优化。本文测试各种参数组合的行为,并结合驱动代码简单分析。

batch参数组合行为

useServerPrepStmts 参数

1
2
3
PreparedStatement psmt = connection.prepareStatement("DELETE FROM t_order WHERE  `order_id` = ?");
psmt.setObject(1, 1);
psmt.execute();

开启:
使用服务端预编译,先发送 prepared 语句,再发送 excute 语句

不开启:
mysql 驱动会将占位符填充后,明文下发sql.

比如 DELETE FROM t_order WHERE order_id = ?; 语句

MySQL 驱动会下发 DELETE FROM t_order WHERE order_id = 1;

allowMultiQueries 参数

1
2
Statement statement = connection.createStatement();
statement.execute("DELETE FROM t_order WHERE `order_id` = 1;DELETE FROM t_order WHERE `order_id` = 2;");

不开启:
服务端不支持 DELETE FROM t_order WHERE order_id = 1;DELETE FROM t_order WHERE order_id = 2; 这样的批量delete语句

开启:
支持多语句,比如: DELETE FROM t_order WHERE order_id = 1;DELETE FROM t_order WHERE order_id = 2;

rewriteBatchedStatements 参数

1
2
3
4
5
6
PreparedStatement psmt = connection.prepareStatement("DELETE FROM t_order WHERE  `order_id` = ?");
for (int i = 1; i <= 500; i++) {
psmt.setObject(1, i);
psmt.addBatch();
}
psmt.executeBatch();

不开启:
batch 操作,在 addBatch 时一条条下发参数值。

开启:
在执行 executeBatch 时,将 batch 操作改写后批量下发;改写后的 SQL 比如 DELETE FROM t_order WHERE order_id = 1;DELETE FROM t_order WHERE order_id = 2;…;DELETE FROM t_order WHERE order_id = 500;

useServerPrepStmts+allowMultiQueries 参数

1
2
PreparedStatement psmt = connection.prepareStatement("DELETE FROM t_order WHERE  `order_id` = ?;DELETE FROM t_order WHERE  `order_id` = ?;");
psmt.execute();

虽然开启了服务端预编译参数 useServerPrepStmts,但是 MySQL JDBC 驱动会判断预编译 SQL 不支持 allowMultiQueries,会直接转换成客户端预编译,也就时会将占位符赋值后下发到 mysql。

比如对于以下 SQL: DELETE FROM t_order WHERE order_id = ?;DELETE FROM t_order WHERE order_id = ?;

客户端会将占位符填充后发送: DELETE FROM t_order WHERE order_id = 1;DELETE FROM t_order WHERE order_id = 2;

useServerPrepStmts+rewriteBatchedStatements 参数

1
2
3
4
5
6
PreparedStatement psmt = connection.prepareStatement("DELETE FROM t_order WHERE  `order_id` = ?");
for (int i = 1; i < 500; i++) {
psmt.setObject(1, i);
psmt.addBatch();
}
psmt.executeBatch();

对于 DELETE FROM t_order WHERE order_id = ? 的 batch 语句,

会转换成 DELETE FROM t_order WHERE order_id = ?;DELETE FROM t_order WHERE order_id = ?;多语句下发,但是服务端返回不支持。

然后客户端再使用客户端预编译尝试发送(会先发送 set multi option on 包,执行完再关闭该标识),然后下发多语句 DELETE FROM t_order WHERE order_id = 1;DELETE FROM t_order WHERE order_id = 2;…;DELETE FROM t_order WHERE order_id = 500;

useServerPrepStmts+allowMultiQueries+rewriteBatchedStatements 参数

1
2
3
4
5
6
PreparedStatement psmt = connection.prepareStatement("DELETE FROM t_order WHERE  `order_id` = ?");
for (int i = 1; i < 500; i++) {
psmt.setObject(1, i);
psmt.addBatch();
}
psmt.executeBatch();

当执行 batch delete 语句时:

虽然开启了 useServerPrepStmts 预编译参数,但是 MySQL JDBC 驱动会判断预编译 SQL 不支持 allowMultiQueries,会直接转换成客户端预编译,也就将占位符赋值后下发到服务端。

也就是发送 DELETE FROM t_order WHERE order_id = 1;DELETE FROM t_order WHERE order_id = 2;…;DELETE FROM t_order WHERE order_id = 500;

MySQL 驱动代码分析

下面结合驱动代码简单分析两组参数的流程:

1.useServerPrepStmts+rewriteBatchedStatements+allowMultiQueries 参数

1
2
3
4
5
6
7
8
// 测试 demo
// jdbc:mysql://localhost:3306/test?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&useServerPrepStmts=true&cachePrepStmts=true&prepStmtCacheSize=512&prepStmtCacheSqlLimit=8000&rewriteBatchedStatements=true&allowMultiQueries=true
PreparedStatement psmt = connection.prepareStatement("DELETE FROM t_order WHERE `order_id` = ?");
for (int i = 1; i < 500; i++) {
psmt.setObject(1, i);
psmt.addBatch();
}
psmt.executeBatch();

MySQL驱动处理逻辑:

1.prepareStatement

当执行 prepareStatement() 时,因为开启了 useServerPrepStmts 参数,所以会下发预编译 sql 给 server 端。

1
PreparedStatement psmt = connection.prepareStatement(sql);

相关调用栈如下:

1
2
3
4
5
6
7
8
9
10
11
12
com.mysql.cj.jdbc.ConnectionImpl#prepareStatement
com.mysql.cj.jdbc.ConnectionImpl#canHandleAsServerPreparedStatement
1.判断当前sql是否支持服务端预编译
com.mysql.cj.jdbc.ServerPreparedStatement#getInstance
com.mysql.cj.jdbc.ServerPreparedStatement#serverPrepare
2.下发预编译sql
com.mysql.cj.ServerPreparedQuery#serverPrepare
com.mysql.cj.protocol.a.NativeMessageBuilder#buildComStmtPrepare
com.mysql.cj.NativeSession#sendCommand
发包
com.mysql.cj.protocol.a.NativeProtocol#read
com.mysql.cj.protocol.a.ColumnDefinitionReader#unpackField

1.开启 useServerPrepStmts 参数(emulateUnsupportedPstmts 参数默认就为true),会再根据 canHandleAsServerPreparedStatement() 判断当前 sql 是否支持服务端预编译。

2.canHandleAsServerPreparedStatement()方法会通过 StringUtils._canHandleAsServerPreparedStatementNoCache() 方法检测 sql 是否可以支持服务端预编译。

  1. com.mysql.cj.jdbc.ServerPreparedStatement#getInstance 发送预编译 sql

2.addBatch

添加参数即可,不和服务端交互。

3.executeBatch

调用栈:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
com.mysql.cj.jdbc.StatementImpl#executeBatch
com.mysql.cj.jdbc.ClientPreparedStatement#executeBatchInternal
com.mysql.cj.jdbc.StatementImpl#statementBegins
com.mysql.cj.ServerPreparedQuery#statementBegins
com.mysql.cj.AbstractQuery#statementBegins
com.mysql.cj.AbstractPreparedQuery#getParseInfo
com.mysql.cj.ParseInfo#canRewriteAsMultiValueInsertAtSqlLevel 是否支持insert values改写优化
com.mysql.cj.jdbc.ClientPreparedStatement#executePreparedBatchAsMultiStatement 多语句执行优化
com.mysql.cj.AbstractQuery#getBatchedArgs
com.mysql.cj.AbstractPreparedQuery#computeBatchSize 计算批次大小
com.mysql.cj.ServerPreparedQuery#computeMaxParameterSetSizeAndBatchSize
com.mysql.cj.jdbc.ClientPreparedStatement#generateMultiStatementForBatch
com.mysql.cj.jdbc.ConnectionImpl#prepareStatement
com.mysql.cj.jdbc.ConnectionImpl#prepareStatement
com.mysql.cj.jdbc.ServerPreparedStatement#setOneBatchedParameterSet
com.mysql.cj.jdbc.ClientPreparedStatement#execute
com.mysql.cj.jdbc.ClientPreparedStatement#checkReadOnlySafeStatement
com.mysql.cj.protocol.a.NativeMessageBuilder#buildComQuery
com.mysql.cj.NativeSession#sendCommand
  1. 由于开启了 rewriteBatchedStatements 参数,并且 delelte batch size 大于 3,会通过 executePreparedBatchAsMultiStatement 优化成 批量 delete.
1
2
3
4
5
6
7
8
// 开启 rewriteBatchedStatements 参数
if (!this.batchHasPlainStatements && this.rewriteBatchedStatements.getValue()) {
// batch size > 3,则将 sql 转成 multi delete
if (!this.batchHasPlainStatements && this.query.getBatchedArgs() != null
&& this.query.getBatchedArgs().size() > 3 /* cost of option setting rt-wise */) {
return executePreparedBatchAsMultiStatement(batchTimeout);
}
}
  1. executePreparedBatchAsMultiStatement 方法

计算需要分几个批次下发。正常500条sql一个批次即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 根据上面得到的batch sql的长度,确定分几个批次下发 multi sql;
/**
* Computes the optimum number of batched parameter lists to send
* without overflowing max_allowed_packet.
*
* @param numBatchedArgs
* original batch size
* @return computed batch size
*/
public int computeBatchSize(int numBatchedArgs) {
long[] combinedValues = computeMaxParameterSetSizeAndBatchSize(numBatchedArgs);

long maxSizeOfParameterSet = combinedValues[0];
long sizeOfEntireBatch = combinedValues[1];

// 整个 batch sql 的长度不能超长,正常走到这个分支里
if (sizeOfEntireBatch < this.maxAllowedPacket.getValue() - this.originalSql.length()) {
return numBatchedArgs;
}

return (int) Math.max(1, (this.maxAllowedPacket.getValue() - this.originalSql.length()) / maxSizeOfParameterSet);
}

计算方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 计算 maxSizeOfParameterSet: 每个参数中最大的长度
// sizeOfEntireBatch: batch所有参数长度相加
/**
* Computes the maximum parameter set size and the size of the entire batch given
* the number of arguments in the batch.
*/
@Override
protected long[] computeMaxParameterSetSizeAndBatchSize(int numBatchedArgs) {

long sizeOfEntireBatch = 1 + /* com_execute */+4 /* stmt id */ + 1 /* flags */ + 4 /* batch count padding */;
long maxSizeOfParameterSet = 0;

for (int i = 0; i < numBatchedArgs; i++) {
ServerPreparedQueryBindValue[] paramArg = ((ServerPreparedQueryBindings) this.batchedArgs.get(i)).getBindValues();

long sizeOfParameterSet = (this.parameterCount + 7) / 8; // for isNull
sizeOfParameterSet += this.parameterCount * 2; // have to send types

ServerPreparedQueryBindValue[] parameterBindings = this.queryBindings.getBindValues();
for (int j = 0; j < parameterBindings.length; j++) {
if (!paramArg[j].isNull()) {

long size = paramArg[j].getBoundLength();

if (paramArg[j].isStream()) {
if (size != -1) {
sizeOfParameterSet += size;
}
} else {
sizeOfParameterSet += size;
}
}
}

sizeOfEntireBatch += sizeOfParameterSet;

if (sizeOfParameterSet > maxSizeOfParameterSet) {
maxSizeOfParameterSet = sizeOfParameterSet;
}
}

return new long[] { maxSizeOfParameterSet, sizeOfEntireBatch };
}
  1. 拼接预编译sql,此时还有?号占位符
1
2
((Wrapper) locallyScopedConn.prepareStatement(generateMultiStatementForBatch(numValuesPerBatch)))
.unwrap(java.sql.PreparedStatement.class);

得到sql如下:

1
DELETE FROM t_order WHERE  `order_id` = ?;DELETE FROM t_order WHERE  `order_id` = ?;......
  1. 执行 prepareStatement,将上面的批量delete语句执行预编译。

这里逻辑和第一步里的 prepareStatement 类似,也需要判断当前批量delete sql是否支持服务端预编译。

这里可以看到,开启 allowMultiQueries 参数之后,驱动会查找 sql 里是否包含 ; 号,如果包含,不支持服务端预编译。后面会走客户端预编译流程。

  1. 客户端预编译执行批量delete

  1. 设置参数
1
2
// 给?号占位符塞值
batchedParamIndex = setOneBatchedParameterSet(batchedStatement, batchedParamIndex, this.query.getBatchedArgs().get(batchCounter++));
  1. 执行批量delete语句

2.useServerPrepStmts+rewriteBatchedStatements 参数

1.下发预编译sql

2.设置 multi statement on

3.多语句预编译sql,执行返回失败

You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ‘DELETE FROM t_order WHERE order_id = ?;DELETE FROM t_order WHERE order_id ‘ at line 1

代码就是服务端prepared返回失败,会再用 client Prepare statement 重试

4.client Prepare statement 重新批量delete 语句