[enhance](agg) Optimize AGG shuffle key selection #60572
feiniaofeiafei wants to merge 8 commits into apache:master
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:

run buildall

TPC-H: Total hot run time: 30094 ms
ClickBench: Total hot run time: 28.52 s
add desc |
```diff
 }
 if (!chosenEnforcerIdList.isEmpty()) {
-    str.append(" chosen enforcer(id, requiredProperties):\n");
+    str.append(" ⭐ Chosen Enforcer(ID, RequiredProperties):\n");
```
do not add emoji in toString
Pull request overview
This PR enhances Nereids aggregation/join planning by introducing shuffle-key pruning logic (optionally choosing a single “best” shuffle key based on statistics) and adds regression coverage and debugging/shape-plan improvements to validate and inspect the chosen hash columns.
Changes:
- Add shuffle-key selection/pruning utilities and wire them into hash-join and global-agg request property derivation.
- Introduce new session variables to control the optimization behavior and thresholds.
- Add regression tests + expected plan-shape outputs, and update existing unit tests for the new aggregate plan attribute.
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| regression-test/suites/nereids_p0/hash_shuffle_key_prune/hash_shuffle_key_prune.groovy | New regression suite validating shape-plan hash columns for agg/join shuffle-key pruning. |
| regression-test/data/nereids_p0/hash_shuffle_key_prune/hash_shuffle_key_prune.out | Golden output for the new regression suite. |
| fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanEqualsTest.java | Updates PhysicalHashAggregate construction to match new signature. |
| fe/fe-core/src/test/java/org/apache/doris/nereids/properties/RequestPropertyDeriverTest.java | Updates tests for new PhysicalHashAggregate parameter. |
| fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java | Updates tests for new PhysicalHashAggregate parameter. |
| fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java | Adds skew/balance scoring helpers for shuffle-key selection. |
| fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | Adds choose_one_agg_shuffle_key and shuffle_key_prune_threshold session variables. |
| fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java | Adds helpers to compute total parallel instance count (alive BEs × parallelism). |
| fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java | Adds hasSourceRepeat flag and propagates it through copy/with* methods. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDistribute.java | Enhances shape-plan output to include resolved “Hash Columns:[…]” when enabled. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java | Refactors salt factor computation to use new ConnectContext instance helpers. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpJoinFromUnionAll.java | Fixes/updates the rewrite diagram comment. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggWithoutDistinct.java | Propagates hasSourceRepeat into generated PhysicalHashAggregate plans. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhaseWithoutGbyKey.java | Passes cascades context into rewrite path and propagates hasSourceRepeat. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggMultiPhase.java | Adjusts partition expression construction for multi-phase distinct and propagates hasSourceRepeat. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/SplitAggBaseRule.java | Updates helper constructors/calls for new PhysicalHashAggregate signature. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ShuffleKeyPruneUtils.java | New utility implementing shuffle-key scoring/selection for agg and join scenarios. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java | Integrates shuffle-key pruning into agg/join request property derivation. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java | Removes a prior ban-check related to aggregate child distribution. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java | Improves memo dump formatting in toString(). |
| fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java | Refines GroupExpression toString() formatting and simplifies plan text. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java | Enhances Group toString() formatting, including lowest-plan details. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java | Uses ConnectContext helpers for BE/parallel instance calculations. |
| fe/fe-core/src/main/java/org/apache/doris/nereids/PlanContext.java | Exposes getGroupExpression() for callers (used by shuffle-key pruning logic). |
```diff
 if (!chosenEnforcerIdList.isEmpty()) {
-    str.append(" chosen enforcer(id, requiredProperties):\n");
+    str.append(" ⭐ Chosen Enforcer(ID, RequiredProperties):\n");
     for (int i = 0; i < chosenEnforcerIdList.size(); i++) {
         str.append(" (").append(i).append(")").append(chosenEnforcerIdList.get(i)).append(", ")
-                .append(chosenEnforcerPropertiesList.get(i)).append("\n");
+                .append(formatProperty(chosenEnforcerPropertiesList.get(i))).append("\n");
     }
 }
 if (chosenGroupExpressionId != -1) {
-    str.append(" chosen expression id: ").append(chosenGroupExpressionId).append("\n");
-    str.append(" chosen properties: ").append(chosenProperties).append("\n");
+    str.append(" ⭐ Chosen Expression ID: ").append(chosenGroupExpressionId).append("\n");
+    str.append(" ⭐ Chosen Properties: ").append(formatProperty(chosenProperties)).append("\n");
 }
```
This toString() output introduces emoji/Unicode symbols (e.g., "⭐") in diagnostic text. Similar to Memo.toString(), this can lead to encoding/log-parsing issues in some environments. Prefer plain ASCII markers or gate the decorated output behind a debug option.
```groovy
def checkShuffleKey = { String sqlString, String... expectedHashCols ->
    def result = sql """ explain shape plan ${sqlString}"""
    for (def expected : expectedHashCols) {
        assertTrue(result.toString().contains("Hash Columns:[${expected}]"),
                "\n${result.collect { it[0] }.join('\n')} not contains expected hash cols: ${expected}")
    }
}
```
checkShuffleKey is defined but never used in this suite. Keeping unused helpers makes the regression test harder to maintain; either remove it or convert some of the qt_... checks to use it so the intent (verifying chosen hash columns) is explicit.
Suggested change:

```diff
-def checkShuffleKey = { String sqlString, String... expectedHashCols ->
-    def result = sql """ explain shape plan ${sqlString}"""
-    for (def expected : expectedHashCols) {
-        assertTrue(result.toString().contains("Hash Columns:[${expected}]"), "\n${result.collect { it[0] }.join('\n')} not contains expected hash cols: ${expected}")
-    }
-}
```
```java
private final boolean hasSourceRepeat;
```
hasSourceRepeat is a new stateful attribute but it is not included in equals()/hashCode(). Since GroupExpression.equals() depends on plan.equals(), aggregates that differ only by hasSourceRepeat can be treated as identical, which can incorrectly deduplicate memo expressions and affect optimization/correctness. Include hasSourceRepeat in both equals() and hashCode() (and consider exposing it in toString() for debugging).
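The review point above can be illustrated with a minimal, hypothetical node class (`AggNode` and its field names are invented for illustration; they are not the PR's actual classes):

```java
import java.util.Objects;

// Hypothetical, simplified aggregate node: AggNode only illustrates why a
// new stateful field must participate in equals()/hashCode().
class AggNode {
    private final String groupByKey;
    private final boolean hasSourceRepeat; // the newly added stateful attribute

    AggNode(String groupByKey, boolean hasSourceRepeat) {
        this.groupByKey = groupByKey;
        this.hasSourceRepeat = hasSourceRepeat;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof AggNode)) {
            return false;
        }
        AggNode other = (AggNode) o;
        // Including hasSourceRepeat prevents plans that differ only in this
        // flag from being deduplicated as identical memo expressions.
        return groupByKey.equals(other.groupByKey)
                && hasSourceRepeat == other.hasSourceRepeat;
    }

    @Override
    public int hashCode() {
        return Objects.hash(groupByKey, hasSourceRepeat);
    }
}

public class EqualsDemo {
    public static void main(String[] args) {
        AggNode a = new AggNode("k1", true);
        AggNode b = new AggNode("k1", false);
        // Without hasSourceRepeat in equals(), a and b would wrongly compare equal.
        System.out.println(a.equals(b)); // false
    }
}
```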
```java
public static List<ExprId> selectOptimalShuffleKeyForAggWithParentHashRequest(
        PhysicalHashAggregate<? extends Plan> agg, Set<ExprId> intersectIdSet, PlanContext context) {
    List<ExprId> orderedIds = Utils.fastToImmutableList(intersectIdSet);
    if (!context.getConnectContext().getSessionVariable().chooseOneAggShuffleKey
            || intersectIdSet.size() <= context.getConnectContext().getSessionVariable().shuffleKeyPruneThreshold) {
        return orderedIds;
```
orderedIds is built from intersectIdSet (a Set), so the resulting shuffle key list order is non-deterministic. This can lead to unstable plan shapes and potentially different distribution specs across runs. Preserve a deterministic order (e.g., iterate parentHashExprIds and keep those present in the intersection, or preserve group-by order) instead of materializing directly from a Set.
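The deterministic alternative suggested above can be sketched like this (`orderedIntersection` and the integer ids are hypothetical stand-ins for the `ExprId`-based code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch: instead of materializing the intersection Set directly, iterate
// the parent's ordered hash expression ids and keep those present in the
// intersection, so the resulting shuffle key order is stable across runs.
public class DeterministicOrder {
    public static List<Integer> orderedIntersection(
            List<Integer> parentHashExprIds, Set<Integer> intersectIdSet) {
        List<Integer> result = new ArrayList<>();
        for (Integer id : parentHashExprIds) {
            if (intersectIdSet.contains(id)) {
                result.add(id); // order follows parentHashExprIds, not Set iteration
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // HashSet iteration order is unspecified; the helper restores a stable order.
        Set<Integer> intersection = new HashSet<>(Arrays.asList(7, 3, 5));
        System.out.println(orderedIntersection(Arrays.asList(3, 5, 7, 9), intersection));
        // → [3, 5, 7]
    }
}
```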
```java
 * When hotValues not exist or empty, treat nulls as the only hot value.
 * Otherwise use the max of (top hot value count, null count).
 */
public static double getMaxHotValueCntIncludeNull(ColumnStatistic columnStatistic, double rowCount) {
    Map<Literal, Float> hotValues = columnStatistic.getHotValues();
    if (columnStatistic.getHotValues() == null || hotValues.isEmpty()) {
        return columnStatistic.numNulls;
    }
    double maxRate = hotValues.values().stream().mapToDouble(Float::doubleValue).max().orElse(0);
    double maxHotRows = maxRate * rowCount;
    return maxHotRows > columnStatistic.numNulls ? maxHotRows : columnStatistic.numNulls;
```
getHotValues() returns percentages (see getHotValues Javadoc: "row count percentage"), but getMaxHotValueCntIncludeNull treats the max value as a ratio and multiplies by rowCount directly. This will massively overestimate hot-row counts (e.g., 30% becomes 30×rowCount) and break isBalanced / skew scoring. Convert percentage to a fraction (e.g., divide by ColumnStatistic.ONE_HUNDRED) before multiplying, and reconsider treating hotValues.isEmpty() as "no hot values" (in ColumnStatistic an empty map means hot values exist but are unknown).
Suggested change:

```diff
- * When hotValues not exist or empty, treat nulls as the only hot value.
+ * When hotValues do not exist (null), treat nulls as the only hot value.
  * Otherwise use the max of (top hot value count, null count).
  */
 public static double getMaxHotValueCntIncludeNull(ColumnStatistic columnStatistic, double rowCount) {
     Map<Literal, Float> hotValues = columnStatistic.getHotValues();
-    if (columnStatistic.getHotValues() == null || hotValues.isEmpty()) {
+    if (hotValues == null) {
         return columnStatistic.numNulls;
     }
-    double maxRate = hotValues.values().stream().mapToDouble(Float::doubleValue).max().orElse(0);
-    double maxHotRows = maxRate * rowCount;
-    return maxHotRows > columnStatistic.numNulls ? maxHotRows : columnStatistic.numNulls;
+    // getHotValues() returns percentages (0-100). Convert to fraction before applying to rowCount.
+    double maxRatePercent = hotValues.values().stream()
+            .mapToDouble(Float::doubleValue)
+            .max()
+            .orElse(0);
+    double maxHotRows = (maxRatePercent / ColumnStatistic.ONE_HUNDRED) * rowCount;
+    return Math.max(maxHotRows, columnStatistic.numNulls);
```
```java
}, needForward = false)
public boolean aggShuffleUseParentKey = true;

@VariableMgr.VarAttr(name = CHOOSE_ONE_AGG_SHUFFLE_KEY, needForward = false)
```
The new session variable choose_one_agg_shuffle_key is missing the required bilingual description in @VarAttr. Without it, SHOW VARIABLES/docs will expose the default placeholder ("待补充"/"TODO"). Please add a clear CN/EN description of what enabling this does and any caveats.
Suggested change:

```diff
-@VariableMgr.VarAttr(name = CHOOSE_ONE_AGG_SHUFFLE_KEY, needForward = false)
+@VariableMgr.VarAttr(
+        name = CHOOSE_ONE_AGG_SHUFFLE_KEY,
+        needForward = false,
+        description = {
+            "在聚合算子存在多个可选分组键时,只选择一个键作为 shuffle 键,以减少 shuffle 数据量;"
+            + "可能增加数据倾斜风险,当前为实验特性,建议在确认无明显倾斜时开启。",
+            "When aggregation operators have multiple candidate grouping keys, choose only one key as "
+            + "the shuffle key to reduce the amount of shuffled data. This may increase the risk "
+            + "of data skew; it is an experimental feature and should be enabled only after "
+            + "verifying that skew is acceptable."
+        })
```
```java
description = {"控制在聚合或连接操作中进行 shuffle key 裁剪的阈值。"
        + "当 shuffle key 的基数小于该阈值时,启用裁剪以哈希分发的开销。",
        "Controls the threshold for performing shuffle key pruning in aggregation or join operations. "})
```
The shuffle_key_prune_threshold session variable description is currently unclear/grammatically incorrect (CN ends with a comma and reads like it's missing a verb such as “减少/降低”). Please refine the CN/EN descriptions to precisely state when pruning is applied (e.g., group-by/join key count > threshold) and what is being optimized.
Suggested change:

```diff
-description = {"控制在聚合或连接操作中进行 shuffle key 裁剪的阈值。"
-        + "当 shuffle key 的基数小于该阈值时,启用裁剪以哈希分发的开销。",
-        "Controls the threshold for performing shuffle key pruning in aggregation or join operations. "})
+description = {"控制在聚合或连接算子中进行 shuffle key 裁剪的阈值。"
+        + "当 group by 或 join 的键数量大于该阈值时,规划器会尝试裁剪部分 shuffle key,"
+        + "以减少哈希分发的数据量和计算开销。",
+        "Controls the threshold for pruning shuffle keys in aggregation or join operators. "
+        + "When the number of group-by or join keys is greater than this threshold, "
+        + "the planner will try to prune some shuffle keys to reduce hash-distribution "
+        + "data volume and computation overhead."})
```
```diff
 public String toString() {
     StringBuilder builder = new StringBuilder();
-    builder.append("root:").append(getRoot()).append("\n");
+    boolean first = true;
+    Group rootGroup = getRoot();
     for (Group group : groups.values()) {
-        builder.append("\n\n").append(group).append("\n");
+        if (!first) {
+            builder.append("\n").append("═══════════════════════════════════════════════════════════").append("\n");
+        }
+        if (group.equals(rootGroup)) {
+            builder.append("\n").append("🌲 ROOT ").append(group);
+        } else {
+            builder.append("\n").append(group);
+        }
```
toString() output now contains emoji/Unicode decorations (e.g., "🌲 ROOT") and heavy box-drawing separators. These strings often end up in logs or diagnostic dumps, and non-ASCII output can cause encoding issues or break log parsing tools. Consider using plain ASCII markers (e.g., "ROOT" / "*" and "-----") or gating the fancy formatting behind a debug flag.
force-pushed from 9ba21c5 to acb18d5
run buildall

TPC-H: Total hot run time: 30346 ms
TPC-DS: Total hot run time: 190048 ms
ClickBench: Total hot run time: 28.18 s
run buildall

TPC-H: Total hot run time: 30194 ms
TPC-DS: Total hot run time: 190242 ms
ClickBench: Total hot run time: 28.39 s
run buildall

TPC-H: Total hot run time: 28618 ms
TPC-DS: Total hot run time: 183604 ms
run buildall

TPC-H: Total hot run time: 28794 ms
TPC-DS: Total hot run time: 183545 ms
run buildall

TPC-H: Total hot run time: 28789 ms
TPC-DS: Total hot run time: 182816 ms
force-pushed from cf46574 to 1bf0157
run buildall

TPC-H: Total hot run time: 27674 ms
TPC-DS: Total hot run time: 151878 ms
run buildall

TPC-H: Total hot run time: 27964 ms
TPC-DS: Total hot run time: 151477 ms
This PR adds the variable choose_one_agg_shuffle_key (default value is false); "set choose_one_agg_shuffle_key=true;" makes agg choose the 1 key with the highest NDV to shuffle.

Squashed commit messages: add change visitPhysicalHashJoin in RequestPropertyDeriver; add in requestPropertyDeriver; add choose one partition key when specify partitionExpressions; unify choosing; support when parent request hash; add; fix; fix; remove 2+2 special logic; fix order of partitionExprs that lead to physicalProperties(GATHER) doesn't exist in root group; add test; [enhance](memo) improve memo print; add test; add test; add join agg test; fix style; fix ut; remove emoji; turn on CHOOSE_ONE_AGG_SHUFFLE_KEY default; repeat is allowed; fix test; fix; make ut work; fix; add ut; add result test; add ut; add ut; make memo print work better; fix; make 512 multiplier; fix; fix; fix; use new strategy.
force-pushed from 815e7f6 to aa6a62f
run buildall

TPC-H: Total hot run time: 27716 ms

run buildall

TPC-H: Total hot run time: 27696 ms
TPC-DS: Total hot run time: 151213 ms
What problem does this PR solve?
In distributed AGG, shuffle keys are usually all Group By columns. In BE, hash computation grows linearly with the number of shuffle keys, which can add noticeable overhead when there are many Group By columns.
This PR selects a single high-NDV column from the Group By keys as the shuffle key, reducing the number of shuffle keys and lowering hash computation and shuffle cost while preserving correctness.
Goals
Reduce shuffle key count to lower hash computation and shuffle overhead
Keep data distribution even by choosing a single high-cardinality shuffle key
Prefer numeric types to reduce string hash cost
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Implementation
Candidates are scored using statistics; the highest-scoring column is chosen as the shuffle key:
Factors:
NDV: Higher cardinality → better distribution
Hot value skew: Avoid columns that cause data skew
Data type: Numeric types have lower hash cost
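A rough sketch of how these factors might combine into a single comparable score (the formula, weights, and names below are illustrative only and are not the PR's actual scoring logic in ShuffleKeyPruneUtils):

```java
// Illustrative-only scoring sketch: reward high NDV, penalize a dominant
// hot value (skew risk), and give numeric columns a small bonus for
// cheaper hashing. The highest-scoring column would be chosen.
public class ShuffleKeyScore {
    static double score(double ndv, double rowCount, double maxHotRows, boolean isNumeric) {
        double cardinality = Math.min(ndv / rowCount, 1.0); // higher NDV → better spread
        double skewPenalty = maxHotRows / rowCount;         // dominant hot value → skew risk
        double typeBonus = isNumeric ? 0.1 : 0.0;           // numeric hash is cheaper than string
        return cardinality - skewPenalty + typeBonus;
    }

    public static void main(String[] args) {
        double rows = 1_000_000;
        // A high-NDV numeric id column beats a skewed low-NDV string column.
        double idScore = score(900_000, rows, 10, true);
        double statusScore = score(5, rows, 400_000, false);
        System.out.println(idScore > statusScore); // true
    }
}
```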
Trigger Conditions
Switch: choose_one_agg_shuffle_key (default: true)
Threshold: shuffle_key_prune_threshold (default: 5); optimization applies only when the shuffle key count exceeds this threshold
Exclusion: AGG with source repeat is not optimized (to avoid null skew)
Dependency: Requires statistics; falls back to original logic when statistics are missing
This PR also improves the memo's printed output.
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)