From 84ea04ea2a93db01ebea85e854f3dd201909a422 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 22 Jun 2022 10:53:22 +0800 Subject: [PATCH] fix: change merge interval to merge aligned interval --- include/libs/nodes/nodes.h | 2 +- include/libs/nodes/plannodes.h | 2 +- source/libs/command/src/explain.c | 8 ++-- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 6 +-- source/libs/executor/src/timewindowoperator.c | 40 +++++++++---------- source/libs/nodes/src/nodesCloneFuncs.c | 2 +- source/libs/nodes/src/nodesCodeFuncs.c | 8 ++-- source/libs/nodes/src/nodesUtilFuncs.c | 6 +-- source/libs/planner/src/planPhysiCreater.c | 2 +- 10 files changed, 39 insertions(+), 39 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 58e2393970..61c9d5bfd0 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -226,7 +226,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_MERGE, QUERY_NODE_PHYSICAL_PLAN_SORT, QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, - QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index b07e8f39d5..28de4a3fad 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -375,7 +375,7 @@ typedef struct SIntervalPhysiNode { int8_t slidingUnit; } SIntervalPhysiNode; -typedef SIntervalPhysiNode SMergeIntervalPhysiNode; +typedef SIntervalPhysiNode SMergeAlignedIntervalPhysiNode; typedef SIntervalPhysiNode SStreamIntervalPhysiNode; typedef SIntervalPhysiNode SStreamFinalIntervalPhysiNode; typedef SIntervalPhysiNode SStreamSemiIntervalPhysiNode; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index ae0f669d65..2d55b06f06 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -184,8 +184,8 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo pPhysiChildren = indefPhysiNode->node.pChildren; break; } - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: { - SMergeIntervalPhysiNode *intPhysiNode = (SMergeIntervalPhysiNode *)pNode; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: { + SMergeAlignedIntervalPhysiNode *intPhysiNode = (SMergeAlignedIntervalPhysiNode *)pNode; pPhysiChildren = intPhysiNode->window.node.pChildren; break; } @@ -841,8 +841,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } break; } - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: { - SMergeIntervalPhysiNode *pIntNode = (SMergeIntervalPhysiNode *)pNode; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: { + SMergeAlignedIntervalPhysiNode *pIntNode = (SMergeAlignedIntervalPhysiNode *)pNode; EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, nodesGetNameFromColumnNode(pIntNode->window.pTspk)); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2a0f34cd03..1312eca81f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -729,7 +729,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream); -SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, +SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 94675ce253..6c72e0ef02 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4190,8 +4190,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream); - } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { - SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) { + SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); @@ -4204,7 +4204,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; - pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo); + pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { int32_t children = 0; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 281b2e3173..ad133ce103 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3667,23 +3667,23 @@ _error: return NULL; } -typedef struct SMergeIntervalAggOperatorInfo { +typedef struct SMergeAlignedIntervalAggOperatorInfo { SIntervalAggOperatorInfo intervalAggOperatorInfo; bool hasGroupId; uint64_t groupId; SSDataBlock* prefetchedBlock; bool inputBlocksFinished; -} SMergeIntervalAggOperatorInfo; +} SMergeAlignedIntervalAggOperatorInfo; -void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { - SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; +void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) { + SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param; destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); } -static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, +static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, TSKEY wstartTs) { - SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; + SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -3702,9 +3702,9 @@ static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t return 0; } -static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, +static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, int32_t scanFlag, SSDataBlock* pResultBlock) { - SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; + SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -3744,7 +3744,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); - outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); + outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); currTs = tsCols[currPos]; currWin.skey = currTs; @@ -3764,13 +3764,13 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); - outputMergeIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); + outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); } -static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { +static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info; + SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperator->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -3809,7 +3809,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->order, scanFlag, true); - doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); + doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; @@ -3828,10 +3828,10 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { return (rows == 0) ? NULL : pRes; } -SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, +SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SExecTaskInfo* pTaskInfo) { - SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo)); + SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (miaInfo == NULL || pOperator == NULL) { goto _error; @@ -3866,8 +3866,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI initResultRowInfo(&iaInfo->binfo.resultRowInfo); - pOperator->name = "TimeMergeIntervalAggOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; + pOperator->name = "TimeMergeAlignedIntervalAggOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->exprSupp.pExprInfo = pExprInfo; @@ -3875,8 +3875,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = miaInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, - destroyMergeIntervalOperatorInfo, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeAlignedIntervalAgg, NULL, NULL, + destroyMergeAlignedIntervalOperatorInfo, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -3886,7 +3886,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI return pOperator; _error: - destroyMergeIntervalOperatorInfo(miaInfo, numOfCols); + destroyMergeAlignedIntervalOperatorInfo(miaInfo, numOfCols); taosMemoryFreeClear(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 8290b5628c..0bde0d0581 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -702,7 +702,7 @@ SNode* nodesCloneNode(const SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: return physiSysTableScanCopy((const SSystemTableScanPhysiNode*)pNode, (SSystemTableScanPhysiNode*)pDst); case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index bde5159087..d31376cd13 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -234,8 +234,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiSort"; case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: return "PhysiHashInterval"; - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: - return "PhysiMergeInterval"; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: + return "PhysiMergeAlignedInterval"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: return "PhysiStreamInterval"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: @@ -4125,7 +4125,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_SORT: return physiSortNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: @@ -4265,7 +4265,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { case QUERY_NODE_PHYSICAL_PLAN_SORT: return jsonToPhysiSortNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 9b0241f2f1..4721884391 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -287,8 +287,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SSortPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: return makeNode(type, sizeof(SIntervalPhysiNode)); - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: - return makeNode(type, sizeof(SMergeIntervalPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: + return makeNode(type, sizeof(SMergeAlignedIntervalPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: return makeNode(type, sizeof(SStreamIntervalPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: @@ -825,7 +825,7 @@ void nodesDestroyNode(SNode* pNode) { break; } case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: - case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index ff78370c52..adc613e63a 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1038,7 +1038,7 @@ static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) { case INTERVAL_ALGO_HASH: return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; case INTERVAL_ALGO_MERGE: - return QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; + return QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL; case INTERVAL_ALGO_STREAM_FINAL: return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL; case INTERVAL_ALGO_STREAM_SEMI: