您好,登录后才能下订单哦!
这篇文章主要介绍“TransmittableThreadLocal怎么解决池化复用线程的传值问题”,在日常操作中,相信很多人在TransmittableThreadLocal怎么解决池化复用线程的传值问题问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”TransmittableThreadLocal怎么解决池化复用线程的传值问题”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
一般情况下,ThreadLocal都可以满足我们的需求,当我们出现需要 在使用线程池等会池化复用线程的执行组件情况下传递ThreadLocal
,
这个场景就是TransmittableThreadLocal解决的问题。
Github地址:https://github.com/alibaba/transmittable-thread-local
感兴趣的可以去下载的玩一玩,接下来我们来介绍一下这个组件的神奇之处。
首先看个demo, 通过demo,我们先了解了解怎么用
/** * ttl测试 * * @author zhangyunhe * @date 2020-04-23 12:47 */ public class Test { // 1. 初始化一个TransmittableThreadLocal,这个是继承了InheritableThreadLocal的 static TransmittableThreadLocal<String> local = new TransmittableThreadLocal<>(); // 初始化一个长度为1的线程池 static ExecutorService poolExecutor = Executors.newFixedThreadPool(1); public static void main(String[] args) throws ExecutionException, InterruptedException { Test test = new Test(); test.test(); } private void test() throws ExecutionException, InterruptedException { // 设置初始值 local.set("天王老子"); //!!!! 注意:这个地方的Task是使用了TtlRunnable包装的 Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1"))); future.get(); Future future2 = poolExecutor.submit(TtlRunnable.get(new Task("任务2"))); future2.get(); System.out.println("父线程的值:"+local.get()); poolExecutor.shutdown(); } class Task implements Runnable{ String str; Task(String str){ this.str = str; } @Override public void run() { // 获取值 System.out.println(Thread.currentThread().getName()+":"+local.get()); // 重新设置一波 local.set(str); } } }
输出结果:
pool-1-thread-1:天王老子 pool-1-thread-1:天王老子 父线程的值:天王老子
我们首先看一下TransmittableThreadLocal
的源码,
public class TransmittableThreadLocal<T> extends InheritableThreadLocal<T> implements TtlCopier<T> { // 1. 此处的holder是他的主要设计点,后续在构建TtlRunnable private static InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() { @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); } @Override protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) { return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue); } }; @SuppressWarnings("unchecked") private void addThisToHolder() { if (!holder.get().containsKey(this)) { holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value. } } @Override public final T get() { T value = super.get(); if (disableIgnoreNullValueSemantics || null != value) addThisToHolder(); return value; } /** * see {@link InheritableThreadLocal#set} */ @Override public final void set(T value) { if (!disableIgnoreNullValueSemantics && null == value) { // may set null to remove value remove(); } else { super.set(value); addThisToHolder(); } } /** * see {@link InheritableThreadLocal#remove()} */ @Override public final void remove() { removeThisFromHolder(); super.remove(); } private void superRemove() { super.remove(); } }
步骤说明:
在代码中,作者构建了一个holder对象,这个对象是一个InheritableThreadLocal
, 里面的类型是一个弱引用的WeakHashMap , 这个map的va lu就是TransmittableThreadLocal
, 至于value永远都是空的
holder里面存储的是这个应用里面,所有关于TransmittableThreadLocal
的引用。
从上面可以看到,每次get, set ,remove都会操作holder对象,这样做的目的是为了保持TransmittableThreadLocal
所有的这个引用都在holder里面存一份。
回到我们上面的代码
Future future = poolExecutor.submit(TtlRunnable.get(new Task("任务1")));
细心的朋友可能已经发现了,我们调用了TtlRunnable
对象的get方法,下面看一下这个方法有啥作用吧
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) { if (null == runnable) return null; if (runnable instanceof TtlEnhanced) { // avoid redundant decoration, and ensure idempotency if (idempotent) return (TtlRunnable) runnable; else throw new IllegalStateException("Already TtlRunnable!"); } // 重点在这里 return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun); }
看上面的代码,细节上我们不看,我们看大致的思路, 这个地方主要就是根据传入的runnable构建了一个TtlRunnable对象。
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) { //重点在这里 this.capturedRef = new AtomicReference<Object>(capture()); this.runnable = runnable; this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun; }
下面这行代码,运行到这里的时候,还是在主线程里面,调用了capture
方法
this.capturedRef = new AtomicReference<Object>(capture());
public static Object capture() { // 构建一个临时对象,主要看captureTtlValues方法 return new Snapshot(captureTtlValues(), captureThreadLocalValues()); } private static WeakHashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() { // 构建一个WeakHashMap方法, WeakHashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); // 在主线程里面,调用holder变量,循环获取里面所有的key和value for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) { ttl2Value.put(threadLocal, threadLocal.copyValue()); } // 返回出去 return ttl2Value; }
步骤说明:
1.调用静态变量holder, 循环获取里面所有的key和value, value的获取就比较巧妙一点。
private T copyValue() { // 这里的get方法,调用的是父类的方法,可以在父类里面最终获取到当前TransmittableThreadLocal所对应的value return copy(get()); }
2.组装好一个WeakHashMap出去,最终就会到了我们上面的构造方法里面,针对capturedRef
的赋值操作。
@Override public void run() { //1. 获取到刚刚构造TtlRunnable对象的时候初始化的capturedRef对象。包含了从submit丢任务进来的时候父线程的数据 Object captured = capturedRef.get(); if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } // 清除不在captured里面的key,同时在这个子线程中,对所有的ThreadLocal进行重新设置值 Object backup = replay(captured); try { // 执行实际的线程方法 runnable.run(); } finally { // 做好还原工作,根据backup restore(backup); } } private static WeakHashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull WeakHashMap<TransmittableThreadLocal<Object>, Object> captured) { WeakHashMap<TransmittableThreadLocal<Object>, Object> backup = new WeakHashMap<TransmittableThreadLocal<Object>, Object>(); for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { TransmittableThreadLocal<Object> threadLocal = iterator.next(); // 做好当前线程的local备份 backup.put(threadLocal, threadLocal.get()); // 清除数据,不在captured里面的。 if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // 这里就是把值设置到当前线程的TransmittableThreadLocal里面。 setTtlValuesTo(captured); // 一个钩子 doExecuteCallback(true); return backup; }
到此,关于“TransmittableThreadLocal怎么解决池化复用线程的传值问题”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。