Question about executor
Hi, I am reading the heavyDB code and docs and have some questions about the executor.
From my understanding, the execution logic can be summarized as follows: Calcite logical plan --> optimized DAG (after heavyDB's DAG optimization) --> query steps (each node is translated into a query step) --> work units (each query step is converted to a work unit) --> execute work unit (including both JIT compilation and execution). Please correct my understanding if I am wrong!!
I have some questions about the execution process:
1. Suppose we have a single GPU: are different work units executed sequentially? For example, if work unit 1 is a join and work unit 2 is a group by (assuming the join and group by are not fused into a compound node), will the executor get all the join results before executing the group by?
2. How is data passed between different work units, and what if the intermediate results are too big? In the example above, you don't know the size of the join output until you execute it, but you need to prepare the intermediate result buffer when creating the work unit, so how do you know how much intermediate buffer space to allocate? Moreover, the intermediate join result might be bigger than GPU memory; do you just fail, or offload to CPU?
3. I notice heavyDB has the fusion optimization described here: https://heavyai.github.io/heavydb/execution/optimizer.html. I'm wondering about the granularity of fusion. Do you just coalesce the kernel launches (i.e., still launch multiple kernels for the different logical operators), or do you combine different operations on the same record (i.e., launch one kernel that performs multiple operations, such as group by, project and filter)?
Thanks a lot!! Any comments are highly appreciated!
-
Hi @Lily_Liu,
I'll try answering your questions.
- On a typical workload with one or more joins plus a group by with some filtering, everything runs inlined: the join first, then the filter, and finally the group by. So in such a simple case no intermediate result is created, and the data is supposed to stay in registers. You get intermediate results when you run a multi-step query.
As an example, a query like this
heavysql> explain select C_NAME,count(*) from customer join nation on C_NATIONKEY = N_NATIONKEY group by C_NAME;
is served by this code:
IR for the GPU:
===============
; Function Attrs: nounwind uwtable
define dso_local void @multifrag_query_hoisted_literals(i8*** readonly, i64* nocapture readonly, i8* readnone, i64* readnone, i64* readnone, i32* readnone, i32* readnone, i64* readnone, i64** readnone, i32* readnone, i32* nocapture readonly, i64* readnone) local_unnamed_addr #22 {
  %13 = load i64, i64* %1, align 8, !tbaa !9
  %14 = icmp eq i64 %13, 0
  br i1 %14, label %31, label %15

15:                                               ; preds = %12
  %16 = icmp eq i8*** %0, null
  br i1 %16, label %17, label %32

17:                                               ; preds = %26, %15
  %18 = phi i32 [ %27, %26 ], [ 0, %15 ]
  %19 = load i32, i32* %10, align 4, !tbaa !7
  %20 = mul i32 %19, %18
  %21 = zext i32 %20 to i64
  %22 = getelementptr inbounds i64, i64* %3, i64 %21
  %23 = getelementptr inbounds i64, i64* %4, i64 %21
  call void @query_group_by_template(i8** null, i8* %2, i64* %22, i64* %23, i32* %5, i64* %7, i64** %8, i32 %18, i64* %11, i32* %6, i32* %9)
  br label %.error_check

.error_check:                                     ; preds = %17
  %24 = call i32 @get_error_code(i32* %9)
  %25 = icmp ne i32 %24, 0
  br i1 %25, label %.error_exit, label %26

.error_exit:                                      ; preds = %.error_check
  call void @record_error_code(i32 %24, i32* %9)
  ret void

26:                                               ; preds = %.error_check
  %27 = add i32 %18, 1
  %28 = zext i32 %27 to i64
  %29 = load i64, i64* %1, align 8, !tbaa !9
  %30 = icmp ugt i64 %29, %28
  br i1 %30, label %17, label %31

31:                                               ; preds = %44, %26, %12
  ret void

32:                                               ; preds = %44, %15
  %33 = phi i64 [ %46, %44 ], [ 0, %15 ]
  %34 = phi i32 [ %45, %44 ], [ 0, %15 ]
  %35 = getelementptr inbounds i8**, i8*** %0, i64 %33
  %36 = load i8**, i8*** %35, align 8, !tbaa !15
  %37 = load i32, i32* %10, align 4, !tbaa !7
  %38 = mul i32 %37, %34
  %39 = zext i32 %38 to i64
  %40 = getelementptr inbounds i64, i64* %3, i64 %39
  %41 = getelementptr inbounds i64, i64* %4, i64 %39
  call void @query_group_by_template(i8** %36, i8* %2, i64* %40, i64* %41, i32* %5, i64* %7, i64** %8, i32 %34, i64* %11, i32* %6, i32* %9)
  br label %.error_check1

.error_check1:                                    ; preds = %32
  %42 = call i32 @get_error_code(i32* %9)
  %43 = icmp ne i32 %42, 0
  br i1 %43, label %.error_exit2, label %44

.error_exit2:                                     ; preds = %.error_check1
  call void @record_error_code(i32 %42, i32* %9)
  ret void

44:                                               ; preds = %.error_check1
  %45 = add i32 %34, 1
  %46 = zext i32 %45 to i64
  %47 = load i64, i64* %1, align 8, !tbaa !9
  %48 = icmp ugt i64 %47, %46
  br i1 %48, label %32, label %31
}

; Function Attrs: uwtable
define void @query_group_by_template(i8** nocapture readnone %byte_stream, i8* nocapture readonly %literals, i64* nocapture readnone %row_count_ptr, i64* nocapture readonly %frag_row_off_ptr, i32* %max_matched_ptr, i64* %agg_init_val, i64** %group_by_buffers, i32 %frag_idx, i64* %join_hash_tables, i32* %total_matched, i32* %error_code) #25 {
.entry:
  %0 = getelementptr i8*, i8** %byte_stream, i32 0
  %1 = load i8*, i8** %0
  %2 = getelementptr i8*, i8** %byte_stream, i32 1
  %3 = load i8*, i8** %2
  %4 = getelementptr i8*, i8** %byte_stream, i32 2
  %5 = load i8*, i8** %4
  %6 = getelementptr i8, i8* %literals, i16 0
  %7 = bitcast i8* %6 to i32*
  %literal_0 = load i32, i32* %7
  %row_count = load i64, i64* %row_count_ptr, align 8
  %8 = load i32, i32* %max_matched_ptr, align 8
  %crt_matched = alloca i32
  %old_total_matched = alloca i32
  %9 = call i32 @pos_start_impl(i32* %error_code)
  %10 = call i32 @pos_step_impl()
  %11 = call i32 @group_buff_idx_impl()
  %12 = sext i32 %9 to i64
  %13 = getelementptr i64*, i64** %group_by_buffers, i32 %11
  %col_buffer = load i64*, i64** %13, align 8
  %result_buffer = call i64* @init_shared_mem_nop(i64* %col_buffer, i32 0)
  %14 = icmp slt i64 %12, %row_count
  br i1 %14, label %.loop.preheader, label %.exit

.loop.preheader:                                  ; preds = %.entry
  %15 = sext i32 %10 to i64
  br label %.forbody

.forbody:                                         ; preds = %.forbody, %.loop.preheader
  %pos = phi i64 [ %12, %.loop.preheader ], [ %17, %.forbody ]
  %16 = call i32 @row_func_hoisted_literals(i64* %result_buffer, i64* null, i32* %crt_matched, i32* %total_matched, i32* %old_total_matched, i32* %max_matched_ptr, i64* %agg_init_val, i64 %pos, i64* %frag_row_off_ptr, i64* %row_count_ptr, i8* %literals, i8* %1, i8* %3, i8* %5, i64* %join_hash_tables, i32 %literal_0)
  %17 = add i64 %pos, %15
  %18 = icmp slt i64 %17, %row_count
  br i1 %18, label %.forbody, label %._crit_edge

._crit_edge:                                      ; preds = %.forbody
  br label %.exit

.exit:                                            ; preds = %._crit_edge, %.entry
  call void @write_back_nop(i64* %col_buffer, i64* %result_buffer, i32 0)
  ret void
}

; Function Attrs: alwaysinline
define i32 @row_func_hoisted_literals(i64* %group_by_buff, i64* %varlen_output_buff, i32* %crt_matched, i32* %total_matched, i32* %old_total_matched, i32* %max_matched, i64* %agg_init_val, i64 %pos, i64* %frag_row_off, i64* %num_rows_per_scan, i8* %literals, i8* %col_buf0, i8* %col_buf1, i8* %col_buf2, i64* %join_hash_tables, i32 %arg_literal_0) #26 {
entry:
  %loop_done = alloca i1
  br label %singleton_true_

exit:                                             ; preds = %singleton_true_, %loop_done_false
  ret i32 0
Here comes the join:
singleton_true_:                                  ; preds = %entry
  %0 = call i64 @fixed_width_int_decode(i8* %col_buf1, i32 4, i64 %pos)
  %1 = trunc i64 %0 to i32
  %2 = ptrtoint i64* %join_hash_tables to i64
  %3 = sext i32 %1 to i64
  %4 = call i64 @hash_join_idx(i64 %2, i64 %3, i64 0, i64 -1)
  %5 = icmp sge i64 %4, 0
  %remaining_outer_cond_match = alloca i1
  store i1 true, i1* %remaining_outer_cond_match
  %6 = load i1, i1* %remaining_outer_cond_match
  %7 = and i1 %5, %6
  br i1 %7, label %loop_body, label %exit

loop_body:                                        ; preds = %singleton_true_
  store i1 true, i1* %loop_done
  %8 = call i32 @filter_func_hoisted_literals(i8* %col_buf0, i64 %pos, i64* %group_by_buff, i1* %loop_done, i32 %arg_literal_0)
  %9 = load i1, i1* %loop_done
  br i1 %9, label %loop_done_true, label %loop_done_false

loop_done_true:                                   ; preds = %loop_body
  ret i32 %8

loop_done_false:                                  ; preds = %loop_body
  br label %exit
}
Here is the filtering of the records returned by the join; in this case an equality check, icmp eq i32 %1, %arg_literal_0:
; Function Attrs: alwaysinline
define i32 @filter_func_hoisted_literals(i8* %col_buf0, i64 %pos, i64* %group_by_buff, i1* %loop_done, i32 %arg_literal_0) #26 {
entry:
  %0 = call i64 @fixed_width_int_decode(i8* %col_buf0, i32 4, i64 %pos)
  %1 = trunc i64 %0 to i32
  %2 = icmp eq i32 %1, %arg_literal_0
  %3 = and i1 true, %2
  br i1 %3, label %filter_true, label %filter_false

filter_true:                                      ; preds = %entry
  %4 = sext i32 %1 to i64
And here the group by, if the filtering succeeded:
  %5 = call i64* @get_group_value_fast_keyless(i64* %group_by_buff, i64 %4, i64 0, i64 0, i32 1)
  %6 = bitcast i64* %5 to i32*
  %agg_col_ptr = getelementptr i32, i32* %6, i32 0
  call void @agg_id_int32_shared(i32* %agg_col_ptr, i32 %1)
  %7 = bitcast i64* %5 to i32*
  %8 = getelementptr i32, i32* %7, i32 1
  %9 = atomicrmw add i32* %8, i32 1 monotonic
  br label %filter_false

filter_false:                                     ; preds = %filter_true, %entry
  store i1 false, i1* %loop_done
  ret i32 0
}
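To tie the IR back to your question 3: the join probe, the filter, and the group-by update all happen inside one per-row loop of a single kernel. Here is a CPU-side Python sketch of that same fused logic (toy data and names of my own, not heavyDB code; the real hash table and atomic add live on the GPU):

```python
# Toy sketch of the fused row function: for each outer row we probe the
# join hash table, (optionally) filter, and bump the group-by counter,
# without ever materializing an intermediate join result.

def run_fused_kernel(c_name, c_nationkey, nation_hash, groups):
    for pos in range(len(c_name)):                # the .forbody loop over rows
        slot = nation_hash.get(c_nationkey[pos])  # plays the role of hash_join_idx
        if slot is None:                          # no join match -> next row
            continue
        # (a filter on a hoisted literal would go here, like filter_func above)
        key = c_name[pos]                         # group key, as in the group by
        groups[key] = groups.get(key, 0) + 1      # the atomicrmw add ..., 1

nation_hash = {0: 0, 1: 1}                        # N_NATIONKEY -> hash slot
groups = {}
run_fused_kernel(["alice", "bob", "alice"], [0, 1, 7], nation_hash, groups)
print(groups)  # {'alice': 1, 'bob': 1} -- the second "alice" row (key 7) has no match
```

The point of the sketch: no buffer ever holds "the join result"; each matching row flows straight through the per-row pipeline.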
- In case there is an intermediate result, we have intermediate buffers, and if we find they aren't big enough, an additional query is run to better estimate the size (you should see the pre-flight name in the logs).
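The pre-flight idea in a nutshell: before running the real projection, run a cheap counting pass over the same predicate to learn how many rows will come out, then allocate the output buffer exactly. A toy Python sketch of the concept (in heavyDB this is a separate generated count query, not Python):

```python
def preflight_count(rows, predicate):
    # First pass: only count the qualifying rows; nothing is written out.
    return sum(1 for r in rows if predicate(r))

def run_projection(rows, predicate):
    n = preflight_count(rows, predicate)  # pre-flight sizes the output buffer
    out = [None] * n                      # "allocate" exactly n slots
    i = 0
    for r in rows:                        # second pass: the real projection
        if predicate(r):
            out[i] = r
            i += 1
    return out

print(run_projection(range(10), lambda r: r % 2 == 0))  # [0, 2, 4, 6, 8]
```

The cost is running the filter twice; the benefit is that the output buffer is never over- or under-sized.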
We have some parameters like this one, which you can see using the --dev-options switch instead of the regular --help:
--enable-bump-allocator [=arg(=1)] (=0) Enable the bump allocator for projection queries on GPU. The bump allocator will allocate a fixed size buffer for each query, track the number of rows passing the kernel during query execution, and copy back only the rows that passed the kernel to CPU after execution. When disabled, pre-flight count queries are used to size the output buffer for projection queries.
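The bump-allocator alternative described by that flag, sketched on the CPU: allocate one fixed buffer up front, let each passing row "bump" a shared counter to claim an output slot, and copy back only the claimed prefix. All names below are mine, not heavyDB's; on the GPU the bump is an atomic fetch-add:

```python
class BumpBuffer:
    """Fixed-size output buffer with a bump counter."""
    def __init__(self, capacity):
        self.slots = [None] * capacity
        self.used = 0

    def claim(self):
        idx = self.used                  # on a GPU this is an atomic fetch-add
        if idx >= len(self.slots):
            # The fixed buffer was too small; a real system would retry
            # (e.g. with a bigger buffer or a pre-flight count) or punt to CPU.
            raise MemoryError("bump buffer overflow")
        self.used += 1
        return idx

def project(rows, predicate, capacity):
    buf = BumpBuffer(capacity)
    for r in rows:
        if predicate(r):
            buf.slots[buf.claim()] = r   # each passing row claims the next slot
    return buf.slots[:buf.used]          # copy back only the rows that passed

print(project(range(10), lambda r: r > 6, capacity=8))  # [7, 8, 9]
```

Compared with the pre-flight approach, this avoids the second pass over the data at the price of possibly over-allocating (or overflowing) the fixed buffer.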
- Answered in the first point: we collapse everything into the same kernel.
Regards, Candido
-
Hi Candido, thanks for the reply, this is very helpful!
One more question: is there a way to dump the optimized DAG (the DAG after collapsing)? I tried adding the following line after https://github.com/heavyai/heavydb/blob/cde582ebc3edba3fb86bacefa5bd9b3418a367b4/QueryEngine/RelAlgDag.cpp#L3144
VLOG(1) << tree_string(rel_alg_dag.getRootNodeShPtr().get()) << "\n";
But I get the following output:
2022-07-05T22:54:50.312586 1 18865 0 8 RelAlgDag.cpp:3146 optimized DAG
2022-07-05T22:54:50.312624 1 18865 0 8 RelAlgDag.cpp:3148 &RelAlgNode &RelAlgNode &RelAlgNode &RelAlgNode
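That "&RelAlgNode" output suggests the printer is emitting a generic base-class representation for every node instead of each node's own description. For reference, the kind of recursive dump being asked for looks like this (a toy Python sketch over a made-up Node class; the class and method names are hypothetical, not heavyDB's API):

```python
class Node:
    """Toy stand-in for a relational-algebra DAG node."""
    def __init__(self, label, inputs=()):
        self.label = label          # each node carries its own description
        self.inputs = list(inputs)  # child nodes it reads from

def tree_string(node, depth=0):
    # Print this node's own label, then recurse into its inputs,
    # indenting two spaces per level.
    lines = ["  " * depth + node.label]
    for child in node.inputs:
        lines.append(tree_string(child, depth + 1))
    return "\n".join(lines)

dag = Node("LogicalAggregate", [Node("LogicalCompound", [Node("LogicalScan")])])
print(tree_string(dag))
# LogicalAggregate
#   LogicalCompound
#     LogicalScan
```

The key ingredient is that the printer asks each concrete node for its label rather than formatting the base pointer, which is what produces useful per-operator output.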
It would be helpful to be able to easily see which operators were collapsed. Thanks a lot!