ShardingSphere inline表达式线程安全问题定位
问题背景
春节期间发现 ShardingSphere 事务 E2E 偶发执行失败问题,并且每次执行失败需要执行很久,直到超时。最终定位发现 inline 表达式存在线程安全问题。本文记录定位并解决 inline 表达式线程安全问题的过程。
问题原因
1.GroovyInlineExpressionParser
里有成员变量,存在并发修改,不能使用单例 SPI 实现;
2.执行 Groovy 表达式时,需要执行 rehydrate 方法 copy Closure,使得每个 Closure 都有独立的执行环境,避免属性赋值时产生线程安全问题。
问题定位
构造测试用例尝试复现问题
构造测试用例,且在测试用例中添加线程相关信息,观察执行结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Test @SneakyThrows({ExecutionException.class, InterruptedException.class}) void assertThreadSafety() { int threadCount = 2; ExecutorService pool = Executors.newFixedThreadPool(threadCount); List<Future<?>> futures = new ArrayList<>(threadCount); for (int i = 0; i < threadCount; i++) { Future<?> future = pool.submit(this::createInlineExpressionParseTask); futures.add(future); } for (Future<?> future : futures) { future.get(); } pool.shutdown(); }
private void createInlineExpressionParseTask() { for (int j = 0; j < 5; j++) { String resultSuffix = Thread.currentThread().getName() + "--" + j; String actual = TypedSPILoader.getService(InlineExpressionParser.class, "GROOVY", PropertiesBuilder.build( new PropertiesBuilder.Property(InlineExpressionParser.INLINE_EXPRESSION_KEY, "ds_${id%2}"))).evaluateWithArgs(Collections.singletonMap("id", 1)); assertThat(actual, is(String.format("ds_%s", resultSuffix))); String actual2 = TypedSPILoader.getService(InlineExpressionParser.class, "GROOVY", PropertiesBuilder.build( new PropertiesBuilder.Property(InlineExpressionParser.INLINE_EXPRESSION_KEY, "account_${id}"))).evaluateWithArgs(Collections.singletonMap("id", resultSuffix)); assertThat(actual2, is(String.format("account_%s", resultSuffix))); } }
|
通过修改线程数测试几组,结果如下:
1: 100个线程,一个线程里一个inline表达式,报错
2: 1个线程,一个线程里两个不同inline表达式,没问题
3: 2个线程,一个线程里两个不同inline表达式,报错
那么只需要最少两个线程并发执行即可稳定复现这个 bug。
对象属性线程间共享问题
初步看代码,GroovyInlineExpressionParser
之前是单例的,线程之间会共享同一个实例,GroovyInlineExpressionParser
类中发现有一处成员变量 inlineExpression
多个线程中共享,并发修改的话会导致数据不正确。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @SingletonSPI public interface InlineExpressionParser extends TypedSPI {}
public final class GroovyInlineExpressionParser implements InlineExpressionParser { private static final String INLINE_EXPRESSION_KEY = "inlineExpression"; private static final Map<String, Script> SCRIPTS = new ConcurrentHashMap<>(); private static final GroovyShell SHELL = new GroovyShell(); private String inlineExpression; @Override public void init(final Properties props) { inlineExpression = props.getProperty(INLINE_EXPRESSION_KEY); } }
|
这个问题修复起来很简单,移除 @SingletonSPI 声明,每次创建新的实例即可。
添加日志观察执行流程
上述问题修复后,继续执行测试用例,发现还是会报错。报错日志如下:
看起来就是当 thread2 获取结果之前, thread1 执行了 inline 表达式,thread2 拿到了 thread1 执行的结果。
通过日志也能看出来 thread2 执行报错时 [pool-1-thread-2] ERROR
拿到了线程1的执行结果result:ds_pool-1-thread-1--0
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 15:12:29.630 [pool-1-thread-2] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:ds_${id}, this:2086977125, closure:1142533146, map:{id=pool-1-thread-2--0} 15:12:29.689 [pool-1-thread-2] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - ds_id, j:0,result:ds_pool-1-thread-2--0 15:12:29.689 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:ds_${id}, this:629624949, closure:595824174, map:{id=pool-1-thread-1--0} 15:12:29.689 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - ds_id, j:0,result:ds_pool-1-thread-1--0 15:12:29.703 [pool-1-thread-2] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:account_${id}, this:1499502307, closure:778755431, map:{id=pool-1-thread-2--0} 15:12:29.704 [pool-1-thread-2] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - account_id, j:0,result:account_pool-1-thread-2--0 15:12:29.705 [pool-1-thread-2] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:ds_${id}, this:1218528180, closure:667406814, map:{id=pool-1-thread-2--1} 15:12:29.704 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:account_${id}, this:2009787398, closure:1068736255, map:{id=pool-1-thread-1--0} 15:12:29.705 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - account_id, j:0,result:account_pool-1-thread-1--0 15:12:29.705 [pool-1-thread-2] ERROR org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - ds_id, j:1,result:ds_pool-1-thread-1--0 15:12:29.705 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:ds_${id}, this:1072633959, closure:478686467, map:{id=pool-1-thread-1--1} 15:12:29.705 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - ds_id, j:1,result:ds_pool-1-thread-1--1 15:12:29.709 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParser - inlineExpression:account_${id}, this:2138009411, closure:1272220298, map:{id=pool-1-thread-1--1} 15:12:29.709 [pool-1-thread-1] INFO org.apache.shardingsphere.infra.expr.groovy.GroovyInlineExpressionParserTest - account_id, j:1,result:account_pool-1-thread-1--1
|
DEBUG模拟复现流程
通过 debug 控制两个线程执行流程,当 thread1 在 result.call().toString()
执行之前等待 thread2 执行完成后再返回时,thread1 得到的结果不正确。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public String evaluateWithArgs(final Map<String, Comparable<?>> map) { Closure<?> result = (Closure<?>) evaluate("{it -> \"" + handlePlaceHolder(inlineExpression) + "\"}"); log.info("inlineExpression:{}, this:{}, closure:{}, map:{}", inlineExpression, System.identityHashCode(this), System.identityHashCode(result), map); result.rehydrate(new Expando(), null, null) .setResolveStrategy(Closure.DELEGATE_ONLY); map.forEach(result::setProperty); return result.call().toString(); }
|
结合后面的分析也能得出,因为线程之间共享了 context,上面 thread2 执行完把 context 里共享的属性给改了,导致 thread1 执行出现问题。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public String evaluateWithArgs(final Map<String, Comparable<?>> map) { Closure<?> result = (Closure<?>) evaluate("{it -> \"" + handlePlaceHolder(inlineExpression) + "\"}"); result.rehydrate(new Expando(), null, null) .setResolveStrategy(Closure.DELEGATE_ONLY); map.forEach(result::setProperty); return result.call().toString(); }
Closure<?> result = ((Closure<?>) evaluate("{it -> \"" + handlePlaceHolder(inlineExpression) + "\"}")).rehydrate(new Expando(), null, null);
|
问题分析
DEBUG分析问题
由于上面发现执行 evaluateWithArgs
方法会有线程安全问题,所以 DEBUG 分析里面的逻辑。
有问题的代码如下:
result.rehydrate(new Expando(), null, null)
方法作用:重新实例化一个闭包对象,设置 delegate 为 Expando。这样闭包的执行环境将改变,不再依赖于原始闭包的环境。
但是明显下面的代码里没有使用 rehydrate 返回的 copy 的 Closure 对象,最终导致 result::setProperty
设置的属性在线程之间共享了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Closure<?> result = ((Closure<?>) evaluate("{it -> \"" + handlePlaceHolder(inlineExpression) + "\"}"));
result.rehydrate(new Expando(), null, null); map.forEach(result::setProperty); return result.call().toString();
public Closure<V> rehydrate(Object delegate, Object owner, Object thisObject) { Closure<V> result = (Closure<V>) this.clone(); result.delegate = delegate; result.owner = owner; result.thisObject = thisObject; return result; }
|
result::setProperty
会调用 groovy.lang.Closure#setProperty
方法将用户传入的属性设置到 context 里,这个 context 是线程间共享的。
如果使用 rehydrate 返回的Closure对象,则属性会设置到各自的 Expando 对象里,不存在线程安全问题。
1 2 3 4 5
| groovy.lang.Closure#setProperty ->
InvokerHelper.setProperty(this.delegate, property, newValue);
|
context 初始化逻辑:
1 2 3 4 5 6 7
| private final Binding context;
public GroovyShell(ClassLoader parent, Binding binding, final CompilerConfiguration config) { this.context = binding; this.config = config; }
|
如果使用有问题的代码执行,执行结果如下,由于没有使用 rehydrate 执行 copy 后的 Closure,导致各个线程之间属性会串:
可以看到 thread1 和 thread7 都将属性设置到了共享的 context Bing@3954
对象里,所以线程之间属性会串,导致执行结果有问题。
Groovy表达式执行流程
Groovy 根据用户脚本,生成相关类代码,再编译成 Class 对象,然后创建闭包执行脚本。
调用api如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| private static final GroovyShell SHELL = new GroovyShell();
Script script = SHELL.parse(expression);
Closure<?> result = (Closure<?>) script.run();
result.rehydrate(new Expando(), null, null); result.setResolveStrategy(Closure.DELEGATE_ONLY);
map.forEach(result::setProperty);
return result.call().toString();
|
查看Groovy生成的类
可以通过 arthas 反编译,查看 groovy 生成的 Script 类的代码。其中 doCall 方法就是用来具体处理 inline 表达式计算结果的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| [arthas@97658]$ sc Script*
[arthas@92326]$ jad Script1$_run_closure1
ClassLoader: +-groovy.lang.GroovyClassLoader$InnerLoader@2960c0ad +-groovy.lang.GroovyClassLoader@22c53a48 +-jdk.internal.loader.ClassLoaders$AppClassLoader@251a69d7 +-jdk.internal.loader.ClassLoaders$PlatformClassLoader@76f18a45
Location: /groovy/shell
import groovy.lang.Closure; import groovy.lang.MetaClass; import java.lang.invoke.MethodHandles; import org.codehaus.groovy.reflection.ClassInfo; import org.codehaus.groovy.runtime.GStringImpl; import org.codehaus.groovy.runtime.GeneratedClosure; import org.codehaus.groovy.runtime.ScriptBytecodeAdapter; import org.codehaus.groovy.vmplugin.v8.IndyInterface;
public final class Script1._run_closure1 extends Closure implements GeneratedClosure { private static ClassInfo $staticClassInfo; public static transient boolean __$stMC;
public Script1._run_closure1(Object _outerInstance, Object _thisObject) { super(_outerInstance, _thisObject); }
public Object doCall(Object it) { return new GStringImpl(new Object[]{IndyInterface.bootstrap("getProperty", "id", 12, this)}, new String[]{"ds_", ""}); }
protected MetaClass $getStaticMetaClass() { if (this.getClass() != Script1._run_closure1.class) { return ScriptBytecodeAdapter.initMetaClass(this); } ClassInfo classInfo = $staticClassInfo; if (classInfo == null) { $staticClassInfo = classInfo = ClassInfo.getClassInfo(this.getClass()); } return classInfo.getMetaClass(); }
public MethodHandles.Lookup $getLookup() { return MethodHandles.lookup(); } }
Affect(row-cnt:1) cost in 468 ms.
|
当执行 inline 表达式,调用 Closure.call()
方法时,会调用到 GString 构造方法,values 是传入的参数值替换占位符,strings 是 inline 表达式去除占位符部分,最终返回计算后的结果。
PR
https://github.com/apache/shardingsphere/pull/30295