From 2a506b3e5e7b96bba67ff22324729f8b0ff6c815 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 8 Dec 2023 18:48:34 +0800 Subject: [PATCH] feat:add multi level agg operator for stream task --- source/dnode/mnode/impl/src/mndScheduler.c | 180 ++++++++++++--------- 1 file changed, 102 insertions(+), 78 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index fef18574e6..8213a21d7f 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -24,6 +24,9 @@ #include "tname.h" #include "tuuid.h" +#define SINK_NODE_LEVEL (0) +#define SOURCE_NODE_LEVEL (0) +#define SINK_NODE_LEVEL (0) #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; @@ -311,8 +314,9 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre return TDB_CODE_SUCCESS; } -static SSubplan* getScanSubPlan(const SQueryPlan* pPlan, int planIndex){ - SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, planIndex); +static SSubplan* getScanSubPlan(const SQueryPlan* pPlan){ + int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); + SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, numOfPlanLevel - 1); if (LIST_LENGTH(inner->pNodeList) != 1) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return NULL; @@ -326,8 +330,8 @@ static SSubplan* getScanSubPlan(const SQueryPlan* pPlan, int planIndex){ return plan; } -static SSubplan* getFinalAggSubPlan(const SQueryPlan* pPlan){ - SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); +static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){ + SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, index); if (LIST_LENGTH(inner->pNodeList) != 1) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return NULL; @@ -402,9 +406,7 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil return pAggTask; } -static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset) { - SArray* pAggTaskList = addNewTaskList(pStream->tasks); - +static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset){ SStreamTask* pTask = buildAggTask(pStream, pEpset, false); if (pTask == NULL) { return terrno; @@ -428,15 +430,13 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); } if(code != 0){ - terrno = code; goto END; } if (pStream->conf.fillHistory) { - SArray* pHAggTaskList = addNewTaskList(pStream->pHTasksList); - pTask = buildAggTask(pStream, pEpset, true); if (pTask == NULL) { + code = terrno; goto END; } @@ -446,21 +446,29 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); } if(code != 0){ - terrno = code; goto END; } - setHTasksId(pAggTaskList, pHAggTaskList); + SArray** pAggTaskList = taosArrayGetLast(pStream->tasks); + SArray** pHAggTaskList = taosArrayGetLast(pStream->pHTasksList); + setHTasksId(*pAggTaskList, *pHAggTaskList); } -END: + END: if (pSnode != NULL) { sdbRelease(pMnode->pSdb, pSnode); } else { sdbRelease(pMnode->pSdb, pVgroup); } + return code; +} - return terrno; +static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset) { + addNewTaskList(pStream->tasks); + if (pStream->conf.fillHistory) { + addNewTaskList(pStream->pHTasksList); + } + return doAddAggTask(pStream, pMnode, plan, pEpset); } static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){ @@ -498,7 +506,7 @@ static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){ static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task){ mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task); - for(int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); ++k) { + for(int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); k++) { SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k); streamTaskSetUpstreamInfo(pSinkTask, task); } @@ -506,17 +514,17 @@ static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSin static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) { SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); - SArray* pAggTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 1); + SArray** pAggTaskList = taosArrayGetLast(tasks); - for(int i = 0; i < taosArrayGetSize(pAggTaskList); i++){ - SStreamTask* pAggTask = taosArrayGetP(pAggTaskList, i); + for(int i = 0; i < taosArrayGetSize(*pAggTaskList); i++){ + SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i); bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask); } } static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) { SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); - SArray* pSourceTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 1); + SArray* pSourceTaskList = taosArrayGetP(tasks, hasExtraSink ? SINK_NODE_LEVEL + 1 : SINK_NODE_LEVEL); for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){ SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i); @@ -528,23 +536,27 @@ static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, b } } -static void bindSourceAgg(SArray* tasks) { - SArray* pAggTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 1); - SArray* pSourceTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 2); +static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { + size_t size = taosArrayGetSize(tasks); + ASSERT(size >= 2); + SArray* pDownTaskList = taosArrayGetP(tasks, size - 1); + SArray* pUpTaskList = taosArrayGetP(tasks, size - 2); - SStreamTask* pAggTask = taosArrayGetP(pAggTaskList, 0); - for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){ - SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i); - streamTaskSetFixedDownstreamInfo(pSourceTask, pAggTask); - streamTaskSetUpstreamInfo(pAggTask, pSourceTask); + SStreamTask** pDownTask = taosArrayGetLast(pDownTaskList); + for(int i = begin; i < end; i++){ + SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i); + if(pUpTask == NULL) { // out of range + break; + } + streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask); + streamTaskSetUpstreamInfo(*pDownTask, pUpTask); } } -//#define AGGNUM 2 +#define AGGNUM 2 static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); - bool hasExtraSink = false; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); @@ -570,62 +582,74 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pStream->totalLevel = numOfPlanLevel + hasExtraSink; -// if(numOfPlanLevel == 3){ -// pDbObj = mndAcquireDb(pMnode, pStream->sourceDb); -// if (pDbObj == NULL) { -// terrno = TSDB_CODE_QRY_INVALID_INPUT; -// return -1; -// } -// -// if(pDbObj->cfg.numOfVgroups >= AGGNUM){ -// -// } -// sdbRelease(pSdb, pDbObj); -// }else - if (numOfPlanLevel > 1) { - SSubplan *plan = getFinalAggSubPlan(pPlan); - if(plan == NULL){ - return terrno; - } - int32_t code = addAggTask(pStream, pMnode, plan, pEpset); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - bindAggSink(pStream, pMnode, pStream->tasks); - if (pStream->conf.fillHistory) { - bindAggSink(pStream, pMnode, pStream->pHTasksList); - } - - plan = getScanSubPlan(pPlan, numOfPlanLevel - 1); - if(plan == NULL){ - return terrno; - } - code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - bindSourceAgg(pStream->tasks); - if (pStream->conf.fillHistory) { - bindSourceAgg(pStream->pHTasksList); - } - } else if (numOfPlanLevel == 1) { - SSubplan *plan = getScanSubPlan(pPlan, 0); - if(plan == NULL){ - return terrno; - } - int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + SSubplan* plan = getScanSubPlan(pPlan); // source plan + if (plan == NULL) { + return terrno; + } + int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (numOfPlanLevel == 1) { bindSourceSink(pStream, pMnode, pStream->tasks, hasExtraSink); if (pStream->conf.fillHistory) { bindSourceSink(pStream, pMnode, pStream->pHTasksList, hasExtraSink); } + return TDB_CODE_SUCCESS; } - return 0; + if(numOfPlanLevel == 3){ + plan = getAggSubPlan(pPlan, 1); // middle agg plan + if (plan == NULL) { + return terrno; + } + do{ + SArray** list = taosArrayGetLast(pStream->tasks); + float size = (float)taosArrayGetSize(*list); + size_t cnt = (int)(size/AGGNUM + 0.5); + if(cnt <= 1) break; + + addNewTaskList(pStream->tasks); + if (pStream->conf.fillHistory) { + addNewTaskList(pStream->pHTasksList); + } + + for(int j = 0; j < cnt; j++){ + code = doAddAggTask(pStream, pMnode, plan, pEpset); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + bindTwoLevel(pStream->tasks, j*AGGNUM, (j+1)*AGGNUM); + if (pStream->conf.fillHistory) { + bindTwoLevel(pStream->pHTasksList, j*AGGNUM, (j+1)*AGGNUM); + } + } + }while(1); + } + + plan = getAggSubPlan(pPlan, 0); + if (plan == NULL) { + return terrno; + } + + SArray** list = taosArrayGetLast(pStream->tasks); + size_t size = taosArrayGetSize(*list); + code = addAggTask(pStream, pMnode, plan, pEpset); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + bindTwoLevel(pStream->tasks, 0, size); + if (pStream->conf.fillHistory) { + bindTwoLevel(pStream->pHTasksList, 0, size); + } + + bindAggSink(pStream, pMnode, pStream->tasks); + if (pStream->conf.fillHistory) { + bindAggSink(pStream, pMnode, pStream->pHTasksList); + } + return TDB_CODE_SUCCESS; } int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey) {