6.6 Pipeline 执行引擎¶
Tip
Pipeline 执行引擎是 Doris 在 2.0 版本加入的实验性功能,随后在 2.1 版本进行了优化与升级(即 PipelineX )。在 3.0 以及之后的版本中, Doris 只使用 PipelineX 作为唯一执行引擎,并且更名为 Pipeline 执行引擎。
Pipeline 执行引擎的主要目标是为了替换之前 Doris 基于火山模型的执行引擎,充分释放多核 CPU 的计算能力,并对 Doris 的查询线程的数目进行限制,解决 Doris 的执行线程膨胀的问题。
它的具体设计、实现和效果可以参阅DSIP-027以及DSIP-035。
1 原理¶
当前的 Doris 的 SQL 执行引擎是基于传统的火山模型进行设计,在单机多核的场景下存在下面的一些问题:
-
无法充分利用多核计算能力,提升查询性能,多数场景下进行性能调优时需要手动设置并行度,在生产环境中几乎很难进行设定。
-
单机查询的每个
Instance对应线程池的一个线程,这会带来额外的两个问题。-
线程池一旦打满。
Doris的查询引擎会进入假性死锁,对后续的查询无法响应。同时有一定概率进入逻辑死锁的情况:比如所有的线程都在执行一个Instance的Probe任务。 -
阻塞的算子会占用线程资源,而阻塞的线程资源无法让渡给能够调度的
Instance,整体资源利用率上不去。
-
-
阻塞算子依赖操作系统的线程调度机制,线程切换开销较大(尤其在系统混布的场景中)
由此带来的一系列问题驱动 Doris 需要实现适应现代多核 CPU 的体系结构的执行引擎。
而如下图所示(引用自Push versus pull-based loop fusion in query engines), Pipeline 执行引擎基于多核 CPU 的特点,重新设计由数据驱动的执行引擎:

-
将传统
Pull拉取的逻辑驱动的执行流程改造为Push模型的数据驱动的执行引擎 -
阻塞操作异步化,减少了线程切换,线程阻塞导致的执行开销,对于
CPU的利用更为高效 -
控制了执行线程的数目,通过时间片的切换的控制,在混合负载的场景中,减少大查询对于小查询的资源挤占问题
-
执行并发上,依赖
Local Exchange使Pipeline充分并发,可以让数据被均匀分布到不同的Task中,尽可能减少数据倾斜,此外,Pipeline也将不再受存储层Tablet数量的制约。 -
执行逻辑上,多个
Pipeline Task共享同一个Pipeline的全部共享状态,例如表达式和一些Const变量,消除了额外的初始化开销。 -
调度逻辑上,所有
Pipeline Task的阻塞条件都使用Dependency进行了封装,通过外部事件(例如RPC完成)触发task的执行逻辑进入Runnable队列,从而消除了阻塞轮询线程的开销。 -
Profile:为用户提供简单易懂的指标。
从而提高了 CPU 在混合负载 SQL 上执行时的效率,提升了 SQL 查询的性能。
2 使用方式¶
2.1 查询¶
-
enable_pipeline_engine将
Session变量enable_pipeline_engine设置为true,则BE在进行查询执行时将会使用Pipeline执行引擎。SQL 1set enable_pipeline_engine = true; -
parallel_pipeline_task_numparallel_pipeline_task_num代表了SQL查询进行查询并发的Pipeline Task数目。Doris默认的配置为0,此时Pipeline Task数目将自动设置为当前集群机器中最少的CPU数量的一半。用户也可以根据自己的实际情况进行调整。SQL 1set parallel_pipeline_task_num = 0;可以通过设置
max_instance_num来限制自动设置的并发数(默认为64) -
enable_local_shuffle设置
enable_local_shuffle为True则打开Local Shuffle优化。Local Shuffle将尽可能将数据均匀分布给不同的Pipeline Task从而尽可能避免数据倾斜。SQL 1set enable_local_shuffle = true; -
ignore_storage_data_distribution设置
ignore_storage_data_distribution为True则表示忽略存储层的数据分布。结合Local Shuffle一起使用,则Pipeline引擎的并发能力将不再受到存储层Tablet数量的制约,从而充分利用机器资源。SQL 1set ignore_storage_data_distribution = true;
2.2 导入¶
导入的引擎选择设置,详见导入文档。