From c8a41e344449f9c6728b7045afc8f6831ca7eba7 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 28 Feb 2023 16:05:41 +0800 Subject: [PATCH 1/2] fix(query): make exchange operator inherit input order from upstream --- source/libs/executor/src/executorimpl.c | 11 +++++++---- tests/parallel_test/cases.task | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 780e371339..eb070121a8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1452,13 +1452,16 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { // todo add more information about exchange operation int32_t type = pOperator->operatorType; - if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) { + if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || + type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || + type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) { *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; + } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { + // for exchange operator inherit order from upstream and do not overwrite here + *scanFlag = MAIN_SCAN; + return TSDB_CODE_SUCCESS; } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = pOperator->info; *order = pTableScanInfo->base.cond.order; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 7b6278c916..9938bba1b2 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -945,7 +945,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3 -#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3 From 1081d688582e688e167cf28cd546df9836be27c9 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 28 Feb 2023 18:30:51 +0800 Subject: [PATCH 2/2] add inherit from upstream order option for exchange --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 10 ++++++---- source/libs/executor/src/filloperator.c | 4 ++-- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/projectoperator.c | 4 ++-- source/libs/executor/src/timewindowoperator.c | 6 +++--- 6 files changed, 15 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5df3b14a5b..be79054c1b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -732,7 +732,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3 STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); -int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag); +int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); extern void doDestroyExchangeOperatorInfo(void* param); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index eb070121a8..9c85ebae9d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1449,7 +1449,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t return TSDB_CODE_SUCCESS; } -int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { +int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) { // todo add more information about exchange operation int32_t type = pOperator->operatorType; if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || @@ -1459,7 +1459,9 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { - // for exchange operator inherit order from upstream and do not overwrite here + if (!inheritUsOrder) { + *order = TSDB_ORDER_ASC; + } *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { @@ -1476,7 +1478,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { return TSDB_CODE_INVALID_PARA; } else { - return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag); + return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder); } } } @@ -1592,7 +1594,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } hasValidBlock = true; - int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); + int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false); if (code != TSDB_CODE_SUCCESS) { destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index adb045a69f..2a33e3527a 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -69,7 +69,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; - getTableScanInfo(pOperator, &order, &scanFlag); + getTableScanInfo(pOperator, &order, &scanFlag, false); int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey; taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); @@ -128,7 +128,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; - getTableScanInfo(pOperator, &order, &scanFlag); + getTableScanInfo(pOperator, &order, &scanFlag, false); doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows > 0) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5ef82f0e08..0fc2cdb46a 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -383,7 +383,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { break; } - int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); + int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 3ae114c656..49bc5af634 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -289,7 +289,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - int32_t code = getTableScanInfo(downstream, &order, &scanFlag); + int32_t code = getTableScanInfo(downstream, &order, &scanFlag, false); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -441,7 +441,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp SExprSupp* pSup = &pOperator->exprSupp; // the pDataBlock are always the same one, no need to call this again - int32_t code = getTableScanInfo(downstream, &order, &scanFlag); + int32_t code = getTableScanInfo(downstream, &order, &scanFlag, false); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 20d4f46eaf..6411d862ae 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1072,7 +1072,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { break; } - getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag); + getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag, true); if (pInfo->scalarSupp.pExprInfo != NULL) { SExprSupp* pExprSup = &pInfo->scalarSupp; @@ -4294,7 +4294,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { } } - getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag); + getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag, false); setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true); doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); @@ -4621,7 +4621,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { break; } - getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); + getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag, false); setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true); doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);