Quasar和Coroutine并不是OpenJdk社区原生的协程解决方案,直到2018年1月,官方提出了Project Loom,到了2019年,Loom的首个EA版本问世,此时Java的协程类叫做Fiber,但社区觉得这引入了一个新的概念,于是在2019年10月将Fiber重新实现为了Thread的子类VirtualThread,兼容Thread的所有操作。
这时Project Loom的基本雏形已经完成了,在它的概念中,协程就是一个特殊的线程,是线程的一个子类,从Project Loom已经可以看到Open Jdk社区未来协程发展的方向, 但Loom还有很多的工作需要完成,并没有完全开发完。
三. Project Loom的目标与挑战
目标
易于理解的Java协程系统解决方案,协程即线程。
Virtual threads are just threads that are scheduled by the Java virtual machine rather than the operating system.
挑战
兼容庞大而复杂的标准类库、JVM特性,同时支持协程和线程。
四. Loom实现架构在API层面Loom引入最重要的概念就是Virtual Thread,对于使用者来说可以当做Thread来理解。
下面是协程生命周期的描述,与线程相同需要一个start函数开始执行,接下来VirtualThread就会被调度执行,与线程不同的是,协程的上层需要一个调度器来调度它,而不是被操作系统直接调度,被调度执行后就是执行业务代码,此时我们业务代码可能会遇到一个数据库访问或者IO操作,这时当前协程就会被Park起来,与线程相同,此时我们的协程需要在切换前保存上下文,这步操作是由Runtime的Freeze来执行,等到IO操作完成,协程被唤醒继续执行,这时就要恢复上下文,这一步叫做Thaw。
1. Freeze操作上图左侧是对Freeze的介绍,首先一个协程要被执行需要一个调度器,在Java生态本身就有一个非常不错的调度器ForkJoinPool,Loom也默认使用ForkJoinPool来作为调度器。
图中ForkJoinWorkerThread调用栈前半部分直到enterSpecial都是类库的调用栈,用户不需要考虑,A可以理解为用户自己的实现,从函数A调用到函数B,函数B调用函数C,函数C此时有一个数据访问,就会将当前协程挂起,yield操作会去保存当前协程的执行上下文,调用freeze,freeze会做一个stack walk,从当前调用栈的最后一层(yield)回溯到用户调用(函数A),将这些内容拷贝到一个stack。这也是协程栈大小不固定的原因,我们可以动态扩缩协程需要的空间,而线程栈大小默认1M,不管用没用到。而协程按需使用的特点,可以创建的数量非常多。extract_pop是Loom非常好的一个优化,它将ABC调用栈中的Java对象单独拷贝到一个refStack,在GC root时,如果把协程栈也当做root,几百万个协程会导致扫描停顿很久,Loom将所有对象都提到一个refStack里面,只需要处理这个stack即可,避免过多的协程栈增加GC时间。
2. Thaw操作Thaw用于恢复执行,如果将stack里面ABC、yield全部拷贝回执行栈里面可能是很耗时的,因为执行栈可能非常深了,Loom社区成员在调研后发现,函数C可能不止一个数据访问操作,在恢复执行栈之后,可能因为C的IO操作又会再次切换上下文,所以Loom用了一种lazy copy的方式,每次只拷贝一部分,执行完成之后遇到return barrier则继续去stack中拷贝。这样除了第一次切换开销比较大,其他所有的切换开销都会很小。
另一方面refStack里面保存的OOP要restore回来,因为很多的GC可能在执行时将OOP地址改了,如果不restore之后访问可能会出现问题。
五. Loom使用
Virtual Thread创建
通过Thread.builder创建VirtualThread
通过Thread.builder创建VirtualThread工厂
默认ForkJoinPool调度器(负载均衡、自动扩展),支持定制调度器
定制调度器
static ExecutorService SCHEDULER_1 = Executors.newFixedThreadPool(1); Thread thread = Thread.ofVirtual().scheduler(SCHEDULER_1).start(() -> System.out.println("Hello")); thread.join();创建协程池
ThreadFactory factory; if (usrFiber == false) { factory = Thread.builder().factory(); } else { factory = Thread.builder().ofVirtual().factory(); } ExecutorService e = Executors.newFixThreadPool(threadCount, factory); for (int i=0; i < requestCount; i++) { e.execute(r); }