What makes a modern database system? The three key modules are the query optimizer, the execution engine, and the storage engine. Among them, the execution engine is to the DBMS what the chef is to a restaurant. This article focuses on the execution engine of the Apache Doris data warehouse and explains the key to its high performance.
To illustrate the role of the execution engine, let's follow the execution of a SQL statement:
- Upon receiving a SQL query, the query optimizer performs lexical and syntax analysis and generates the optimal execution plan based on the cost model and optimization rules.
- The execution engine then schedules the plan to the nodes, which operate on data in the underlying storage engine and then return the query results.
The execution engine performs operations such as data reading, filtering, sorting, and aggregation. The efficiency of these steps determines query performance and resource utilization, which is why different execution models deliver different query efficiency.
Volcano Model
The Volcano Model (originally known as the Iterator Model) predominates in analytical databases, followed by the Materialization Model and the Vectorized Model. In the Volcano Model, every operation is abstracted as an operator, so an entire SQL query forms an operator tree. During query execution, the tree is traversed top-down by calling the next() interface, and data is pulled and processed from the bottom up. This is called a pull-based execution model.
The Volcano Model is flexible, scalable, and easy to implement and optimize. It underpinned Apache Doris before version 2.1.0. When a user initiates a SQL query, Doris parses the query, generates a distributed execution plan, and dispatches the tasks to the nodes for execution. Each individual task is an instance. Take a simple query as an example:
select age, sex from employees where age > 30
Within an instance, data flows between operators are propelled by the next() method. When the next() method of an operator is called, it first calls the next() of its child operator, obtains data from it, and then processes that data to produce output.
next() is a synchronous method; in other words, the current operator will be blocked if its child operator provides no data for it. In that case, the next() method of the root operator has to be called in a loop until all data is processed, which is when the instance finishes its computation.
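The following is a deliberately minimal C++ sketch of this pull-based pattern, illustrative only and not Doris's actual code; all class and member names are made up for the example:

```cpp
#include <cstddef>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

struct Row {
    int age;
    std::string sex;
};

// Every operation is abstracted as an operator with a synchronous next().
class Operator {
public:
    virtual ~Operator() = default;
    virtual std::optional<Row> next() = 0;  // empty optional == end of data
};

// Leaf operator: scans an in-memory "employees" table.
class ScanOperator : public Operator {
public:
    explicit ScanOperator(std::vector<Row> rows) : rows_(std::move(rows)) {}
    std::optional<Row> next() override {
        if (pos_ >= rows_.size()) return std::nullopt;
        return rows_[pos_++];
    }
private:
    std::vector<Row> rows_;
    std::size_t pos_ = 0;
};

// Filter operator: keeps calling the child's next() until a row passes "age > 30".
class FilterOperator : public Operator {
public:
    explicit FilterOperator(std::unique_ptr<Operator> child) : child_(std::move(child)) {}
    std::optional<Row> next() override {
        while (auto row = child_->next()) {  // blocked until the child yields data
            if (row->age > 30) return row;
        }
        return std::nullopt;
    }
private:
    std::unique_ptr<Operator> child_;
};

int main() {
    // select age, sex from employees where age > 30
    auto plan = std::make_unique<FilterOperator>(std::make_unique<ScanOperator>(
        std::vector<Row>{{25, "M"}, {42, "F"}, {31, "M"}}));
    // The root operator's next() is called in a loop until all data is processed.
    while (auto row = plan->next()) { /* emit row */ }
}
```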
This execution mechanism faces several bottlenecks in single-node, multi-core use cases:
- Thread blocking: With a fixed-size thread pool, if an instance occupies a thread and gets blocked, a deadlock can easily arise when many instances request execution concurrently, especially when the current instance depends on other instances. Moreover, if a node runs more instances than it has CPU cores, the system depends heavily on the OS scheduling mechanism, which can produce a large context-switching overhead. In a colocation scenario, this leads to an even larger thread-switching overhead.
- CPU contention: Threads may compete for CPU resources, so queries of different sizes and from different tenants may interfere with each other.
- Underutilization of multi-core computing capabilities: Execution concurrency relies heavily on data distribution. Specifically, the number of instances running on a node is limited by the number of data buckets on that node, so it is important to set an appropriate number of buckets. If you shard the data into too many buckets, that becomes a burden for the system and brings unnecessary overhead; if the buckets are too few, you cannot utilize your CPU cores to the fullest. In a production environment, however, it is not always easy to estimate the right number of buckets, and that leads to performance loss.
Pipeline Execution Engine
Based on the known issues of the Volcano Model, we replaced it with the Pipeline Execution Engine starting from Apache Doris 2.0.0.
As the name suggests, the Pipeline Execution Engine breaks down the execution plan into pipeline tasks and schedules these pipeline tasks onto a thread pool in a time-sharing manner. If a pipeline task is blocked, it is put on hold to release the thread it occupies. Meanwhile, the engine supports various scheduling strategies, meaning that you can allocate CPU resources to different queries and tenants more flexibly.
Moreover, the Pipeline Execution Engine pools the data within data buckets, so the number of running instances is no longer capped by the number of buckets. This not only improves Apache Doris's utilization of multi-core systems but also improves system performance and stability by avoiding frequent thread creation and deletion.
Example
Below is the execution plan of a join query. It includes two instances:
As illustrated, the Probe operation can only be executed after the hash table is built, while the Build operation relies on the computation results of the Exchange operator. Each of the two instances is therefore divided into two pipeline tasks. These tasks are then scheduled into the "ready" queue of the thread pool, and the threads pick up tasks from it according to the specified strategies. Within a pipeline task, after one data block is finished, if the relevant data is ready and the task's runtime stays within the maximum allowed duration, the thread continues to compute the next data block.
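A sketch of that time-sharing loop might look as follows; the task interface and names are assumptions made for illustration, not Doris's real ones:

```cpp
#include <chrono>
#include <functional>

using Clock = std::chrono::steady_clock;

// Assumed task interface, for illustration only.
struct PipelineTask {
    std::function<bool()> done;                // all input consumed?
    std::function<bool()> data_ready;          // is the next data block available?
    std::function<void()> process_next_block;  // compute one data block
};

// A worker thread keeps computing data blocks while the input is ready and
// the task's time slice has not expired; otherwise it returns the thread to
// the pool instead of blocking on it.
void run_time_slice(PipelineTask& task, Clock::duration max_slice) {
    const auto start = Clock::now();
    while (!task.done()) {
        if (!task.data_ready()) return;                 // blocked: put on hold
        task.process_next_block();                      // one block at a time
        if (Clock::now() - start >= max_slice) return;  // slice exhausted: yield
    }
}
```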
Design and Implementation
Avoid Thread Blocking
As mentioned earlier, the Volcano Model faces several bottlenecks:
- If too many threads are blocked, the thread pool gets saturated and cannot respond to subsequent queries.
- Thread scheduling is entirely managed by the operating system, without any user-level control or customization.
How does the Pipeline Execution Engine avoid these issues?
- We fix the size of the thread pool to match the CPU core count, and we split all operators that are prone to blocking into separate pipeline tasks. For example, we use dedicated threads for disk I/O operations and RPC operations.
- We design a user-space polling scheduler. It continuously checks the state of all executable pipeline tasks and assigns executable tasks to threads. With this in place, the operating system does not have to frequently switch threads, which reduces overhead. It also allows customized scheduling strategies, such as assigning priorities to tasks. A sketch of this design follows the list.
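Here is a highly simplified sketch of such a polling scheduler; all names and interfaces are assumptions for illustration, and Doris's real scheduler is far more sophisticated:

```cpp
#include <atomic>
#include <chrono>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Assumed task interface, for illustration only.
struct Task {
    std::function<bool()> ready;    // is the blocking condition lifted?
    std::function<bool()> run_one;  // run one time slice; true == task finished
};

class PollingScheduler {
public:
    void submit(Task* t) {
        std::lock_guard<std::mutex> g(mu_);
        runnable_.push_back(t);
    }

    // One worker per CPU core, so the pool size matches the core count.
    void worker_loop() {
        while (!stop_) {
            Task* t = take_runnable();
            if (!t) { std::this_thread::yield(); continue; }
            bool finished = t->run_one();  // must never block internally
            if (finished) continue;
            std::lock_guard<std::mutex> g(mu_);
            (t->ready() ? runnable_ : blocked_).push_back(t);
        }
    }

    // The polling thread moves tasks whose conditions are met back into the
    // runnable queue, so the OS never has to switch blocked threads around.
    void poll_loop() {
        while (!stop_) {
            {
                std::lock_guard<std::mutex> g(mu_);
                for (auto it = blocked_.begin(); it != blocked_.end();) {
                    if ((*it)->ready()) {
                        runnable_.push_back(*it);
                        it = blocked_.erase(it);
                    } else {
                        ++it;
                    }
                }
            }
            std::this_thread::sleep_for(std::chrono::microseconds(100));
        }
    }

    void stop() { stop_ = true; }

private:
    Task* take_runnable() {
        std::lock_guard<std::mutex> g(mu_);
        if (runnable_.empty()) return nullptr;
        Task* t = runnable_.front();
        runnable_.pop_front();
        return t;
    }

    std::mutex mu_;
    std::deque<Task*> runnable_, blocked_;
    std::atomic<bool> stop_{false};
};
```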
Parallelization
Before version 2.0, Apache Doris required users to set a concurrency parameter for the execution engine (parallel_fragment_exec_instance_num), which does not change dynamically with the workload. It is therefore a burden for users to figure out the concurrency level that leads to optimal performance.
What is the industry's answer to this?
Presto's idea is to shuffle the data into a reasonable number of partitions during execution, which requires minimal concurrency control from users. DuckDB, on the other hand, introduces an extra synchronization mechanism instead of shuffling. We decided to follow Presto's track because the DuckDB solution inevitably involves the use of locks, which works against our goal of avoiding blocking.
Unlike Presto, Apache Doris does not need an extra Local Exchange mechanism to shard the data into an appropriate number of partitions. With its massively parallel processing (MPP) architecture, Doris already does so during shuffling. (In Presto's case, it re-partitions the data via Local Exchange for higher execution concurrency. For example, in hash aggregation, it further shards the data based on the aggregation key in order to fully utilize the CPU cores. This also downsizes the hash table that each execution thread has to build.)
Based on the MPP architecture, we only need two enhancements to achieve what we want in Doris:
- Increase the concurrency level during shuffling. For this, we only need the frontend (FE) to perceive the backend (BE) environment and then set a reasonable number of partitions.
- Implement concurrent execution after data reading in the scan layer. To do this, we need a logical restructuring of the scan layer to decouple the threads from the number of data tablets. This is a pooling process: we pool the data read by the scanner threads so that it can be fetched by multiple pipeline tasks directly, as sketched below.
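A condensed sketch of that pooling idea follows; the types and method names are assumptions for illustration:

```cpp
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

// Scanner threads read tablets and push data blocks into a shared pool; any
// pipeline task can fetch the next block, so the number of pipeline tasks is
// decoupled from the number of tablets.
struct DataBlock { /* a columnar batch of rows */ };

class ScanPool {
public:
    // Called by scanner threads (one or more per tablet).
    void push(DataBlock block) {
        std::lock_guard<std::mutex> g(mu_);
        blocks_.push(std::move(block));
    }
    // Called once every tablet has been fully read.
    void mark_finished() {
        std::lock_guard<std::mutex> g(mu_);
        finished_ = true;
    }
    // Called by any pipeline task. Non-blocking: if no block is available,
    // the task reports itself "not ready" to the scheduler rather than
    // holding its worker thread.
    std::optional<DataBlock> try_pop() {
        std::lock_guard<std::mutex> g(mu_);
        if (blocks_.empty()) return std::nullopt;
        DataBlock b = std::move(blocks_.front());
        blocks_.pop();
        return b;
    }
    bool exhausted() {
        std::lock_guard<std::mutex> g(mu_);
        return finished_ && blocks_.empty();
    }

private:
    std::mutex mu_;
    std::queue<DataBlock> blocks_;
    bool finished_ = false;
};
```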
PipelineX
Launched in Apache Doris 2.0.0, the Pipeline Execution Engine has been improving query performance and stability under hybrid workload scenarios (queries of different sizes and from different tenants). In version 2.1.0, we tackled its known issues and upgraded it from an experimental feature to a robust and reliable solution, which is what we call PipelineX.
PipelineX provides answers to the following issues that used to challenge the Pipeline Execution Engine:
- Limited execution concurrency
- High execution overhead
- High scheduling overhead
- Poor readability of operator profiles
Execution Concurrency
The Pipeline Execution Engine remains restricted by the static concurrency parameter at the FE and the tablet count at the storage layer, which keeps it from capitalizing on the full computing resources. It is also easily affected by data skew.
For example, suppose Table A contains 100 million rows but has only one tablet, which means it is under-sharded. Let's see what happens when you perform an aggregation query on it:
SELECT COUNT(*) FROM A GROUP BY A.COL_1;
During query execution, the query plan is divided into two fragments. Each fragment, consisting of several operators, is dispatched by the frontend (FE) to the backends (BE). The BEs start threads to execute the fragments concurrently.
Now, let's focus on Fragment 0 for further elaboration. Because there is only one tablet, Fragment 0 can only be executed by one thread. That means aggregating 100 million rows with a single thread. With 16 CPU cores, the system could ideally allocate 8 threads to execute Fragment 0, so there is a concurrency disparity of 8 to 1. This is how the number of tablets restricts execution concurrency, and it is also why we introduced the Local Shuffle mechanism to remove that restriction in Apache Doris 2.1.0. Here is how it works in PipelineX:
- The threads execute their own pipeline tasks, but each pipeline task only maintains its runtime state (known as Local State), while the state shared across all pipeline tasks (known as Global State) is managed by one pipeline object.
- On a single BE, the Local Shuffle mechanism is responsible for data distribution and data balancing across pipeline tasks (see the sketch below).
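A toy version of that re-partitioning step, with made-up types, could look like this:

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Rows from any tablet are re-partitioned by a hash of the aggregation key,
// so every pipeline task receives a balanced share of the data regardless of
// tablet count or tablet skew.
struct Row { std::string col_1; /* the GROUP BY key */ };

class LocalShuffle {
public:
    explicit LocalShuffle(std::size_t num_tasks) : partitions_(num_tasks) {}

    // Route each row to a pipeline task by hashing the aggregation key, so
    // identical keys always land in the same task's hash table.
    void push(Row row) {
        std::size_t target = std::hash<std::string>{}(row.col_1) % partitions_.size();
        partitions_[target].push_back(std::move(row));
    }

    const std::vector<Row>& partition(std::size_t task_id) const {
        return partitions_[task_id];
    }

private:
    std::vector<std::vector<Row>> partitions_;  // one input per pipeline task
};
```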
Apart from decoupling execution concurrency from tablet count, Local Shuffle avoids the performance loss caused by data skew. Again, we will explain with the foregoing example.
This time, we shard Table A into two tablets instead of one, but the data is not evenly distributed: Tablet 1 and Tablet 3 hold 10 million and 90 million rows, respectively. The Pipeline Execution Engine and the PipelineX Execution Engine respond differently to such data skew:
- Pipeline Execution Engine: Thread 1 and Thread 2 execute Fragment 1 concurrently, but the latter takes nine times as long as Thread 1 because of the different data sizes they deal with.
- PipelineX Execution Engine: With Local Shuffle, the data is distributed evenly to the two threads, so they take almost equal time to finish.
Execution Overhead
Under the Pipeline Execution Engine, because the expressions of different instances are individual, every instance is initialized separately. However, since the initialization parameters of the instances have a lot in common, we can reuse the shared states to reduce execution overhead. This is what PipelineX does: it initializes the Global State once and the Local States sequentially.
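In outline, where the names below are assumptions for illustration rather than Doris's real classes:

```cpp
#include <vector>

// Heavyweight shared setup (such as prepared expressions) lives in a Global
// State that is initialized exactly once, while each pipeline task only
// builds its own lightweight Local State.
struct GlobalState {
    void init() { /* expensive shared initialization, done a single time */ }
};

struct LocalState {
    const GlobalState* shared = nullptr;  // read-only view of the shared state
    void init(const GlobalState& g) { shared = &g; /* cheap per-task setup */ }
};

int main() {
    GlobalState global;
    global.init();  // initialized once, instead of once per instance

    std::vector<LocalState> locals(8);  // one Local State per pipeline task
    for (auto& local : locals) local.init(global);
}
```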
Scheduling Overhead
In the Pipeline Execution Engine, blocked tasks are put into a blocked queue, where a dedicated thread polls them and moves the executable tasks over to the runnable queue. This dedicated scheduling thread consumes a CPU core and incurs overhead that can be particularly noticeable on systems with limited computing resources.
As a better solution, PipelineX encapsulates the blocking conditions as dependencies, and the task status (blocked or runnable) is changed by event notifications. Specifically, when RPC data arrives, the relevant task is considered ready by the ExchangeSourceOperator and then moved to the runnable queue.
That means PipelineX implements event-driven scheduling. A query execution plan can be depicted as a DAG in which the pipeline tasks are abstracted as nodes and the dependencies as edges. Whether a pipeline task gets executed depends on whether all of its dependencies have satisfied the requisite conditions.
For simplicity of illustration, the above DAG only shows the dependencies between upstream and downstream pipeline tasks. In fact, all blocking conditions are abstracted as dependencies. The complete execution workflow of a pipeline task is as follows:
In event-driven execution, a pipeline task is executed only after all of its dependencies satisfy the conditions; otherwise, it is added to the blocked queue. When an external event arrives, all blocked tasks are re-evaluated to see whether they are runnable.
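The core of that mechanism can be sketched like this; the class and method names are assumptions for illustration:

```cpp
#include <atomic>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// An event source (e.g. arriving RPC data) marks a dependency satisfied,
// which directly notifies the scheduler; no polling thread is required.
class Dependency {
public:
    void on_satisfied(std::function<void()> cb) { callback_ = std::move(cb); }
    bool satisfied() const { return satisfied_.load(); }

    // Called by the event source, e.g. when RPC data arrives for an
    // exchange source.
    void satisfy() {
        satisfied_.store(true);
        if (callback_) callback_();  // e.g. re-evaluate the blocked task
    }

private:
    std::atomic<bool> satisfied_{false};
    std::function<void()> callback_;
};

struct PipelineTask {
    std::vector<std::shared_ptr<Dependency>> deps;

    // The task moves from the blocked queue to the runnable queue only when
    // every DAG edge pointing at it is satisfied.
    bool runnable() const {
        for (const auto& d : deps)
            if (!d->satisfied()) return false;
        return true;
    }
};
```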
The event-driven design of PipelineX eliminates the need for a polling thread, and with it the resulting performance loss under high cluster loads. Moreover, the encapsulation of dependencies enables a more flexible scheduling framework, making it easier to spill data to disk.
Operator Profile
PipelineX has reorganized the metrics in the operator profiles, adding new ones and retiring irrelevant ones. Besides, with the dependencies encapsulated, we monitor how long each dependency takes to get ready via the WaitForDependency metric, so the profile can present a clear picture of the time spent in each step. Here are two examples:
- Scan Operator: The execution time of OLAP_SCAN_OPERATOR is 457.750ms, of which 436.883ms is spent waiting for its dependency (the data from the storage layer).
OLAP_SCAN_OPERATOR (id=4. table name = Z03_DI_MID):
- ExecTime: 457.750ms
- WaitForDependency[OLAP_SCAN_OPERATOR_DEPENDENCY]Time: 436.883ms
- Exchange Source Operator: The execution time of EXCHANGE_OPERATOR is 86.691us. The time spent waiting for data from upstream is 409.256us.
EXCHANGE_OPERATOR (id=3):
- ExecTime: 86.691us
- WaitForDependencyTime: 0ns
- WaitForData0: 409.256us
What’s Subsequent?
From the Volcano Model to the Pipeline Execution Engine, Apache Doris 2.0.0 overcame the deadlocks under high cluster loads and greatly increased CPU utilization. Now, from the Pipeline Execution Engine to PipelineX, Apache Doris 2.1.0 is more production-friendly, as it has ironed out the kinks in concurrency, overhead, and operator profiles.
What's next on our roadmap is to support spilling data to disk in PipelineX to further improve query speed and system reliability. We also plan to advance in terms of automation, such as self-adaptive concurrency and automatic execution plan optimization, along with NUMA technologies to reap better performance from hardware resources.
If you want to talk to the amazing Doris developers who lead these changes, you are more than welcome to join the Apache Doris community.