ShardingSphere inline表达式线程安全问题定位

问题背景

春节期间发现 ShardingSphere 事务 E2E 偶发执行失败问题,并且每次执行失败需要执行很久,直到超时。最终定位发现 inline 表达式存在线程安全问题。本文记录定位并解决 inline 表达式线程安全问题的过程。

问题原因

1.GroovyInlineExpressionParser 里有成员变量,存在并发修改,不能使用单例 SPI 实现;
2.执行 Groovy 表达式时,需要执行 rehydrate 方法 copy Closure,使得每个 Closure 都有独立的执行环境,避免属性赋值时产生线程安全问题。

问题定位

构造测试用例尝试复现问题

构造测试用例,且在测试用例中添加线程相关信息,观察执行结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Test
@SneakyThrows({ExecutionException.class, InterruptedException.class})
void assertThreadSafety() {
int threadCount = 2;
ExecutorService pool = Executors.newFixedThreadPool(threadCount);
List<Future<?>> futures = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++) {
Future<?> future = pool.submit(this::createInlineExpressionParseTask);
futures.add(future);
}
for (Future<?> future : futures) {
future.get();
}
pool.shutdown();
}

private void createInlineExpressionParseTask() {
for (int j = 0; j < 5; j++) {
// 传入线程信息
String resultSuffix = Thread.currentThread().getName() + "--" + j;
String actual = TypedSPILoader.getService(InlineExpressionParser.class, "GROOVY", PropertiesBuilder.build(
new PropertiesBuilder.Property(InlineExpressionParser.INLINE_EXPRESSION_KEY, "ds_${id%2}"))).evaluateWithArgs(Collections.singletonMap("id", 1));
// 断言执行出来的结果是不是当前线程的
assertThat(actual, is(String.format("ds_%s", resultSuffix)));
String actual2 = TypedSPILoader.getService(InlineExpressionParser.class, "GROOVY", PropertiesBuilder.build(
new PropertiesBuilder.Property(InlineExpressionParser.INLINE_EXPRESSION_KEY, "account_${id}"))).evaluateWithArgs(Collections.singletonMap("id", resultSuffix));
assertThat(actual2, is(String.format("account_%s", resultSuffix)));
}
}

通过修改线程数测试几组,结果如下:
1: 100个线程,一个线程里一个inline表达式,报错
2: 1个线程,一个线程里两个不同inline表达式,没问题
3: 2个线程,一个线程里两个不同inline表达式,报错

那么只需要最少两个线程并发执行即可稳定复现这个 bug。

对象属性线程间共享问题

初步看代码,GroovyInlineExpressionParser 之前是单例的,线程之间会共享同一个实例,GroovyInlineExpressionParser 类中发现有一处成员变量 inlineExpression 多个线程中共享,并发修改的话会导致数据不正确。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// InlineExpressionParser 的 spi 之前是单例的,所有线程对于同一个类型的 spi 实现,共享同一个对象。
@SingletonSPI
public interface InlineExpressionParser extends TypedSPI {}

// GroovyInlineExpressionParser 实现
public final class GroovyInlineExpressionParser implements InlineExpressionParser {

private static final String INLINE_EXPRESSION_KEY = "inlineExpression";

private static final Map<String, Script> SCRIPTS = new ConcurrentHashMap<>();

private static final GroovyShell SHELL = new GroovyShell();

// 可以看到此处是成员变量,多个线程之间会串。
private String inlineExpression;

@Override
public void init(final Properties props) {
inlineExpression = props.getProperty(INLINE_EXPRESSION_KEY);
}
}

这个问题修复起来很简单,移除 @SingletonSPI 声明,每次创建新的实例即可。

添加日志观察执行流程

上述问题修复后,继续执行测试用例,发现还是会报错。报错日志如下:

看起来就是当 thread2 获取结果之前, thread1 执行了 inline 表达式,thread2 拿到了 thread1 执行的结果。
通过日志也能看出来 thread2 执行报错时 [pool-1-thread-2] ERROR 拿到了线程1的执行结果result:ds_pool-1-thread-1--0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15:12:29.630 [pool-1-thread-2] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:ds_${id}, this:2086977125, closure:1142533146, map:{id=pool-1-thread-2--0}
15:12:29.689 [pool-1-thread-2] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - ds_id, j:0,result:ds_pool-1-thread-2--0
15:12:29.689 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:ds_${id}, this:629624949, closure:595824174, map:{id=pool-1-thread-1--0}
15:12:29.689 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - ds_id, j:0,result:ds_pool-1-thread-1--0
15:12:29.703 [pool-1-thread-2] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:account_${id}, this:1499502307, closure:778755431, map:{id=pool-1-thread-2--0}
15:12:29.704 [pool-1-thread-2] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - account_id, j:0,result:account_pool-1-thread-2--0
15:12:29.705 [pool-1-thread-2] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:ds_${id}, this:1218528180, closure:667406814, map:{id=pool-1-thread-2--1}
15:12:29.704 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:account_${id}, this:2009787398, closure:1068736255, map:{id=pool-1-thread-1--0}
15:12:29.705 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - account_id, j:0,result:account_pool-1-thread-1--0
15:12:29.705 [pool-1-thread-2] ERROR org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - ds_id, j:1,result:ds_pool-1-thread-1--0
15:12:29.705 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:ds_${id}, this:1072633959, closure:478686467, map:{id=pool-1-thread-1--1}
15:12:29.705 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - ds_id, j:1,result:ds_pool-1-thread-1--1
15:12:29.709 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:account_${id}, this:2138009411, closure:1272220298, map:{id=pool-1-thread-1--1}
15:12:29.709 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - account_id, j:1,result:account_pool-1-thread-1--1

DEBUG模拟复现流程

通过 debug 控制两个线程执行流程,当 thread1 在 result.call().toString() 执行之前等待 thread2 执行完成后再返回时,thread1 得到的结果不正确。

1
2
3
4
5
6
7
8
9
10
11
12
13
public String evaluateWithArgs(final Map<String, Comparable<?>> map) {
// 1.thread1执行
// 3.thread2执行
Closure<?> result = (Closure<?>) evaluate("{it -> \"" + handlePlaceHolder(inlineExpression) + "\"}");
log.info("inlineExpression:{}, this:{}, closure:{}, map:{}", inlineExpression, System.identityHashCode(this), System.identityHashCode(result), map);
result.rehydrate(new Expando(), null, null)
.setResolveStrategy(Closure.DELEGATE_ONLY);
map.forEach(result::setProperty);
// 2.thread1在此等待
// 5.thread1执行完毕返回结果,观察到结果不对
return result.call().toString();
// 4.thread2执行完毕返回结果
}

结合后面的分析也能得出,因为线程之间共享了 context,上面 thread2 执行完把 context 里共享的属性给改了,导致 thread1 执行出现问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 原有有问题的代码,只调用了 rehydrate 方法,但是没有获取 rehydrate 方法里 clone 后的结果。所有线程之间共享一个执行环境,导致属性会串。
public String evaluateWithArgs(final Map<String, Comparable<?>> map) {
Closure<?> result = (Closure<?>) evaluate("{it -> \"" + handlePlaceHolder(inlineExpression) + "\"}");
// 这里会 copy Closure,返回新的执行环境,但是结果被忽略了
result.rehydrate(new Expando(), null, null)
.setResolveStrategy(Closure.DELEGATE_ONLY);
// 所以这里设置属性时,共用的同一个 context, 线程之间会串
map.forEach(result::setProperty);
return result.call().toString();
}

// 修改后的代码如下:获取 copy 的 Closure,那么每个 Closure 使用单独的执行环境。
Closure<?> result = ((Closure<?>) evaluate("{it -> \"" + handlePlaceHolder(inlineExpression) + "\"}")).rehydrate(new Expando(), null, null);

问题分析

DEBUG分析问题

由于上面发现执行 evaluateWithArgs 方法会有线程安全问题,所以 DEBUG 分析里面的逻辑。

有问题的代码如下:
result.rehydrate(new Expando(), null, null) 方法作用:重新实例化一个闭包对象,设置 delegate 为 Expando。这样闭包的执行环境将改变,不再依赖于原始闭包的环境。
但是明显下面的代码里没有使用 rehydrate 返回的 copy 的 Closure 对象,最终导致 result::setProperty 设置的属性在线程之间共享了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Closure<?> result = ((Closure<?>) evaluate("{it -> \"" + handlePlaceHolder(inlineExpression) + "\"}"));
// 有问题的调用代码,没有获取 rehydrate() copy后的 Closure,导致线程之间属性串了。
result.rehydrate(new Expando(), null, null);
map.forEach(result::setProperty);
return result.call().toString();

// Closure#rehydrate 方法逻辑如下:
public Closure<V> rehydrate(Object delegate, Object owner, Object thisObject) {
// clone 当前 Closure
Closure<V> result = (Closure<V>) this.clone();
// 设置传入的 Expando
result.delegate = delegate;
result.owner = owner;
result.thisObject = thisObject;
return result;
}

result::setProperty 会调用 groovy.lang.Closure#setProperty 方法将用户传入的属性设置到 context 里,这个 context 是线程间共享的。
如果使用 rehydrate 返回的Closure对象,则属性会设置到各自的 Expando 对象里,不存在线程安全问题。

1
2
3
4
5
// 设置属性
groovy.lang.Closure#setProperty
->
// 其中 delegate 就是上面的 Expando 对象。如果没设置,默认使用同一个 context
InvokerHelper.setProperty(this.delegate, property, newValue);

context 初始化逻辑:

1
2
3
4
5
6
7
// 默认的 context 是在 GroovyShell 构造方法中初始化的,由于复用了同一个 GroovyShell,所以 context 是一份。
private final Binding context;

public GroovyShell(ClassLoader parent, Binding binding, final CompilerConfiguration config) {
this.context = binding;
this.config = config;
}

如果使用有问题的代码执行,执行结果如下,由于没有使用 rehydrate 执行 copy 后的 Closure,导致各个线程之间属性会串:
可以看到 thread1 和 thread7 都将属性设置到了共享的 context Bing@3954 对象里,所以线程之间属性会串,导致执行结果有问题。

Groovy表达式执行流程

Groovy 根据用户脚本,生成相关类代码,再编译成 Class 对象,然后创建闭包执行脚本。

调用api如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static final GroovyShell SHELL = new GroovyShell();

// 1.解析 groovy script
// 生成scriptName;通过 groovy classloader 编译类,返回Class实例;通过Class反射创建Script对象返回
Script script = SHELL.parse(expression);

// 2.运行script。会调用到 会调用 Closure.run -> Closure.call -> 自定义script类的doCall方法
Closure<?> result = (Closure<?>) script.run();

// 3.重新实例化一个闭包对象,设置 delegate 为 Expando。这样闭包的执行环境将改变,不再依赖于原始闭包的环境
result.rehydrate(new Expando(), null, null);
result.setResolveStrategy(Closure.DELEGATE_ONLY);

// 4.设置参数
map.forEach(result::setProperty);

// 5.运行闭包
return result.call().toString();

查看Groovy生成的类

可以通过 arthas 反编译,查看 groovy 生成的 Script 类的代码。其中 doCall 方法就是用来具体处理 inline 表达式计算结果的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
[arthas@97658]$ sc Script*

[arthas@92326]$ jad Script1$_run_closure1

ClassLoader:
+-groovy.lang.GroovyClassLoader$InnerLoader@2960c0ad
+-groovy.lang.GroovyClassLoader@22c53a48
+-jdk.internal.loader.ClassLoaders$AppClassLoader@251a69d7
+-jdk.internal.loader.ClassLoaders$PlatformClassLoader@76f18a45

Location:
/groovy/shell

/*
* Decompiled with CFR.
*/
import groovy.lang.Closure;
import groovy.lang.MetaClass;
import java.lang.invoke.MethodHandles;
import org.codehaus.groovy.reflection.ClassInfo;
import org.codehaus.groovy.runtime.GStringImpl;
import org.codehaus.groovy.runtime.GeneratedClosure;
import org.codehaus.groovy.runtime.ScriptBytecodeAdapter;
import org.codehaus.groovy.vmplugin.v8.IndyInterface;

public final class Script1._run_closure1
extends Closure
implements GeneratedClosure {
private static /* synthetic */ ClassInfo $staticClassInfo;
public static transient /* synthetic */ boolean __$stMC;

public Script1._run_closure1(Object _outerInstance, Object _thisObject) {
super(_outerInstance, _thisObject);
}

// call时会调用 doCall 方法
public Object doCall(Object it) {
return new GStringImpl(new Object[]{IndyInterface.bootstrap("getProperty", "id", 12, this)}, new String[]{"ds_", ""});
}

protected /* synthetic */ MetaClass $getStaticMetaClass() {
if (this.getClass() != Script1._run_closure1.class) {
return ScriptBytecodeAdapter.initMetaClass(this);
}
ClassInfo classInfo = $staticClassInfo;
if (classInfo == null) {
$staticClassInfo = classInfo = ClassInfo.getClassInfo(this.getClass());
}
return classInfo.getMetaClass();
}

public /* synthetic */ MethodHandles.Lookup $getLookup() {
return MethodHandles.lookup();
}
}

Affect(row-cnt:1) cost in 468 ms.

当执行 inline 表达式,调用 Closure.call() 方法时,会调用到 GString 构造方法,values 是传入的参数值替换占位符,strings 是 inline 表达式去除占位符部分,最终返回计算后的结果。

PR

https://github.com/apache/shardingsphere/pull/30295