gparallel
是一个针对具有复杂流程或逻辑的单体式信息检索系统而设计的并行任务调度框架。使用Meta Programming
技术根据任务的输入和输出自动推导依赖关系,生成DAG(Directed acyclic graph)
并进行并行任务调度。
- 编译依赖
- g++8
- boost_log-mt v1.70
- gtest v1.10.0
$ git clone [email protected]:galois-advertising/gparallel.git
$ cd gparallel
$ git submodule update --init --recursive
$ mkdir build
$ cd build
$ cmake ..
$ make
$ ./test
$ ./demo
首先将gparallel
以及所依赖的common
和gtest
设置为git submodule
。
cd your-project
git add submodule -b master https://github.com/galois-advertising/gparallel
git add submodule -b master https://github.com/galois-advertising/common
git add submodule https://github.com/google/googletest.git gtest
cd gtest
git checkout release-1.10.0
git add gtest
git commit -m "Add gparallel"
并且修改CMakeLists.txt
,加入:
INCLUDE_DIRECTORIES("${CMAKE_SOURCE_DIR}/common/util/include")
INCLUDE_DIRECTORIES("${CMAKE_SOURCE_DIR}/gparallel/include")
IF (NOT TARGET gtest)
ADD_SUBDIRECTORY(gtest)
ENABLE_TESTING()
INCLUDE_DIRECTORIES(SYSTEM
${gtest_SOURCE_DIR}
${gtest_SOURCE_DIR}/include)
ENDIF()
IF (NOT TARGET common)
ADD_SUBDIRECTORY(common)
ENDIF()
IF (NOT TARGET gparallel)
ADD_SUBDIRECTORY(gparallel)
ENDIF()
ADD_EXECUTABLE(your-bin ...)
TARGET_LINK_LIBRARIES(your-bin common gparallel)
对于单体型业务系统,在系统建立初期,系统业务还比较简单,每次检索请求
到来时需要执行的业务逻辑也比较单一,此时请求级别的数据变量
比较少,这些变量的赋值顺序与依赖关系也一目了然,系统很容易维护。
但是,随着开发的人越来越多,大家都在上面加入自己的业务逻辑
和新的数据变量
,此时变量增加到了几百个,变量之间的赋值顺序与依赖关系开始变得复杂,一些代码逻辑甚至打破了已有的流程结构(俗称飞线),整个系统难以理解。没有一个人能说清楚这些变量之间的依赖关系以及这些业务逻辑之间的执行顺序,每次新的开发都如履薄冰,每次排查问题都耗时耗力。
对于多人参与,迭代密集的系统,如何更合理地组织各类型的数据,如何更好地实现繁杂的业务逻辑,这就是为需要gparallel
的理由.
gparallel
是一款基于DAG(Directed acyclic graph)
的并且支持自动依赖推导的任务调度框架。
DAG
在计算机领域有着广泛的应用,例如在大数据计算中可以使用DAG指导Hadoop任务的执行顺序等等。在软件设计中也被广泛应用,开源社区中DAG-based调度框架也有很多,例如cpp-taskflow。但是,几乎所有框架都采用了 手工配置的方式 生成调度DAG。例如在一个拥有四个任务的调度系统中,cpp-taskflow
需要通过下面方式来配置DAG。
auto [A, B, C, D] = taskflow.emplace(
[] () { std::cout << "TaskA\n"; },
[] () { std::cout << "TaskB\n"; },
[] () { std::cout << "TaskC\n"; },
[] () { std::cout << "TaskD\n"; }
);
A.precede(B); // A runs before B
A.precede(C); // A runs before C
B.precede(D); // B runs before D
C.precede(D); // C runs before D
}
上面代码中想要生成预期的DAG图需要人工显式定义每两个节点之间的依赖关系,这种方式虽然理解比较直观,但是缺点也非常明显:
- 在有大量任务的时候,人工定义DAG图比较困难并且容易出错。现实中的业务系统一般是多人同时开发,这就需要模块owner对所有的任务节点之间的依赖关系进行人工梳理,可维护性较差。
- 工业环境中很多业务,往往以数据流驱动的方式表达会更加清晰,这就需要花费大量时间来将系统逻辑从数据驱动表示强行转化为任务驱动表示来适配调度系统,耗时耗力。
gparallel的主要思想有3个:
数据划分
:将所有数据成员,按照业务逻辑和数据状态划分为不同的集合。依赖推导
:将所有的代码逻辑,按照功能划分为不同的task节点,并且自动推导task节点之间的依赖关系,建立DAG。任务调用
:通过拓扑排序,将DAG
转化为偏序
表示,并使用thread或者coroutine对task进行调度。
在检索系统中,数据
一般就是指检索过程中用来存储中间结果和最终结果的变量,比如存储广告的std::list
,存储标题的std::string
。gparallel主要从2个角度进行划分:
-
按照业务逻辑
:把不同业务逻辑所需要用到的数据划分为不同的集合。比如有不同的广告营销产品,各自都有自己的User、Plan和Ad的集合,以及一些存储数中间结果的变量。为了方便描述,我们用大写字母来表示按照业务逻辑划分出来的数据即可,例如A
、B
、C
等等。 -
按照数据状态
:更进一步,对于服务于同一个业务的数据集合(A),在不同的阶段,又可以划分为不同的状态。例如一个广告队列,初始状态是空(empty),经过填充以后有N条广告(inited),又经过了一次按照CTR的排序(ranked),最后经历了一次截断(truncate),那么对应四个状态A_empty
,A_inited
,A_ranked
,A_truncated
。
为了方便理解,我们把划分出来的每个子集(包含不同的阶段),叫做meta
。前面提到的A_empty
,A_inited
,A_ranked
,A_truncate
都是meta
。在gparallel中,我们用DECL_META
宏来定义一个meta
。
meta:用来描述指定业务所需要的所有数据(指定节点)的集合的一种数据结构。
理解gparallel对数据的2层递进划分方式非常重要,因为gparallel的DAG自动推导过程正是依赖于不同的meta。
现实中的系统中,数据成员一般放置在一个叫做context
或者thread_data
的结构体中。顾名思义,这些数据的作用范围就是一次请求,一个比较常见的设计是一次请求由线程池中的一个线程来独立负责,所以请求级别的数据,往往也是线程级别的数据。这个context
或者thread_data
类型,在代码实现中通常用meta_storage_t
来表示,即所一次检索中用到的所有数据,都统一存储在这里。
通过定义getter
和setter
可以对子集需要包含的元素进行指定,如果定义了getter
和setter
就代表这个meta
中包含这个数据成员。
子集之间也可以互相包含,原理与面向对象中的继承
是一样的。同理,如果一个任务依赖于一个meta
,则也同样依赖于这个meta
的父meta
,gparallel
在推导依赖时会自动将继承关系也考虑在内。继承
机制的主要目的是为了避免重复定义集合的元素,增加代码的可维护性。在实践中,通常将很多业务公用的数据单独定义为一个meta
,属于单独业务的meta
,可以从前面的公共meta
进行继承。
通过下面的例子可以理解meta_storage_t
、meta
和继承
的关系。
通过上图可以看到,所有数据被放置在thread_data
这个对象中(meta_storage_t
),通过三个meta
将thread_data
中的数据划分为三个集合:
meta_common
包含了thread_data::id
meta_a
包含了thread_data::business_a
并且继承了meta_common
的所有元素。meta_b
包含了thread_data::business_b
并且继承了meta_common
的所有元素。
在gparallel
中,使用一个函数表示一个具体的任务,函数的参数表示任务的输入
和输出
。任何一个meta
既可以作为输入
,也可以作为输出
。这里引入2个模版包装器input
和output
。如果meta
用input
包装,则任务函数会将其当作一个输入数据,同理如果用output
包装,则会当作输出。
表示任务的函数,必须定义为类的静态成员函数,函数名字必须为process
,返回类型为void
。例如:
struct DemoNode {
void process(input<meta_a> a, input<meta_b> b, output<meta_c> c) {
// process code
c->mutable_business_c() = a->get_business_a() + b->get_business_b();
}
}
上面的DemoNode
实现了将business_a
和business_b
的和赋值给business_c
的逻辑。
process
函数可以拥有任意多个输入
和任意多个输出
。其中没有输入
的节点作为起始节点之一,没有输出
的节点作为终止节点之一。
通过register_node
函数将所有节点注册到DAG
中后,使用setup_dag_schema
函数可自动实现节点依赖分析以及DAG
生成。针对生成的DAG
进行拓扑排序
,就能得到任务调度顺序。后面可根据具体情况,实现多线程或者多协程调度。
dag_schema<thread_data> nodes;
register_node<thread_data, node_a, node_b, node_c ...>::reg(nodes);
setup_dag_schema<thread_data>(nodes);
if (auto tasks = topological_sort<thread_data>(nodes); tasks) {
for (auto task : tasks.value()) {
task->mutable_executor()(&td);
}
}
在本小结中,会从一个现实的场景来描述gparallel
的使用逻辑。
问题描述:对一个指定的广告队列,分别请求其
CTR
(点击率)值和CPM
(千次展示成本)并填充到广告的对应字段,最后分别按照CTR
和CPM
进行排序后生成2个新的广告队列供下游使用。
上述流程是广告检索系统里面一个比较典型的逻辑,完整的代码在./demo/advprocess.cpp。
我们首先梳理一下所需要用到的数据对象:
数据名称 | 类型 | 含义 |
---|---|---|
advs_original | advlist_t | 原始的广告队列 |
ctr_data | advlist_t | 模型返回的ctr数据 |
cpm_data | advlist_t | 模型返回的cpm数据 |
advs_ctr_ordered | ctr_response_t | 输出的ctr排序的广告队列 |
advs_cpm_ordered | cpm_response_t | 输出的cpm排序的广告队列 |
根据上面的定义,我们定义数据集合,业务执行需要用到的所有数据都放在thread_data
这个集合中:
class thread_data {
public:
advlist_t advs_original;
advlist_t advs_ctr_ordered;
advlist_t advs_cpm_ordered;
ctr_response_t ctr_data;
cpm_response_t cpm_data;
};
接下来我们根据不同的业务,将集合划分为不同的子集,每个子集就是一个meta
,一个元素可以同时属于多个meta
,meta
与meta
之间可以互相包含。
根据问题的描述,我们可以很容易总结出5个子流程,每个子流程都对应一个数据处理节点:
流程 | 节点名称 | 输入 | 输出 |
---|---|---|---|
获取CTR值 | get_ctr_node | advs_original | ctr_data |
获取CPM值 | get_cpm_node | advs_original | cpm_data |
填充字段 | fill_node | advs_original ctr_data cpm_data |
advs_original* |
生成CTR 排序队列 | gen_ctr_node | advs_original* | advs_ctr_ordered |
生成CPM 排序队列 | gen_cpm_node | advs_original* | advs_cpm_ordered |
可以看到原始输入的广告队列是advs_original
,这里我们将其封装为metaoriginal
。
get_ctr_node
节点和get_cpm_node
节点通过metaoriginal
分别获取ctr_data
和cpm_data
两份数据,这两份数据我们用metactr
和metacpm
来封装。
fill_node
节点对广告队列进行数据填充,这里注意,节点的输入中有metaoriginal
,输出中有metaoriginal_with_ctr_cpm
。这2个meta其实本质上都是advs_original
的封装,但是因为属于2个阶段(即填充前和填充后),所以分别用2个不同的meta来表示,在实现上,我们可以直接使用继承功能,复用metaoriginal
。
gen_ctr_node
和gen_cpm_node
的输入都包含metaoriginal_with_ctr_cpm
,表示其依赖于填充后的advs_original
而不是填充前。
这样就可以根据不同的业务逻辑,把thread_data
集合划分为不同的meta
,每个业务只需要关注子集需要用到哪些meta
,生成哪些meta
即可。
首先实现get_ctr_node
和get_cpm_node
这两个任务节点。
struct get_ctr_node {
static void process(input<original> ori, output<ctr> ctr) {
INFO("[gparallel] get_ctr_node", "");
ctr->mutable_ctr_data().resize(ori->mutable_advs_original().size());
for (int pos = 0; pos < ori->mutable_advs_original().size(); pos++) {
auto & adv = ori->mutable_advs_original()[pos];
ctr->mutable_ctr_data()[pos] = 0.1 * static_cast<double>(adv.id);
}
}
};
struct get_cpm_node {
static void process(input<original> ori, output<cpm> cpm) {
INFO("[gparallel] get_cpm_node", "");
cpm->mutable_cpm_data().resize(ori->mutable_advs_original().size());
for (int pos = 0; pos < ori->mutable_advs_original().size(); pos++) {
auto & adv = ori->mutable_advs_original()[pos];
cpm->mutable_cpm_data()[pos] = 100.2 * static_cast<double>(adv.id);
}
}
};
这两个任务的功能相似,都是根据original
中存储的所有广告,获取对应的点击率
和千次展示成本
数据,并且分别保存到ctr
和cpm
这两个meta。这里为了简化,我们取一些随机的数字作为ctr
作为cpm
。现实中往往需要同步或者异步请求模型服务器来获取。
接下来实现fill_node
。
struct fill_node {
static void process(input<ctr> ctr, input<cpm> cpm, input<original> ori,
output<original_with_ctr_cpm> ori_ctr_cpm) {
INFO("[gparallel] fill_node", "");
for (int pos = 0; pos < ori->mutable_advs_original().size(); pos++) {
auto & adv = ori_ctr_cpm->mutable_advs_original()[pos];
adv.ctr = ctr->mutable_ctr_data()[pos];
adv.cpm = cpm->mutable_cpm_data()[pos];
INFO("[gparallel] ori_ctr_cpm adv:[%d] ctr:[%lf] cpm:[%lf]", adv.id, adv.ctr, adv.cpm);
}
}
};
fill_node
节点将前面get_ctr_node
和get_cpm_node
这两个节点的输出作为自己的输入,最后生成original_with_ctr_cpm
。这里可以看到,original
和original_with_ctr_cpm
分别作为节点的输入和输出,虽然这两个meta中包含的数据都是advs_original
,但是对应了不同的状态。original
代表了advs_original
一开始的状态,而original_with_ctr_cpm
代表了已经填充了ctr
作为cpm
数据的状态。
然后实现gen_ctr_node
和gen_cpm_node
。
struct gen_ctr_node {
static void process(input<original_with_ctr_cpm> ori_ctr_cpm,
output<ctr_ordered_advlist> ctr_ordered) {
INFO("[gparallel] gen_ctr_node", "");
ctr_ordered->mutable_advs_ctr_ordered() = ori_ctr_cpm->mutable_advs_original();
std::sort(ctr_ordered->mutable_advs_ctr_ordered().begin(),
ctr_ordered->mutable_advs_ctr_ordered().end(),
[](const auto& a, const auto& b)->bool{return a.ctr > b.ctr;});
}
};
struct gen_cpm_node {
static void process(input<original_with_ctr_cpm> ori_ctr_cpm,
output<cpm_ordered_advlist> cpm_ordered) {
INFO("[gparallel] gen_cpm_node", "");
cpm_ordered->mutable_advs_cpm_ordered() = ori_ctr_cpm->mutable_advs_original();
std::sort(cpm_ordered->mutable_advs_cpm_ordered().begin(),
cpm_ordered->mutable_advs_cpm_ordered().end(),
[](const auto& a, const auto& b)->bool{return a.cpm > b.cpm;});
}
};
这里需要注意的是,gen_ctr_node
和gen_cpm_node
这两个节点,需要的是已经填充了ctr
作为cpm
数据的advs_original
,所以输入meta必须为original_with_ctr_cpm
。这一点对于生成正确的DAG来说非常重要。gparallel
正是通过区分同一数据的不同阶段,来实现正确的任务调度。
最后,end_node
对所有的结果进行汇总,并反馈给下游。
struct end_node {
static void process(input<ctr_ordered_advlist> ctr, input<cpm_ordered_advlist> cpm) {
INFO("[gparallel] end_node", "");
for (auto& adv : cpm->mutable_advs_cpm_ordered()) {
INFO("CPM ordered:[%d]", adv.id);
}
for (auto& adv : ctr->mutable_advs_ctr_ordered()) {
INFO("CTR ordered:[%d]", adv.id);
}
}
};
在所有的meta和node都定义好以后,我们可以通过下面流程进行任务调度。
首先我们需要申请相应的内存来存储我们所有的数据变量,这里我们申请一个thread_data
的实例:
thread_data td{{advertisement(1), advertisement(2), advertisement(3)}, {},{},{},{}};
我们还需要一个节点容器
,来存放所有的数据处理节点:
dag_schema<thread_data> nodes;
接下来我们将所有的节点
,注册到刚才申请的节点容器
中。
register_node<thread_data, get_ctr_node, get_cpm_node>::reg(nodes);
register_node<thread_data, fill_node>::reg(nodes);
register_node<thread_data, gen_ctr_node, gen_cpm_node, end_node>::reg(nodes);
register_node
模版负责注册节点到容器nodes
中,其中第一个模版参数是我们所有数据最终存储的类,也就是meta_storage_t
。剩余任意多个模版参数分别为各个node。reg
函数的输入参数为dag_schema
类型的节点容器实例。
接下来推导所有节点的依赖关系:
setup_dag_schema<thread_data>(nodes);
最后,我们对DAG上面的所有节点进行拓扑排序
,并且按照排序后的顺序依次进行调用:
if (auto tasks = topological_sort<thread_data>(nodes); tasks) {
for (auto task : tasks.value()) {
INFO("Execute[%s]", task->name().c_str());
task->mutable_executor()(&td);
}
}
编译执行:
$ cd build
$ cmake ../
$ make demo
$ ./demo
如果我们打开了debug日志,就可以在demo的输出中看到下面的DAG信息:
[2020-05-20 23:24:11.082883] [] [info][~/gparallel/include/dag_schema.h][127]node_depends_after
http://graphviz.it/#
[2020-05-20 23:24:11.083212] [] [info][~/gparallel/include/dag_schema.h][128]
digraph node_depends_after{
rankdir=BT;
size="8,5";
"fill_node" -> "get_ctr_node";
"fill_node" -> "get_cpm_node";
"gen_ctr_node" -> "fill_node";
"gen_cpm_node" -> "fill_node";
"end_node" -> "gen_ctr_node";
"end_node" -> "gen_cpm_node";
}
通过http://graphviz.it/#,可以看到gparallel
自动推导得到的DAG。
同时可以看到全部业务执行的结果:
[2020-05-20 23:24:11.084784] [] [info][advprocess.cpp][141]Execute[get_ctr_node]
[2020-05-20 23:24:11.084792] [] [info][advprocess.cpp][63][gparallel] get_ctr_node
[2020-05-20 23:24:11.084866] [] [info][advprocess.cpp][141]Execute[get_cpm_node]
[2020-05-20 23:24:11.084898] [] [info][advprocess.cpp][74][gparallel] get_cpm_node
[2020-05-20 23:24:11.084905] [] [info][advprocess.cpp][141]Execute[fill_node]
[2020-05-20 23:24:11.084910] [] [info][advprocess.cpp][87][gparallel] fill_node
[2020-05-20 23:24:11.084916] [] [info][advprocess.cpp][92][gparallel] ori_ctr_cpm adv:[1] ctr:[0.100000] cpm:[100.200000]
[2020-05-20 23:24:11.084920] [] [info][advprocess.cpp][92][gparallel] ori_ctr_cpm adv:[2] ctr:[0.200000] cpm:[200.400000]
[2020-05-20 23:24:11.084956] [] [info][advprocess.cpp][92][gparallel] ori_ctr_cpm adv:[3] ctr:[0.300000] cpm:[300.600000]
[2020-05-20 23:24:11.084966] [] [info][advprocess.cpp][141]Execute[gen_ctr_node]
[2020-05-20 23:24:11.084992] [] [info][advprocess.cpp][100][gparallel] gen_ctr_node
[2020-05-20 23:24:11.085003] [] [info][advprocess.cpp][141]Execute[gen_cpm_node]
[2020-05-20 23:24:11.085007] [] [info][advprocess.cpp][112][gparallel] gen_cpm_node
[2020-05-20 23:24:11.085011] [] [info][advprocess.cpp][141]Execute[end_node]
[2020-05-20 23:24:11.085015] [] [info][advprocess.cpp][122][gparallel] end_node
[2020-05-20 23:24:11.085046] [] [info][advprocess.cpp][124]CPM ordered:[3]
[2020-05-20 23:24:11.085055] [] [info][advprocess.cpp][124]CPM ordered:[2]
[2020-05-20 23:24:11.085062] [] [info][advprocess.cpp][124]CPM ordered:[1]
[2020-05-20 23:24:11.085069] [] [info][advprocess.cpp][127]CTR ordered:[3]
[2020-05-20 23:24:11.085076] [] [info][advprocess.cpp][127]CTR ordered:[2]
[2020-05-20 23:24:11.085082] [] [info][advprocess.cpp][127]CTR ordered:[1]
Some friends ask how to execute this DAG parallelly?
One solution is:
void invoke(Node &node, std::shared_ptr<std::latch> done) {
std::shared_ptr<std::latch> dep_done = std::make_shared<>(node.dependency_list.size());
for (auto &dep : node.dependency_list) {
invoke(dep, dep_done);
}
thread_pool.push([&node, done, dep_done](){
dep_done->wait();
node.execute();
done->count_down();
})
}
int main() {
std::list<Node> node_list;
// Find the final node
Node &final_node = xxx;
std::shared_ptr<std::latch> done = std::make_shared<>(1);
invoke(final_node, latch);
latch->wait();
// Final result is ready here
}
By huyifeng (2023/08/26)