diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 9725aa48c0..7fbdbfb211 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -121,6 +121,7 @@ int32_t nodesListMakeAppend(SNodeList** pList, SNode* pNode); int32_t nodesListMakeStrictAppend(SNodeList** pList, SNode* pNode); int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc); int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc); +int32_t nodesListMakeStrictAppendList(SNodeList** pTarget, SNodeList* pSrc); int32_t nodesListPushFront(SNodeList* pList, SNode* pNode); SListCell* nodesListErase(SNodeList* pList, SListCell* pCell); void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index b1f2c4390c..bb47120022 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -145,6 +145,7 @@ typedef struct SAggLogicNode { bool hasGroupKeyOptimized; bool isGroupTb; bool isPartTb; // true if partition keys has tbname + bool hasGroup; } SAggLogicNode; typedef struct SProjectLogicNode { diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 185e23590a..0f2a1e2f29 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -625,7 +625,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, (pAggNode->pGroupKeys ? "Group" : "Aggragate")); + EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, (pAggNode->pGroupKeys ? "GroupAggragate" : "Aggragate")); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); @@ -1152,24 +1152,26 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); if (EXPLAIN_MODE_ANALYZE == ctx->mode) { - // sort method - EXPLAIN_ROW_NEW(level + 1, "Sort Method: "); + if (MERGE_TYPE_SORT == pMergeNode->type) { + // sort method + EXPLAIN_ROW_NEW(level + 1, "Sort Method: "); - int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo); - SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, 0); - SSortExecInfo *pExecInfo = (SSortExecInfo *)execInfo->verboseInfo; - EXPLAIN_ROW_APPEND("%s", pExecInfo->sortMethod == SORT_QSORT_T ? "quicksort" : "merge sort"); - if (pExecInfo->sortBuffer > 1024 * 1024) { - EXPLAIN_ROW_APPEND(" Buffers:%.2f Mb", pExecInfo->sortBuffer / (1024 * 1024.0)); - } else if (pExecInfo->sortBuffer > 1024) { - EXPLAIN_ROW_APPEND(" Buffers:%.2f Kb", pExecInfo->sortBuffer / (1024.0)); - } else { - EXPLAIN_ROW_APPEND(" Buffers:%d b", pExecInfo->sortBuffer); + int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo); + SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, 0); + SSortExecInfo *pExecInfo = (SSortExecInfo *)execInfo->verboseInfo; + EXPLAIN_ROW_APPEND("%s", pExecInfo->sortMethod == SORT_QSORT_T ? "quicksort" : "merge sort"); + if (pExecInfo->sortBuffer > 1024 * 1024) { + EXPLAIN_ROW_APPEND(" Buffers:%.2f Mb", pExecInfo->sortBuffer / (1024 * 1024.0)); + } else if (pExecInfo->sortBuffer > 1024) { + EXPLAIN_ROW_APPEND(" Buffers:%.2f Kb", pExecInfo->sortBuffer / (1024.0)); + } else { + EXPLAIN_ROW_APPEND(" Buffers:%d b", pExecInfo->sortBuffer); + } + + EXPLAIN_ROW_APPEND(" loops:%d", pExecInfo->loops); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); } - - EXPLAIN_ROW_APPEND(" loops:%d", pExecInfo->loops); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); } if (verbose) { @@ -1183,29 +1185,31 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); - EXPLAIN_ROW_APPEND(EXPLAIN_IGNORE_GROUPID_FORMAT, pMergeNode->ignoreGroupId ? "true" : "false"); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + if (MERGE_TYPE_SORT == pMergeNode->type) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_IGNORE_GROUPID_FORMAT, pMergeNode->ignoreGroupId ? "true" : "false"); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT); - if (pMergeNode->groupSort) { - EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, "_group_id asc"); - if (LIST_LENGTH(pMergeNode->pMergeKeys) > 0) { - EXPLAIN_ROW_APPEND(EXPLAIN_COMMA_FORMAT); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT); + if (pMergeNode->groupSort) { + EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, "_group_id asc"); + if (LIST_LENGTH(pMergeNode->pMergeKeys) > 0) { + EXPLAIN_ROW_APPEND(EXPLAIN_COMMA_FORMAT); + } } - } - for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) { - SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pMergeNode->pMergeKeys, i); - EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, nodesGetNameFromColumnNode(ptn->pExpr)); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, EXPLAIN_ORDER_STRING(ptn->order)); - if (i != LIST_LENGTH(pMergeNode->pMergeKeys) - 1) { - EXPLAIN_ROW_APPEND(EXPLAIN_COMMA_FORMAT); + for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) { + SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pMergeNode->pMergeKeys, i); + EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, nodesGetNameFromColumnNode(ptn->pExpr)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, EXPLAIN_ORDER_STRING(ptn->order)); + if (i != LIST_LENGTH(pMergeNode->pMergeKeys) - 1) { + EXPLAIN_ROW_APPEND(EXPLAIN_COMMA_FORMAT); + } } + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); if (pMergeNode->node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 204a9458b8..a580524e87 100755 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -38,7 +38,8 @@ typedef struct SNonSortMergeInfo { } SNonSortMergeInfo; typedef struct SColsMergeInfo { - uint64_t srcBlkIds[2]; + SNodeList* pTargets; + uint64_t srcBlkIds[2]; } SColsMergeInfo; typedef struct SMultiwayMergeOperatorInfo { @@ -150,7 +151,7 @@ SSDataBlock* doSortMerge(SOperatorInfo* pOperator) { SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo; SSortHandle* pHandle = pSortMergeInfo->pSortHandle; SSDataBlock* pDataBlock = pInfo->binfo.pRes; - SArray* pColMatchInfo = pInfo->matchInfo.pList; + SArray* pColMatchInfo = pSortMergeInfo->matchInfo.pList; int32_t capacity = pOperator->resultInfo.capacity; qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo)); @@ -234,6 +235,8 @@ void destroySortMergeOperatorInfo(void* param) { pSortMergeInfo->pInputBlock = blockDataDestroy(pSortMergeInfo->pInputBlock); pSortMergeInfo->pIntermediateBlock = blockDataDestroy(pSortMergeInfo->pIntermediateBlock); + taosArrayDestroy(pSortMergeInfo->matchInfo.pList); + tsortDestroySortHandle(pSortMergeInfo->pSortHandle); taosArrayDestroy(pSortMergeInfo->pSortInfo); } @@ -298,25 +301,46 @@ int32_t openColsMergeOperator(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } +int32_t copyColumnsValue(SNodeList* pNodeList, uint64_t targetBlkId, SSDataBlock* pDst, SSDataBlock* pSrc) { + bool isNull = (NULL == pSrc || pSrc->info.rows <= 0); + size_t numOfCols = LIST_LENGTH(pNodeList); + for (int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); + if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == targetBlkId) { + SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, pNode->slotId); + if (isNull) { + colDataSetVal(pDstCol, 0, NULL, true); + } else { + SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, ((SColumnNode*)pNode->pExpr)->slotId); + colDataAssign(pDstCol, pSrcCol, 1, &pDst->info); + } + } + } + + return TSDB_CODE_SUCCESS; +} + SSDataBlock* doColsMerge(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SMultiwayMergeOperatorInfo* pInfo = pOperator->info; SSDataBlock* pBlock = NULL; + SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo; - qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo)); + qDebug("start to merge columns, %s", GET_TASKID(pTaskInfo)); for (int32_t i = 0; i < 2; ++i) { pBlock = getNextBlockFromDownstream(pOperator, i); - if (NULL == pBlock) { - TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]); - pNonSortMerge->sourceWorkIdx++; - idx = NON_SORT_NEXT_SRC(pNonSortMerge, idx); - continue; + if (pBlock && pBlock->info.rows > 1) { + qError("more than 1 row returned from downstream, rows:%" PRId64, pBlock->info.rows); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } - break; + + copyColumnsValue(pColsMerge->pTargets, pColsMerge->srcBlkIds[i], pInfo->binfo.pRes, pBlock); } - return pBlock; + pInfo->binfo.pRes->info.rows = 1; + + return pInfo->binfo.pRes; } void destroyColsMergeOperatorInfo(void* param) { @@ -390,7 +414,6 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { void destroyMultiwayMergeOperatorInfo(void* param) { SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); - taosArrayDestroy(pInfo->matchInfo.pList); if (NULL != gMultiwayMergeFps[pInfo->type].closeFn) { (*gMultiwayMergeFps[pInfo->type].closeFn)(&pInfo->sortMergeInfo); @@ -467,6 +490,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size initResultSizeInfo(&pOperator->resultInfo, 1); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + pColsMerge->pTargets = pMergePhyNode->pTargets; pColsMerge->srcBlkIds[0] = getOperatorResultBlockId(downStreams[0], 0); pColsMerge->srcBlkIds[1] = getOperatorResultBlockId(downStreams[1], 0); break; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 028745803b..ce23928268 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -419,6 +419,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { COPY_SCALAR_FIELD(groupSort); CLONE_NODE_LIST_FIELD(pTags); CLONE_NODE_FIELD(pSubtable); + COPY_SCALAR_FIELD(cacheLastMode); COPY_SCALAR_FIELD(igLastNull); COPY_SCALAR_FIELD(groupOrderScan); COPY_SCALAR_FIELD(onlyMetaCtbIdx); @@ -443,8 +444,14 @@ static int32_t logicAggCopy(const SAggLogicNode* pSrc, SAggLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_LIST_FIELD(pGroupKeys); CLONE_NODE_LIST_FIELD(pAggFuncs); + COPY_SCALAR_FIELD(hasLastRow); + COPY_SCALAR_FIELD(hasLast); + COPY_SCALAR_FIELD(hasTimeLineFunc); + COPY_SCALAR_FIELD(onlyHasKeepOrderFunc); COPY_SCALAR_FIELD(hasGroupKeyOptimized); + COPY_SCALAR_FIELD(isGroupTb); COPY_SCALAR_FIELD(isPartTb); + COPY_SCALAR_FIELD(hasGroup); return TSDB_CODE_SUCCESS; } @@ -488,6 +495,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst CLONE_NODE_LIST_FIELD(pInputs); COPY_SCALAR_FIELD(numOfChannels); COPY_SCALAR_FIELD(srcGroupId); + COPY_SCALAR_FIELD(colsMerge); COPY_SCALAR_FIELD(needSort); COPY_SCALAR_FIELD(groupSort); COPY_SCALAR_FIELD(ignoreGroupId); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 4f6d3d95e1..71263892a5 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1571,6 +1571,19 @@ int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc) { return code; } + +int32_t nodesListMakeStrictAppendList(SNodeList** pTarget, SNodeList* pSrc) { + if (NULL == *pTarget) { + *pTarget = nodesMakeList(); + if (NULL == *pTarget) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; + } + } + return nodesListStrictAppendList(*pTarget, pSrc); +} + + int32_t nodesListPushFront(SNodeList* pList, SNode* pNode) { if (NULL == pList || NULL == pNode) { return TSDB_CODE_FAILED; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 222aec9813..bed75b84ac 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -747,7 +747,8 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, pAgg->isGroupTb = pAgg->pGroupKeys ? keysHasTbname(pAgg->pGroupKeys) : 0; pAgg->isPartTb = pSelect->pPartitionByList ? keysHasTbname(pSelect->pPartitionByList) : 0; - + pAgg->hasGroup = pAgg->pGroupKeys || pSelect->pPartitionByList; + if (TSDB_CODE_SUCCESS == code) { *pLogicNode = (SLogicNode*)pAgg; } else { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 0b3a432bec..79747d44fe 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2499,21 +2499,7 @@ static bool lastRowScanOptCheckColNum(int32_t lastColNum, col_id_t lastColId, return true; } -static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || - QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) { - return false; - } - - SAggLogicNode* pAgg = (SAggLogicNode*)pNode; - SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0); - // Only one of LAST and LASTROW can appear - if (pAgg->hasLastRow == pAgg->hasLast || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions || - !hasSuitableCache(pScan->cacheLastMode, pAgg->hasLastRow, pAgg->hasLast) || - IS_TSWINDOW_SPECIFIED(pScan->scanRange)) { - return false; - } - +static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, bool* hasOtherFunc) { bool hasNonPKSelectFunc = false; SNode* pFunc = NULL; int32_t lastColNum = 0, selectNonPKColNum = 0; @@ -2559,13 +2545,48 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { return false; } } else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) { - return false; + *hasOtherFunc = true; } } return true; } +static bool lastRowScanOptCheckLastCache(SAggLogicNode* pAgg, SScanLogicNode* pScan) { + // Only one of LAST and LASTROW can appear + if (pAgg->hasLastRow == pAgg->hasLast || (!pAgg->hasLast && !pAgg->hasLastRow) || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions || + !hasSuitableCache(pScan->cacheLastMode, pAgg->hasLastRow, pAgg->hasLast) || + IS_TSWINDOW_SPECIFIED(pScan->scanRange)) { + return false; + } + + return true; +} + +static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || + QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) { + return false; + } + + SAggLogicNode* pAgg = (SAggLogicNode*)pNode; + SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0); + if (!lastRowScanOptCheckLastCache(pAgg, pScan)) { + return false; + } + + bool hasOtherFunc = false; + if (!lastRowScanOptCheckFuncList(pNode, &hasOtherFunc)) { + return false; + } + + if (hasOtherFunc) { + return false; + } + + return true; +} + typedef struct SLastRowScanOptSetColDataTypeCxt { bool doAgg; SNodeList* pLastCols; @@ -2679,6 +2700,201 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic return TSDB_CODE_SUCCESS; } + +static bool splitCacheLastFuncOptMayBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || + QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) { + return false; + } + + SAggLogicNode* pAgg = (SAggLogicNode*)pNode; + SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0); + if (!lastRowScanOptCheckLastCache(pAgg, pScan)) { + return false; + } + + bool hasOtherFunc = false; + if (!lastRowScanOptCheckFuncList(pNode, &hasOtherFunc)) { + return false; + } + + if (pAgg->hasGroup || !hasOtherFunc) { + return false; + } + + return true; +} + +static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg, SAggLogicNode* pAgg, SNodeList* pFunc, SNodeList* pTargets) { + SAggLogicNode* pNew = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG); + if (NULL == pNew) { + nodesDestroyList(pFunc); + nodesDestroyList(pTargets); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pNew->hasLastRow = false; + pNew->hasLast = false; + pNew->hasTimeLineFunc = pAgg->hasTimeLineFunc; + pNew->hasGroupKeyOptimized = false; + pNew->onlyHasKeepOrderFunc = pAgg->onlyHasKeepOrderFunc; + pNew->node.groupAction = pAgg->node.groupAction; + pNew->node.requireDataOrder = pAgg->node.requireDataOrder; + pNew->node.resultDataOrder = pAgg->node.resultDataOrder; + pNew->node.pTargets = pTargets; + pNew->pAggFuncs = pFunc; + pNew->pGroupKeys = nodesCloneList(pAgg->pGroupKeys); + pNew->node.pConditions = nodesCloneNode(pAgg->node.pConditions); + pNew->isGroupTb = pAgg->isGroupTb; + pNew->isPartTb = pAgg->isPartTb; + pNew->hasGroup = pAgg->hasGroup; + pNew->node.pChildren = nodesCloneList(pAgg->node.pChildren); + + *pNewAgg = pNew; + + return TSDB_CODE_SUCCESS; +} + +static int32_t splitCacheLastFuncOptModifyAggLogicNode(SAggLogicNode* pAgg) { + pAgg->hasTimeLineFunc = false; + pAgg->onlyHasKeepOrderFunc = true; + + return TSDB_CODE_SUCCESS; +} + +static int32_t splitCacheLastFuncOptCreateMergeLogicNode(SMergeLogicNode** pNew, SAggLogicNode* pAgg1, SAggLogicNode* pAgg2) { + SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); + if (NULL == pMerge) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pMerge->colsMerge = true; + pMerge->numOfChannels = 2; + pMerge->srcGroupId = -1; + pMerge->node.precision = pAgg1->node.precision; + + SNode* pNewAgg1 = nodesCloneNode((SNode*)pAgg1); + SNode* pNewAgg2 = nodesCloneNode((SNode*)pAgg2); + if (NULL == pNewAgg1 || NULL == pNewAgg2) { + nodesDestroyNode(pNewAgg1); + nodesDestroyNode(pNewAgg2); + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SAggLogicNode*)pNewAgg1)->node.pParent = (SLogicNode*)pMerge; + ((SAggLogicNode*)pNewAgg2)->node.pParent = (SLogicNode*)pMerge; + + SNode* pNode = NULL; + FOREACH(pNode, ((SAggLogicNode*)pNewAgg1)->node.pChildren) { + ((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg1; + } + FOREACH(pNode, ((SAggLogicNode*)pNewAgg2)->node.pChildren) { + ((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg2; + } + + int32_t code = nodesListMakeStrictAppendList(&pMerge->node.pTargets, nodesCloneList(pAgg1->node.pTargets)); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppendList(&pMerge->node.pTargets, nodesCloneList(pAgg2->node.pTargets)); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pMerge->node.pChildren, pNewAgg1); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pMerge->node.pChildren, pNewAgg2); + } + + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(pNewAgg1); + nodesDestroyNode(pNewAgg2); + nodesDestroyNode((SNode*)pMerge); + } else { + *pNew = pMerge; + } + + return code; +} + +static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, splitCacheLastFuncOptMayBeOptimized); + + if (NULL == pAgg) { + return TSDB_CODE_SUCCESS; + } + + SNode* pNode = NULL; + SNodeList* pAggFuncList = NULL; + { + WHERE_EACH(pNode, pAgg->pAggFuncs) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + int32_t funcType = pFunc->funcType; + if (FUNCTION_TYPE_LAST_ROW != funcType && FUNCTION_TYPE_LAST != funcType && + FUNCTION_TYPE_SELECT_VALUE != funcType && FUNCTION_TYPE_GROUP_KEY != funcType) { + nodesListMakeStrictAppend(&pAggFuncList, nodesCloneNode(pNode)); + ERASE_NODE(pAgg->pAggFuncs); + continue; + } + WHERE_NEXT; + } + } + + if (NULL == pAggFuncList) { + planError("empty agg func list while splite projections"); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + SNodeList* pTargets = NULL; + { + WHERE_EACH(pNode, pAgg->node.pTargets) { + SColumnNode* pCol = (SColumnNode*)pNode; + SNode* pFuncNode = NULL; + bool found = false; + FOREACH(pFuncNode, pAggFuncList) { + SFunctionNode* pFunc = (SFunctionNode*)pFuncNode; + if (0 == strcmp(pFunc->node.aliasName, pCol->colName)) { + nodesListMakeStrictAppend(&pTargets, nodesCloneNode(pNode)); + found = true; + break; + } + } + if (found) { + ERASE_NODE(pAgg->node.pTargets); + continue; + } + WHERE_NEXT; + } + } + + if (NULL == pTargets) { + planError("empty target func list while splite projections"); + nodesDestroyList(pAggFuncList); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + SMergeLogicNode* pMerge = NULL; + SAggLogicNode* pNewAgg = NULL; + int32_t code = splitCacheLastFuncOptCreateAggLogicNode(&pNewAgg, pAgg, pAggFuncList, pTargets); + if (TSDB_CODE_SUCCESS == code) { + code = splitCacheLastFuncOptModifyAggLogicNode(pAgg); + } + if (TSDB_CODE_SUCCESS == code) { + code = splitCacheLastFuncOptCreateMergeLogicNode(&pMerge, pNewAgg, pAgg); + } + if (TSDB_CODE_SUCCESS == code) { + code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pAgg, (SLogicNode*)pMerge); + } + + nodesDestroyNode((SNode *)pAgg); + nodesDestroyNode((SNode *)pNewAgg); + + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode((SNode *)pMerge); + } + + pCxt->optimized = true; + return code; +} + + + // merge projects static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) { @@ -3762,6 +3978,7 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize}, {.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize}, {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize}, + {.pName = "splitCacheLastFunc", .optimizeFunc = splitCacheLastFuncOptimize}, {.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize}, {.pName = "TagScan", .optimizeFunc = tagScanOptimize}, {.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize}, diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 0e80f5bcec..6780dcd681 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -2001,6 +2001,9 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildre SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc; code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pMergeLogicNode->node.pTargets, &pMerge->pTargets); + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc); + } } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index bf5fe901a6..fd6706e43e 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1588,9 +1588,12 @@ typedef struct SSmaIndexSplitInfo { static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, SSmaIndexSplitInfo* pInfo) { if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) { - pInfo->pMerge = (SMergeLogicNode*)pNode; - pInfo->pSubplan = pSubplan; - return true; + int32_t nodeType = nodeType(nodesListGetNode(pNode->pChildren, 0)); + if (nodeType == QUERY_NODE_LOGIC_PLAN_EXCHANGE || nodeType == QUERY_NODE_LOGIC_PLAN_MERGE) { + pInfo->pMerge = (SMergeLogicNode*)pNode; + pInfo->pSubplan = pSubplan; + return true; + } } return false; }