DAG设计
需求
DAG将执行流程构建一个单向无环图,其中每个点表示一个逻辑执行单元,有向边表示逻辑单元的依赖关系,其中入节点依赖出节点的执行结果。对于个人节点来说,所有执行其的节点(对该节点来说是入节点)执行完成后立即执行该节点。
对于无依赖关系可以同步执行的节点来说,使用线程池来统一并发执行,通过图编排尽可能的保证执行的高效性。
同时由于存再依赖关系,所已需要考虑某个节点其执行错误时,依赖其结果的所有节点处理逻辑,folly中使用的方式是当任意一个节点失败时,依赖其结果的节点都停止执行。
设计
对于所有依赖节点执行完成才能执行当前节点的实现首先想到的是屏障或者条件变量,但是folly都未使用,而是使用了更为优雅的方式:智能指针(原子变量),通过依赖的节点每个持有一个智能指针的拷贝,当依赖节点执行完成时会释放该智能指针,当所以智能指针都被释放时将执行析构函数,在析构函数中将执行当前节点(并非立即执行,而是放到线程池中)。
对于节点并发执行来说,每个节点可指定使用的线程池,当某个节点未指定使用的线程池将使用其依赖节点的线程池(依赖的最后执行完成的那个节点)。
在执行开始和完成都有两个单独节点,其中开始节点是第一层节点的依赖,由于设置触发,对于最后一个节点则是依赖所有的最后一层节点,用于监控所有节点执行完成。
使用样例
待补充,可看folly源码中test部分。
源码代码解析
这里对folly的DAG源码进行详细阅读。要学习DAG,首先要学习folly的future,其类似std提供的future,但是更加灵活。future的基类是core,我们先来了解一下core相关内容。
core
core对应于文件folly/futures/detail/Core.h其包含了如下内容State,Spin Lock,DeferredExecutor,KeepAliveOrDeferred,InterruptHandler,CoreBase,Core。
其中core的核心是一个FSM即有限状态机。其状态流转如下图:
1 | /// +----------------------------------------------------------------+ |
本文介绍中暂不考虑setProxy()这条支路。
这里需要对一些名称进行一个定义:
- result:即结果,一个Try类型数据,包含一个T类型数据或者一个exception。对于链式控制来说(使用then)上一个的结果会作为下一个的输出,当设置当前的result即表示上一个执行单元完成处理,对于当前执行单元来说,执行完成的结果会被设置到下一个执行单元,以此来触发下一个执行单元的执行,以此达到链式的控制及数据传递。
- callback:回调函数。当设置了result时会执行函数并为下一个执行单元设置result。
- executor:执行器。执行callback的结构,也会作为参数传递到callback中,可以暂时理解为线程池。
- consumer thread:消费线程,这里的消费线程时指消费整个future执行结果的线程(而不是消费任务的线程),其提供
callback。 - producer thread:生产者线程,这里指生成结果的线程(而不是生成任务的线程)。
interrupt:exception_wrapper,表示异常结果。及try机制。- interrupt handler:出现异常时的执行函数。
core持有三组数据,每组数据都是并发控制的:
- producer-to-consumer(生产者到消费者)的信息流:这组数据包含:
result,callback,executor以及运行callback的·优先级。控制并发的方式是使用上图的FSM,其中有State表示状态,状态是原子的进行变更的。 - consumer-to-producer(消费者到生产者)的请求干预:包含
interrupt-handler和interrupt。通过Spin Lock控制并发。 - 生命周期控制:包含了两个引用计数,均为原子变量。
这里为了区分生产者和消费者,并且为了方便使用,有两个结构共同维护该FSM(共用同一个core),分别是future和promise。其中future一般由消费者线程持有,消费者可以注册callback。promise一般由生产者线程持有,其接受上一层的result,执行callback,设置下一层的result。消费者持有的一般是下一层的future,当消费者判断持有的future已经设置了result则表示设置的callback已经执行完成。对于多层级执行链来说,处于中间的future和promise对用户来说均是不可见的,我们只能获取到最起点的future和promise,以及最后一个future。对于中间的future来说,其执行完成链路的构建就会被析构掉(由于不会被任何变量持有,生命周期结束),对于promise来说,会被生产者线程持有(准确的来说,其实不是被生产者线程持有,因为此时callback未被添加到生产者线程池中,其实际是被每个的上一层的callback持有),当最初的promise设置了result后,就会按照执行链路执行对应的callback。当每个callback执行完成时,会先设置下一个promise的result(此时就会触发下一层callabck执行),之后会执行自身callback的析构,此时就会析构callback持有的下一层的promise(在下一层开始执行callback时,会自己保证在执行完成callback前自生不被析构,因此不用担心此时promise被析构,同样在其执行完成callback再执行析构),通过这种方式达到链式执行效果。
上面介绍的可能相对较为复杂,很多概念还未提到,暂时可以先有一个大致的概念,不用现在就完全搞明白,后面会详细介绍。
为了实现上述功能,需要有很多基础的class,下面对这些class进行简单介绍。
State
State相对较为简单,一个枚举类型来表示状态:
1 | enum class State : uint8_t { |
这里不过多介绍。
SpinLock
SpinLock就是一个自己实现的自旋锁,这里可以将其当做一个简单的锁就好了:
1 | /// SpinLock is and must stay a 1-byte object because of how Core is laid out. |
DeferredExecutor
DeferredExecutor延迟执行器。其主要功能是持有一个函数,但不立即执行,等待时机成熟(达到某种条件时才执行)。
该class使用场景似乎很特殊,目前没有看到应用场景,读者可暂时忽略该部分(或者有大佬知道怎么用,辛苦不吝赐教)。
其定义如下:
1 | class DeferredExecutor final { |
其中Executor::KeepAlive就是folly自己实现的对Executor封装的安全指针:
1 | class ExecutorKeepAliveBase { |
KeepAlive利用指针最后两位一定是0的特性,使用最好两位来做标识。kDummyFlag表示假的keep-alive,kAliasFlag标识当前的Executor是一个别名·。
KeepAlive封装的ExecutorT,正常都需要继承Executor,其提供了三个接口:keepAliveAcquire和keepAliveRelease,在两个接口用于提供引用计数计算的,默认返回值为false,表示不支持引用计数,此时的KeepAlive是一个假的支持keep-alive,此时执行copy操作时,直接返回ExecutorT的地址(加上kDummyFlag),如果继承Executor的类实现了上述两个接口,则copy返回指针(加上kAliasFlag)的同时,会增加引用计数。另一个接口时线程池都需要有的接口add,即向其中添加task任务,默认的函数时立即执行函数。
KeepAliveOrDeferred
KeepAliveOrDeferred是一个折叠类型,包含一个Executor::KeepAlive或者DeferredWrapper。
1 | /** |
这里我们主要使用Executor::KeepAlive<>类型,可不考虑DeferredExecutor。
由于KeepAlive存再隐式构造函数:
1 | /* implicit */ KeepAlive(ExecutorT* executor) { |
因此对于使用的时候,如果一个函数参数要求是KeepAlive,但是传递的是一个Executor的指针时,默认都会转成KeepAlive。
而KeepAliveOrDeferred中又存再
1 | /* implicit */ KeepAliveOrDeferred(KA ka) noexcept; |
以KeepAlive作为参数的的隐式构造函数,因此当使用Executor的指针作为参数时,往往使用的是KeepAliveOrDeferred中的KeepAlive模式。这也是我前文所述主要使用的是KeepAlive的原因,后面可以看到在feature中参数均为KeepAlive,使用时经常会传递线程池指针,这种情况下,都是使用的上述内容。
InterruptHandler&InterruptHandlerImpl
这两个类提供了异常处理机制,都比较简单,这里不做详细介绍。主要包括三部分,引用计数,异常处理函数,异常类(folly::exception_wrapper),其中folly::exception_wrapper相对复杂,这里不展开介绍。
1 | class InterruptHandler { |
CoreBase
CoreBase是核心类,其维护了之前介绍了数据流转和FSM。
1 | class CoreBase { |
setExecutor使用参数是KeepAliveOrDeferred,也就是在上面介绍的,实际往往都是keepalive。
大部分情况下使用corebase时,会先提前设置一个Executor和设置callback,当设置Result时往往是链式请求中的上一个future,这时会将上一个执行的线程池传递过来,如果之前已经设置的线程池和传递的线程池不一致,则使用旧版的线程池(优先用户设置的,传过来的是架构透传的,后面讲到future的then会详细介绍)。
对于执行方式有两种类型,一个是立即执行,另一种是放到线程池中等待调度执行。当setResult_(如果已经设置了callback,将调用doCallback)时,之前未设置线程池,则认为是在原线程池中立即执行,所以直接执行callback。当此前已经设置了线程池时,则可以通过state是OnlyCallbackAllowInline还是OnlyCallback,对于前者来说,表示立即执行callback,对后者来说,则表示将callback添加到线程池中,等待线程池调度。
RequestContext:其中context_是线程维度数据,其实现为一个线程维度单例(使用static thread local),用于传递一些线程数据。一个比较常见使用场景是,对于一个线程执行的任务,向该结构内增加数据,当出问题,确定是哪个请求导致的。
有了上面介绍,我们来详细看一下doCallback函数实现doCallback“:
1 | // May be called at most once. |
传参中completingKA有两种情况:
- 如果是先设置的result,再设置的callback,则是新建的一个新的。
- 如果是先设置的callback,再设置的result,则大部分情况是上一个task任务执行时所使用的线程池。
其中doAdd是实际执行callback的函数,参数中addCompletingKA是新传入的线程池,currentExecutor是旧版线程池,keepAliveFunc是又进过一层封装的callback。
1 | auto doAdd = [](Executor::KeepAlive<>&& addCompletingKA, |
首先判断原线程池是否为defer线程池,这里一般不是,暂时不考虑。
对于均为keepalive线程池来说,判断新旧线程池是否一致,如果一致,则立即执行callback,否则将callback添加到线程池中。
如果之前存再线程池,执行逻辑如下:
1 | if (!(priorState == State::OnlyCallbackAllowInline)) { |
执行方式:如果不允许立即执行函数(即非OnlyCallbackAllowInline),则让新线程池变成一个空的线程池,这样保证不会和之前线程池一致,在doAdd时就不会立即执行callback了。
在介绍下面代码之前,先看一下CoreAndCallbackReference实现:
1 | class CoreBase::CoreAndCallbackReference { |
其实现相当简单,就是存储core,并且负责管理coar自身生命周期以及callback和context生命周期。利用变量作用域结束后调用析构函数保证执行core的derefCallback和detachOne()。
下面再来看接下来的四行代码:
1 | // We need to reset `callback_` after it was executed (which can happen |
其中注释给了详细介绍。我们要在callback_执行结束时reset(这里应该是指要执行析构函数),doAdd执行后有可能会丢弃callback_,即使这种情况下,我们依然需要reset callback,因此需要有两个CoreAndCallbackReference来维护callback_的生命周期。而执行keepAliveFunc需要core,因此core也不能在执行doAdd(有可能只是将keepAliveFunc加到了线程池中而未执行)完成后就被析构,因此对core的生命周期监控也需要两个CoreAndCallbackReference,因此这里将core和callback的引用计数都先加2,使用两个CoreAndCallbackReference来对core和callback进行生命周期监控。
这里解释了此前说的,在执行callback执行时会自己维护FSM,保证自己不被析构(在执行callback时,future和promise可能都已经被析构了,当callback执行完成后core被析构)。
异常处理:如果执行doAdd抛出异常,则使用该异常执行callback,因此callback应该有对异常处理的能力。目前异常处理不需要通过函数中判断参数是否存再异常,这一步由future架构实现了,目前实现异常处理可以通过thenError来实现,这部分在future中将详细介绍。
当此前未设置线程池时,其处理相对简单,先注册执行完成的析构方法,用于析构callback_,而后立即执行callback。
这里关于core的生命周期还有两个重要函数:
1 | /// Called by a destructing Future (in the consumer thread, by definition). |
上面说过,维护FSM有两个结构,future和promise,detachFuture和detachPromise分别对应于两个结构的析构函数。由于future析构时,可能什么也未操作(只设置了callback),因此可以直接尝试执行析构即可。而析构promise时,正常来说已经设置了result(上一层的promise设置的这一层的result),因此这里先做了一个检查,如果未设置,打日志。
后面可以看到,正常创建一个空的core时,其引用计数就是2,表示会有两个结构持有,分别执行析构函数,最终来析构core。
ResultHolder
ResultHolder用处存储result。其定义如下:
1 | template <typename T> |
其中Try存储一个T类型原始,或者一个expection或者未空,其中有枚举值标记其类型。
core
core继承了corebase和resultholder,组成了FSM的原始。其定义如下:
1 | template <typename T> |
core相较corebase增加内容不多,主要看一下其setCallback,其主要对fun进行了一层封装,将对异常的处理进行了封装:
1 | template <class F> |
当传递的异常不为空时,表示出现了异常,这时将会把异常添加到result中,此前说过异常处理由folly架构完成,这里是其中实现的一部分。更多的部分则在future中介绍。这里可以回头看一下doCallback在遇到异常时的处理,会将异常作为参数传递到callback中执行。
小结
至此基本介绍完成了core的基本信息,对folly的异步框架的FSM进行了深入的了解,同时对future的链式执行有了初步认识。接下来会更加细致的介绍folly链式执行的实现原理及DAG的实现。
future&promise
由于future&promise通过了大量接口,全部介绍比较啰嗦(很多很简单的接口),这里只介绍一些核心接口。
如上文所说folly&promise沟通维护一个core结构,共同维护FSM。promise一般会由生产者线程持有,负载生产result,future有消费者线程持有(用户线程),负责添加callback并且控制执行流程,以及最终结果获取。
promise持有core(成员变量),并且可以通过promise获取到future。
正常异步执行流程为
异步执行流程如下
- 创建一个promise。
- 从promise中获取一个future,并将future给消费者线程。
- 用户向future中添加callback(同时可设置executor)。添加callback后会新建一个promise,并通过新的promise获取一个新的future。其中promise自生被生产者线程持有(上一层的callback持有,future的then将详细介绍)。future被返回给消费者线程。
- 如果返回的future也设置了callback,即链式设置callback,则继续执行第三步,直到不再添加callback为止。
- 用户设置最最初的promise的result,开启链式执行(这一步也可能是在第一个执行,这时创建的一般是future)。
- 链式执行中,当上一层执行了callback后,设置当前层promise的result,执行当前层级的callback,设置下一层的result。析构当前的promise。
- 如果没有下一层了,则用户通过future获取到结构。如果依然有下一层,则当前层的future已经被析构(构建完成多次执行链就已经被析构了),重复执行第七步。
- 通过最后一个返回的future,调用阻塞方法,等待所有callback执行完成(或出错)。
对于一个简单代码实例如下:
1 |
|
下面我们就按照上面的步骤,进行介绍。
创建promise
一般创建promise使用默认构造函数,其定义如下:
1 | template <class T> |
其中retrieved由于记录是否已经从当前的promise中获取到了future,该参数用于析构时判断。core_是core的引用。其中使用make()创建的core将有两个引用计数。
由于retrieved_用于析构,我们先来看一下Promise的析构函数:
1 | template <class T> |
这里,首先会判断是否存再core,之后,如果retrieved_未false,及还未从promise获取到future,则会执行对future的析构,否则仅执行promise析构(之间有一个处理异常的处理)。
这里主要作用是保证core被完整析构,如果还未创建future在,则future的引用计数也有promise释放,如果已经创建了future,则future的引用计数则有future自生释放。
这里我们再看一下future的析构函数:
1 | template <class T> |
future继承自FutureBase。这里可以看到,future析构时会释放掉core的future的引用计数。
上面说的启动方式,是callback等待result,还有一种方式是result等待callback。这是创建方式一般是:
1 | future::makeFuture(T); |
其中makeFeature()等价调用makeFeature(Unit{}),Unit可以理解为void:
1 | /// In functional programming, the degenerate case is often called "unit". In |
主要用于对void的封装。
其中makeFuture(t)实现为:
1 | template <class T> |
这里由于已经设置了result,因此不需要promise,因此引用计数是1。
从promise中获取future
promise可以获取future,有两种future,一个是SemiFuture,另一个是future。其中二者区别主要是在是否设置了executor。SemiFuture未设置executor,SemiFuture通过via设置executor后就变成了future。设置executor含义是指定callback在那个线程池中执行。
对应的两个方法为:
1 | template <class T> |
首先会设置retrieved_为true,标识已经生成了future,避免重复析构。其中如果返回future,则会使用folly提供的默认executor。一般我们返回SemiFuture,由用户自定义executor。
添加callback
添加callback主要有四个方法,分别是thenValue,thenTry,thenValueInline,thenTryInline。其中Value和try主要区别是callback参数不同,value是普通类似,try是对value的封装,同时有可能是还有异常。Inline和非inline区别时设置callback时是允许立即执行,还是放到线程池中执行,可参考执行方式。
其中四个函数实现如下:
1 | template <class T> |
首先可以看到是否有inline,仅在调用thenImplementation时最后一个参数有diff。
对应thenTry来说,其封装的lambdaFunc直接执行callback,而对于thenValue来说,则是使用futures::detail::wrapInvoke又进行了一层封装。这是因为,在future框架代码执行过程中,数据交换都是通过try进行传递的,如果callback参数是try,则可以直接使用,如果参数是value,则需要从try中提取出value(如果有的话)。这也导致了异常处理的差异,对于使用thenTry来说,需要在处理函数内部判断是否有异常,并对异常进行处理,如果使用thenValue,则在出现异常时,不会调用对应的callback,当用户设置了thenError时,根据thenError匹配的错误类型进行执行对应的callback(这部分后面会详细介绍)。
我们看一下futures::detail::wrapInvoke的实现:
1 | template <typename T, typename F> |
fn执行callback函数,参数为try中的value。
首先判断try中是是否有异常,如果有直接返回异常,如果没有,则返回fn执行结果。其中返回类型是callback的返回结果类型。可以大致看一下InvokeResultWrapper实现:
1 | template <typename T> |
对应返回类型为void,及callback未设置返回的来说,利用模板的偏特化,返回的类型是Try<Unit>。
1 | template <> |
在上述四个then方法中构建的lambdaFunc函数中,传递了参数Executor::KeepAlive<>&&,但是并未使用,这是由于除了上述方法,还有三个方法:
1 | template <typename T> |
其中thenExTry和thenExTryInline传递的callback会将Executor::KeepAlive<>&&作为参数传递,这三个方法均不常使用,这里不做过多介绍。
其中R是作为callback的辅助数据。这里不展开介绍。
对于thenImplementation来说,根据callback返回值类型不同,有两个重载:
- 当返回普通类型时,执行callback返回结果时,直接设置下一层
promise的result。 - 当返回类型为
future时,执行完callback,替换下一层的promise的core到callback返回的future的core,同时将下一层的future中设置的callback传递给这一层返回的future,这样相当于在执行时,对返回的future进行了替换,替换为callback返回的future。
下面针对上面两个类型的callback详细介绍。
callback返回try
callback返回try类型的逻辑相对简单一些,代码如下:
1 | // Variant: returns a value |
首先会新建一个Promise,作为下一层的FSM,其拷贝本层的异常处理函数。从新建的promise获取到一个future作为返回结果。其默认使用的执行器与本层的执行器一致。
setCallback_对传递的callback又进行了一次封装。其中futures::detail::makeCoreCallbackState创建的结构为CoreCallbackState,其用于控制callback执行,其实现如下:
1 | // Guarantees that the stored functor is destructed before the stored promise |
其持有一次func和一个promise,其中func是本层的callback,而promise是下一层的promise。
callback函数使用move方法拷贝到CoreCallbackState中,因此其负责管理callback的生命周期。调用stealPromise()方法就会析构callback。
Invoke方法用于实际执行callback,setTry和setException负责将callback结果传递到promise(下一层的FMS)。当设置的promise时,promise的isFulfilled()方法将会返回ture,通过该方式来控制析构CoreCallbackState不会double free callback。
下面来分析一下封装的callback:
1 | [state = futures::detail::makeCoreCallbackState( |
callback参数为Executor::KeepAlive<>和Try<T>,其中前者用于传递执行器,后者是上一层的promise执行返回的结果。
异常传递首先判断上一层结果是否方式异常,如果存再异常,则不在执行本层的callback,设置下一层结果为异常,这样如果下一层依然是这样的处理逻辑,则会一直往下一层设置结果为异常,直到设置到异常处理的callback,或者到最有一层,返回给用于。
如果未出现异常,则执行本层的callback:
1 | template <typename R, typename State, typename T, IfArgsSizeIs<R, 2> = 0> |
并将结果赋值到下一层的promise,这里设置的都是Try类型数据,但是由于上一层的封装中(thenTry,thenValue)会对类型进行一次转换,保证传递的参数是下一层·callback需要的参数·。
如果本层callback执行结果出现异常,则下一层如果是then类型的处理,则就会像上面介绍的一样进行异常传递。如果下一层就是异常处理函数(thenError)则立即就执行异常处理了(相对于跳过的异常传递的传递部分)。
如果本层callback执行结果正常,则会在设置下一层promise时,立即开始执行下一层的callback,之后依次链式执行。
这里再来看一下setCallback_内容,其对传递的callback又进行了一层封装(无线套娃了属于是):
1 | template <class T> |
这里封装较为简单,仅仅只是增加了一个RequestContext,这部分可以结合RequestContext阅读。
其中RequestContext是一个folly提供的线程维度全局单例(thread local)。其作用就是在线程之间传递数据。实现原理较为简单。当请求到来时,我们可以在请求处理的主线程中设置RequestContext,向其中添加数据。当我们在主线程之外使用别的线程池的时候,folly架构通过RequestContext::saveContext()方法获取到主线程的RequestContext数据。将该数据作为参数传递到异步线程要执行的函数中,这样在异步线程执行函数之前,通过RequestContextScopeGuard类,来将主线程的RequestContext数据拷贝到当前线程中,同时将自己的RequestContext存储起来,在RequestContextScopeGuard析构时,使用存储的RequestContext恢复自生的线程数据。以此导致数据屏蔽用户进行传递的作用。可以参考doCallback中doAdd传递的func,其在执行callback时会先创建RequestContextScopeGuard,并将这里传递的RequestContext::saveContext()作为参数。
该数据的一个典型使用场景是日志打点,对于某个大型服务来说,经常出现coredump,但是这些数据往往是由于实验导致的(毕竟如果上线就有core就上不去了),这时我们可能需要有一个添加方式来确定是哪个实验导致的问题,这个时候就可以使用RequestContext。我们在请求构建时,将实验参数写到RequestContext中,这样就会在请求使用到的所有线程中都可以获取到该数据,当出现core时,使用core的信号处理还是将实验参数打印出来。这样就不需要在每次使用异步线程的时候手动传递该值了。
callback返回future
对于callback返回future的场景来说,较为复杂,其实现逻辑就是,在构建时返回的future将会被运行时返回的future隐式替换,这样原本等待返回的future被设置result变成了等待执行时返回的future被设置result,其实现通过此前未介绍的Proxy。下面来看一下具体代码:
1 | // Variant: returns a Future |
对应异常处理之前的部分和原来没有区别。当上一层执行未出异常时,首先执行本层的callback。如果本层callback出现异常,则进行与上一层出现异常一样的处理。未出异常时,获取到原本的下一层promise,将下一层的promise的core设置Proxy为返回的future的core。同时为了避免重复析构,将二者的core都设置为空值。
这里我们详细看一下setProxy执行逻辑:
1 | void CoreBase::setProxy_(CoreBase* proxy) { |
对应原本返回的future后面有设置的链式处理来说,执行的是proxyCallback处理函数,其逻辑如下:
1 | void CoreBase::proxyCallback(State priorState) { |
可以看到,其逻辑就是把当前core的所有信息都迁移到proxy_中,包括要执行的callback_。这样,对于原本需要等待原promise设置result才能执行的callback来说,变成了依赖proxy_对应的promise被设置result,这样就完成了运行时对原future依赖的替换(妙啊!!!)。
这时,其实原本的promise就没有作用了,使用的就是替换后的future对应的promise。
callback设置线程池
对于每个callback都可以设置对应的执行线程池,由于每个callback都有一个future持有,因此设置future使用的线程池即可。通过via方法设置,一般设置执行的线程池是在SemiFuture设置,较为简单:
1 | template <class T> |
等待执行结束
对应异步来说,最终用户拿到一个future,我们需要在合适的时候等待异步的结束,调用get方法获取执行结果:
1 | template <class T> |
可以看到,等待的核心逻辑在waitImpl中。其核心是增加一层调用链,使用条件变量来等待执行完成。
在waitImpl中,增加一个promise,baton可以理解为一个条件变量。新增的调用链设置的callback只有两个作用,将上一层的结果写到新的future里面,作为最终返回给用户的结构,执行baton.post()方法,让在等待条件变量的线程被唤醒。设置完成callback后,线程就调用等待条件变量成立的环节,一直扥到最后的callback被执行完成,唤醒该线程,从而返回结果。
异常处理
上面已经介绍了不少关于异常处理的内容,但是一直未介绍如果设置异常处理函数。首先关于c++异常,可以参考该博客C++异常。
设置异常处理使用thenError方法,其逻辑如下:
1 | template <class T> |
异常处理的返回值也区分try类型和future类型,一般future类似使用的较少,这里仅展示try类型。
可以看到thenError整体实现与thenTry,thenValue区别不大,同样是增加一层调用链,区别只在设置的callback上。
thenError一般有两个参数,第一个指示错误类型,当出错是进行类型匹配,只执行匹配到的那个,第二个则是设置的callback。
callback处理逻辑为,先判断上一层执行的结构是否是参数中的异常类型(当未出现异常或者出现异常类型不匹配,均返回false),如果匹配,则执行callback为下一层设置result,否则直接将结果传递到下一层,开始下一层的逻辑。结合异常传递更易理解。
DAG
上面介绍了folly的异步框架实现,下面我们来看如何基于异步框架来实现DAG。首先介绍两个异步框架使用的额外方法collect函数和SharedPromise类。
collect
collect方法参数是一个future的list,返回是一个SemiFuture,其result所有future的result构成的元组。其实现的功能是,新创建一个future,以参数中的所有future的结果作为其输入,即新建的future仅在参数中所有future获取到结果才执行自身的callback。这是DAG中重要的一环,即某个算子依赖多个算子,在多个算子执行完成时才能够执行,该功能即由collect来实现。
下面来看一下其具体实现:
1 | template <typename... Fs> |
通过看代码,我们可以看到,其实现就是利用shared_ptr的特性,即只在引用计数为0时,才执行析构函数。这样,我们让参数中所有folly均通过callback持有shared_ptr,让所有future执行callback结束时,就会执行析构函数,自动将shared_ptr的引用计数减一,直到所有的future均执行完成callback,就会最终析构shared_ptr,这时在析构函数中对新建的promise设置result,这样新建的promise就依赖所有参数中future执行结果,保证在所有future的callback执行·完成才执行。
其中Context就充当该shared_ptr的数据。对每个传递的future设置callback为将结果写到Context中对应的位置(或者抛出异常)。
在参数中所有future的callback执行完成后,就获取到了所有folly的结果,执行Context的析构函数,判断是否有异常产生,如果没有异常产生,则设置下一层的promise,完成计算。对应返回的future来说,如果后面还有别的链式执行逻辑,则会在这里被设置result后继续执行,如果没有,则用户直接获取到结果。
SharedPromise类
上面介绍的collect解决了一个算子依赖多个算子的情况,但还有另一个情况,就是多个算了依赖了同一个算子。这部分则是通过SharedPromise类来实现,其实现逻辑也相对简单,就是其会持有一个promise的list,当某个算子要依赖该算子时,就会将list增加一个promise,这样依赖其的算子就会获得一个future,当该算子执行完成后,会对该list中所有promise设置result,这样持有该算子future的·所有算子都可以继续执行了。
我们来看其具体实现,仅看核心数据和接口:
1 | template <class T> |
在使用中,每个算子持有一个SharedPromise,当某个算子依赖自身时,通过getSemiFuture()方法获取一个SemiFuture。当算子执行完成后,通过setTry向所有生成的promise设置结果,这样所有依赖该算子的算子,都可以开始执行其callback。
介绍完了上面的两个依赖,我们来看一下实际DAG的实现,folly实现了一个简单的DAG class FutureDAG。
FutureDAG
FutureDAG使用future&promise异步框架和上面介绍的两个工具实现了通用的DAG。
1 | class FutureDAG : public std::enable_shared_from_this<FutureDAG> { |
Node是一个算子,其持有要执行的函数,要执行该任务的线程池,一个SharedPromise,dependencies节点自身依赖的节点列表,hasDependents表示是否有节点依赖自身,visited应该是旧版本判断是否有环的,目前没有用。
FutureDAG使用逻辑是,创建一个空的FutureDAG实例,使用add向其中增加节点,同时返回节点对应下标,之后使用dependency构建节点间依赖关系,其中参数含义是:b依赖a。构建完成后调用go来执行DAG,在go中,首先判断节点间依赖是否成环,如果成环则不开执行,返回有异常的future,否则利用collect和SharedPromise构建执行的依赖关系。
hasCycle函数判断是否成环,其逻辑较为简单。首先计算每个节点被依赖的次数存到dependents中,将不被别的节点依赖的节点放到handles中,从handles中取出一个节点,将该节点依赖的depends清空,并将所有该节点依赖的节点的计数减一,如果减一后结果为0,表示不再有别的节点依赖这个节点了,这时将该节点加到handles中,之后一直从handles中取数据,直到handles为空,此时判断是否还有未清空的depends,如果有则表示有环。
对有无环的DAG执行来说,首先区分叶节点和根节点,根节点是不依赖别的节点结果可以立即执行的节点,叶节点是哪些没有节点依赖该节点的节点。对应叶节点,增加一个sink节点,让该节点依赖所有的叶节点,作为执行结束标识。对应根节点,增加一个source节点,让所有根节点都依赖该节点,作为DAG启动标识。
遍历每个节点,通过其依赖的节点的promise获取到一个futurelist,通过collect该list构建一个新的future,设置一个callback,则该collback中执行节点对应的函数,并设置节点自生SharedPromise的result。同时设置异常处理函数。
之后设置source节点的result开始DAG执行,返回sink节点新增的一个调用链,用于清理source和sink节点。
对应返回的future,用户调用std::move(f).get()方法等待执行完成即可。
参考
C++高级方法
std::enable_if
enable_if 的定义类似于下面的代码:(只有 Cond = true 时定义了 type)
1 | template<bool Cond, class T = void> struct enable_if {}; |
这样的话,enable_if<true, T>::type 即为 T,而 enable_if<false, T>::type 会引发编译错误(在 SFINAE 下,即不将包含这一 enable_if 的函数 / 类作为候选)。
类模板的偏特化
https://sg-first.gitbooks.io/cpp-template-tutorial/content/jie_te_hua_yu_pian_te_hua.html
std::decay_t
去除变量的所有引用属性,获取其原始class