From 6352b28b4c8a1d89f387f0b1b5eab20d76041a25 Mon Sep 17 00:00:00 2001
From: dapan1121
Date: Thu, 9 Nov 2023 09:47:55 +0800
Subject: [PATCH] fix: split scan columns from cache last scan

---
Review notes (ignored by git am; not part of the commit message):
 * The one-line-per-diff-line layout was restored from the hunk headers and
   the diffstat. Indentation was inferred from brace nesting and column
   alignment inside context lines could not be recovered, so apply with
   "git apply --ignore-whitespace" and verify against the target tree.
 * commandInt.h: EXPLAIN_MERGE_FORMAT changes from "SortMerge" to "Merge".
 * mergeoperator.c: doColsMerge() counts NULL downstream blocks, marks the
   operator completed after the loop, and returns NULL when the count is 2.
 * planOptimizer.c: lastRowScanOptRemoveUslessTargets() erases every scan
   target that equals no node in either of the two lists it is given; the
   optimizer passes cxt.pLastCols and the new cxt.pOtherCols.
 * TODO(review): the result of nodesListMakeAppend() is not checked and no
   release of cxt.pOtherCols is visible in these hunks - confirm both.

 source/libs/command/inc/commandInt.h     |  2 +-
 source/libs/executor/src/mergeoperator.c |  9 +++++
 source/libs/planner/src/planOptimizer.c  | 49 ++++++++++++++++++++----
 3 files changed, 51 insertions(+), 9 deletions(-)

diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h
index 5d4bf4e0ec..bb0d8a32dd 100644
--- a/source/libs/command/inc/commandInt.h
+++ b/source/libs/command/inc/commandInt.h
@@ -59,7 +59,7 @@ extern "C" {
 #define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c"
 #define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64
 #define EXPLAIN_RATIO_TIME_FORMAT "Ratio: %f"
-#define EXPLAIN_MERGE_FORMAT "SortMerge"
+#define EXPLAIN_MERGE_FORMAT "Merge"
 #define EXPLAIN_MERGE_KEYS_FORMAT "Merge Key: "
 #define EXPLAIN_IGNORE_GROUPID_FORMAT "Ignore Group Id: %s"
 #define EXPLAIN_PARTITION_KETS_FORMAT "Partition Key: "
diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c
index a580524e87..093b6ab11e 100755
--- a/source/libs/executor/src/mergeoperator.c
+++ b/source/libs/executor/src/mergeoperator.c
@@ -325,6 +325,7 @@ SSDataBlock* doColsMerge(SOperatorInfo* pOperator) {
   SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
   SSDataBlock* pBlock = NULL;
   SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
+  int32_t nullBlkNum = 0;
 
   qDebug("start to merge columns, %s", GET_TASKID(pTaskInfo));
 
@@ -333,11 +334,19 @@ SSDataBlock* doColsMerge(SOperatorInfo* pOperator) {
     if (pBlock && pBlock->info.rows > 1) {
       qError("more than 1 row returned from downstream, rows:%" PRId64, pBlock->info.rows);
       T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
+    } else if (NULL == pBlock) {
+      nullBlkNum++;
     }
 
     copyColumnsValue(pColsMerge->pTargets, pColsMerge->srcBlkIds[i], pInfo->binfo.pRes, pBlock);
   }
 
+  setOperatorCompleted(pOperator);
+
+  if (2 == nullBlkNum) {
+    return NULL;
+  }
+
   pInfo->binfo.pRes->info.rows = 1;
 
   return pInfo->binfo.pRes;
diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c
index d9f24313d1..3871928f81 100644
--- a/source/libs/planner/src/planOptimizer.c
+++ b/source/libs/planner/src/planOptimizer.c
@@ -2590,6 +2590,7 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
 typedef struct SLastRowScanOptSetColDataTypeCxt {
   bool doAgg;
   SNodeList* pLastCols;
+  SNodeList* pOtherCols;
 } SLastRowScanOptSetColDataTypeCxt;
 
 static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
@@ -2632,6 +2633,33 @@ static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCo
   }
 }
 
+static void lastRowScanOptRemoveUslessTargets(SNodeList* pTargets, SNodeList* pList1, SNodeList* pList2) {
+  SNode* pTarget = NULL;
+  WHERE_EACH(pTarget, pTargets) {
+    bool found = false;
+    SNode* pCol = NULL;
+    FOREACH(pCol, pList1) {
+      if (nodesEqualNode(pCol, pTarget)) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      FOREACH(pCol, pList2) {
+        if (nodesEqualNode(pCol, pTarget)) {
+          found = true;
+          break;
+        }
+      }
+    }
+    if (!found) {
+      ERASE_NODE(pTargets);
+      continue;
+    }
+    WHERE_NEXT;
+  }
+}
+
 static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
   SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized);
 
@@ -2639,7 +2667,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
     return TSDB_CODE_SUCCESS;
   }
 
-  SLastRowScanOptSetColDataTypeCxt cxt = {.doAgg = true, .pLastCols = NULL};
+  SLastRowScanOptSetColDataTypeCxt cxt = {.doAgg = true, .pLastCols = NULL, .pOtherCols = NULL};
   SNode* pNode = NULL;
   SColumnNode* pPKTsCol = NULL;
   SColumnNode* pNonPKCol = NULL;
@@ -2660,14 +2688,18 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
         nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
         nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
       }
-    } else if (FUNCTION_TYPE_SELECT_VALUE == funcType) {
+    } else {
       pNode = nodesListGetNode(pFunc->pParameterList, 0);
-      if (nodeType(pNode) == QUERY_NODE_COLUMN) {
-        SColumnNode* pCol = (SColumnNode*)pNode;
-        if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
-          pPKTsCol = pCol;
-        } else {
-          pNonPKCol = pCol;
+      nodesListMakeAppend(&cxt.pOtherCols, pNode);
+
+      if (FUNCTION_TYPE_SELECT_VALUE == funcType) {
+        if (nodeType(pNode) == QUERY_NODE_COLUMN) {
+          SColumnNode* pCol = (SColumnNode*)pNode;
+          if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
+            pPKTsCol = pCol;
+          } else {
+            pNonPKCol = pCol;
+          }
         }
       }
     }
@@ -2681,6 +2713,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
     lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, true);
     nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt);
     lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, false);
+    lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols);
     if (pPKTsCol && pScan->node.pTargets->length == 1) {
       // when select last(ts),ts from ..., we add another ts to targets
       sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol);