From b36665c9d619b237d78f62e0294a8aea3ad4108e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 8 Dec 2023 15:48:44 +0800 Subject: [PATCH] opti:build task logic in stream --- source/dnode/mnode/impl/src/mndScheduler.c | 481 ++++++++++----------- 1 file changed, 232 insertions(+), 249 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 3ef4c9a4d2..80bf084f1a 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -27,9 +27,6 @@ #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; -static int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, - SEpSet* pEpset, bool isFillhistory); - int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { SNode* pAst = NULL; @@ -157,12 +154,7 @@ int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan plan->execNode.nodeId = pTask->info.nodeId; plan->execNode.epSet = pTask->info.epSet; - if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; - } - - return 0; + return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen); } SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) { @@ -184,11 +176,7 @@ int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan->execNode.epSet = pTask->info.epSet; mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE); - if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; - } - return 0; + return qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen); } // todo random choose a node to do compute @@ -208,8 +196,26 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } +static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, + SEpSet* pEpset, bool isFillhistory) { + int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; + SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); + + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory); + if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + epsetAssign(&(pTask)->info.mnodeEpset, pEpset); + + pTask->info.nodeId = vgId; + pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup); + return mndSetSinkTaskInfo(pStream, pTask); +} + // create sink node for each vgroup. -int32_t doAddShuffleSinkTask(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, SEpSet* pEpset, bool fillHistory) { +static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; @@ -225,37 +231,34 @@ int32_t doAddShuffleSinkTask(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStr continue; } - doAddSinkTask(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, pEpset, fillHistory); + int32_t code = doAddSinkTask(pStream, pMnode, pVgroup->vgId, pVgroup, pEpset, false); + if(code != 0){ + sdbRelease(pSdb, pVgroup); + return code; + } + if(pStream->conf.fillHistory){ + code = doAddSinkTask(pStream, pMnode, pVgroup->vgId, pVgroup, pEpset, true); + if(code != 0){ + sdbRelease(pSdb, pVgroup); + return code; + } + } sdbRelease(pSdb, pVgroup); } - return 0; + return TDB_CODE_SUCCESS; } -int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, - SEpSet* pEpset, bool isFillhistory) { - int64_t uid = (isFillhistory)? pStream->hTaskUid:pStream->uid; - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, isFillhistory, 0, pTaskList, pStream->conf.fillHistory); +static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, + int64_t firstWindowSkey, bool isFillhistory) { + uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; + SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); + + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, + isFillhistory, pStream->conf.triggerParam, + *pTaskList, pStream->conf.fillHistory); if (pTask == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - epsetAssign(&(pTask)->info.mnodeEpset, pEpset); - - pTask->info.nodeId = vgId; - pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup); - mndSetSinkTaskInfo(pStream, pTask); - return 0; -} - -static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, - SStreamObj* pStream, SSubplan* plan, uint64_t uid, SEpSet* pEpset, bool fillHistory, - bool hasExtraSink, int64_t firstWindowSkey, bool hasFillHistory) { - SStreamTask* pTask = - tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillHistory); - if (pTask == NULL) { - return terrno; + return NULL; } epsetAssign(&pTask->info.mnodeEpset, pEpset); @@ -265,23 +268,7 @@ static int32_t addSourceTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, pWindow->ekey = firstWindowSkey - 1; mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pWindow->skey, pWindow->ekey); - // sink or dispatch - if (hasExtraSink) { - mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, pTask); - } else { - mndSetSinkTaskInfo(pStream, pTask); - } - - if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { - return terrno; - } - - for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { - SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); - streamTaskSetUpstreamInfo(pSinkTask, pTask); - } - - return TSDB_CODE_SUCCESS; + return pTask; } static SArray* addNewTaskList(SArray* pTasksList) { @@ -307,30 +294,64 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { } } -static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream, - SEpSet* pEpset, bool hasExtraSink, int64_t nextWindowSkey) { - // create exec stream task, since only one level, the exec task is also the source task - SArray* pTaskList = addNewTaskList(pStream->tasks); - SSdb* pSdb = pMnode->pSdb; - - SArray* pHTaskList = NULL; - if (pStream->conf.fillHistory) { - pHTaskList = addNewTaskList(pStream->pHTasksList); +static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, + SEpSet* pEpset, int64_t nextWindowSkey, + SVgObj* pVgroup, bool isFillhistory ){ + // new stream task + SStreamTask* pTask = buildSourceTask(pStream, pEpset, nextWindowSkey, isFillhistory); + if(pTask == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; } + int32_t code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); + if(code != 0){ + terrno = code; + return terrno; + } + return TDB_CODE_SUCCESS; +} - SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); +static SSubplan* getScanSubPlan(const SQueryPlan* pPlan, int planIndex){ + SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, planIndex); if (LIST_LENGTH(inner->pNodeList) != 1) { terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; + return NULL; } SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); if (plan->subplanType != SUBPLAN_TYPE_SCAN) { terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; + return NULL; + } + return plan; +} + +static SSubplan* getFinalAggSubPlan(const SQueryPlan* pPlan){ + SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); + if (LIST_LENGTH(inner->pNodeList) != 1) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return NULL; + } + + SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); + if (plan->subplanType != SUBPLAN_TYPE_MERGE) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return NULL; + } + return plan; +} + +static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, + SEpSet* pEpset, int64_t nextWindowSkey) { + // create exec stream task, since only one level, the exec task is also the source task + SArray* pTaskList = addNewTaskList(pStream->tasks); + SArray* pHTaskList = NULL; + if (pStream->conf.fillHistory) { + pHTaskList = addNewTaskList(pStream->pHTasksList); } void* pIter = NULL; + SSdb* pSdb = pMnode->pSdb; while (1) { SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); @@ -343,25 +364,21 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* continue; } - // new stream task - SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); - int32_t code = addSourceTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, pEpset, - false, hasExtraSink, nextWindowSkey, pStream->conf.fillHistory); - if (code != TSDB_CODE_SUCCESS) { + int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVgroup, false); + if(code != 0){ sdbRelease(pSdb, pVgroup); - return -1; + return code; } if (pStream->conf.fillHistory) { - SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL); - code = addSourceTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, - pEpset, true, hasExtraSink, nextWindowSkey, true); + code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVgroup, true); + if(code != 0){ + sdbRelease(pSdb, pVgroup); + return code; + } } sdbRelease(pSdb, pVgroup); - if (code != TSDB_CODE_SUCCESS) { - return -1; - } } if (pStream->conf.fillHistory) { @@ -371,76 +388,31 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* return TSDB_CODE_SUCCESS; } -static int32_t doAddSourceTask(SArray* pTaskList, bool isFillhistory, int64_t uid, SStreamTask* pDownstreamTask, - SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, SEpSet* pEpset, - int64_t nextWindowSkey, bool hasFillHistory) { - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, isFillhistory, 0, pTaskList, hasFillHistory); - if (pTask == NULL) { +static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory) { + uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; + SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); + + SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, isFillhistory, pStream->conf.triggerParam, *pTaskList, pStream->conf.fillHistory); + if (pAggTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + return NULL; } - epsetAssign(&(pTask)->info.mnodeEpset, pEpset); - - // todo set the correct ts, which should be last key of queried table. - STimeWindow* pWindow = &pTask->dataRange.window; - pWindow->skey = INT64_MIN; - pWindow->ekey = nextWindowSkey - 1; - - mDebug("s-task:0x%x level:%d set time window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->info.taskLevel, - pWindow->skey, pWindow->ekey); - - // all the source tasks dispatch result to a single agg node. - streamTaskSetFixedDownstreamInfo(pTask, pDownstreamTask); - if (mndAssignStreamTaskToVgroup(pMnode, pTask, pPlan, pVgroup) < 0) { - return -1; - } - - return streamTaskSetUpstreamInfo(pDownstreamTask, pTask); + epsetAssign(&pAggTask->info.mnodeEpset, pEpset); + return pAggTask; } -static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, - SEpSet* pEpset, bool fillHistory, SStreamTask** pAggTask, bool hasFillhistory) { - *pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList, hasFillhistory); - if (*pAggTask == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - epsetAssign(&(*pAggTask)->info.mnodeEpset, pEpset); - - // dispatch - if (mndAddDispatcherForInternalTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) { - return -1; - } - - return 0; -} - -static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, - SStreamTask** pAggTask, SStreamTask** pHAggTask) { +static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset) { SArray* pAggTaskList = addNewTaskList(pStream->tasks); - SSdb* pSdb = pMnode->pSdb; - SNodeListNode* pInnerNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); - SSubplan* plan = (SSubplan*)nodesListGetNode(pInnerNode->pNodeList, 0); - if (plan->subplanType != SUBPLAN_TYPE_MERGE) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; - } - - *pAggTask = NULL; - SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); - - int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, pEpset, false, pAggTask, - pStream->conf.fillHistory); - if (code != TSDB_CODE_SUCCESS) { - return -1; + SStreamTask* pTask = buildAggTask(pStream, pEpset, false); + if (pTask == NULL) { + return terrno; } SVgObj* pVgroup = NULL; SSnodeObj* pSnode = NULL; - + int32_t code = 0; if (tsDeployOnSnode) { pSnode = mndSchedFetchOneSnode(pMnode); if (pSnode == NULL) { @@ -451,135 +423,124 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan } if (pSnode != NULL) { - code = mndAssignStreamTaskToSnode(pMnode, *pAggTask, plan, pSnode); + code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode); } else { - code = mndAssignStreamTaskToVgroup(pMnode, *pAggTask, plan, pVgroup); + code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); + } + if(code != 0){ + terrno = code; + goto END; } if (pStream->conf.fillHistory) { SArray* pHAggTaskList = addNewTaskList(pStream->pHTasksList); - SArray* pHSinkNodeList = taosArrayGetP(pStream->pHTasksList, SINK_NODE_LEVEL); - *pHAggTask = NULL; - code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pEpset, pStream->conf.fillHistory, - pHAggTask, pStream->conf.fillHistory); - if (code != TSDB_CODE_SUCCESS) { - if (pSnode != NULL) { - sdbRelease(pSdb, pSnode); - } else { - sdbRelease(pSdb, pVgroup); - } - return code; + pTask = buildAggTask(pStream, pEpset, true); + if (pTask == NULL) { + goto END; } if (pSnode != NULL) { - code = mndAssignStreamTaskToSnode(pMnode, *pHAggTask, plan, pSnode); + code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode); } else { - code = mndAssignStreamTaskToVgroup(pMnode, *pHAggTask, plan, pVgroup); + code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); + } + if(code != 0){ + terrno = code; + goto END; } setHTasksId(pAggTaskList, pHAggTaskList); } +END: if (pSnode != NULL) { - sdbRelease(pSdb, pSnode); + sdbRelease(pMnode->pSdb, pSnode); } else { - sdbRelease(pSdb, pVgroup); + sdbRelease(pMnode->pSdb, pVgroup); } - return code; + return terrno; } -static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPlan, SStreamObj* pStream, - SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask, - SEpSet* pEpset, int64_t nextWindowSkey) { - SArray* pSourceTaskList = addNewTaskList(pStream->tasks); +static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){ + SArray* pSinkTaskList = addNewTaskList(pStream->tasks); - SArray* pHSourceTaskList = NULL; + SArray* pHSinkTaskList = NULL; if (pStream->conf.fillHistory) { - pHSourceTaskList = addNewTaskList(pStream->pHTasksList); + pHSinkTaskList = addNewTaskList(pStream->pHTasksList); } - SSdb* pSdb = pMnode->pSdb; - SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1); - SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); - if (plan->subplanType != SUBPLAN_TYPE_SCAN) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; - } - - void* pIter = NULL; - while (1) { - SVgObj* pVgroup; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); - if (pIter == NULL) { - break; + int32_t code = 0; + if (pStream->fixedSinkVgId == 0) { + code = doAddShuffleSinkTask(pMnode, pStream, pEpset); + if (code != 0) { + return code; } - - if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) { - sdbRelease(pSdb, pVgroup); - continue; + } else { + code = doAddSinkTask(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, pEpset, false); + if (code != 0) { + return code; } - - int32_t code = doAddSourceTask(pSourceTaskList, false, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, pEpset, - nextWindowSkey, pStream->conf.fillHistory); - if (code != TSDB_CODE_SUCCESS) { - sdbRelease(pSdb, pVgroup); - terrno = code; - return -1; - } - - if (pStream->conf.fillHistory) { - code = doAddSourceTask(pHSourceTaskList, true, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup, pEpset, - nextWindowSkey, pStream->conf.fillHistory); - if (code != TSDB_CODE_SUCCESS) { - sdbRelease(pSdb, pVgroup); + if(pStream->conf.fillHistory){ + code = doAddSinkTask(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, pEpset, true); + if (code != 0) { return code; } } - - sdbRelease(pSdb, pVgroup); } if (pStream->conf.fillHistory) { - setHTasksId(pSourceTaskList, pHSourceTaskList); + setHTasksId(pSinkTaskList, pHSinkTaskList); } - - return TSDB_CODE_SUCCESS; + return TDB_CODE_SUCCESS; } -static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList, - SEpSet* pEpset, bool fillHistory) { - SArray* pSinkTaskList = addNewTaskList(pTasksList); - if (pStream->fixedSinkVgId == 0) { - if (doAddShuffleSinkTask(pMnode, pSinkTaskList, pStream, pEpset, fillHistory) < 0) { - // TODO free - return -1; - } - } else { - if (doAddSinkTask(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, pEpset, - fillHistory) < 0) { - // TODO free - return -1; +static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSinkTaskList, SStreamTask* task){ + mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, task); + for(int32_t k = 0; k < taosArrayGetSize(pSinkTaskList); ++k) { + SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k); + streamTaskSetUpstreamInfo(pSinkTask, task); + } +} + +static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) { + SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); + SArray* pAggTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 1); + + for(int i = 0; i < taosArrayGetSize(pAggTaskList); i++){ + SStreamTask* pAggTask = taosArrayGetP(pAggTaskList, i); + bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask); + } +} + +static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, bool hasExtraSink) { + SArray* pSinkTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL); + SArray* pSourceTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 1); + + for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){ + SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i); + if (hasExtraSink) { + bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask); + } else { + mndSetSinkTaskInfo(pStream, pSourceTask); } } - - *pCreatedTaskList = pSinkTaskList; - return TSDB_CODE_SUCCESS; } -static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) { - if (taosArrayGetSize(pTasksList) < SINK_NODE_LEVEL || pUpstreamTask == NULL) { - return; - } +static void bindSourceAgg(SArray* tasks) { + SArray* pAggTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 1); + SArray* pSourceTaskList = taosArrayGetP(tasks, SINK_NODE_LEVEL + 2); - SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL); - for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { - SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); - streamTaskSetUpstreamInfo(pSinkTask, pUpstreamTask); + SStreamTask* pAggTask = taosArrayGetP(pAggTaskList, 0); + for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){ + SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i); + streamTaskSetFixedDownstreamInfo(pSourceTask, pAggTask); + streamTaskSetUpstreamInfo(pAggTask, pSourceTask); } } +//#define AGGNUM 2 static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); @@ -601,45 +562,67 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* if (numOfPlanLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { // add extra sink hasExtraSink = true; - - SArray* pSinkTaskList = NULL; - int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList, pEpset, 0); + int32_t code = addSinkTask(pMnode, pStream, pEpset); if (code != TSDB_CODE_SUCCESS) { return code; } - - // check for fill history - if (pStream->conf.fillHistory) { - SArray* pHSinkTaskList = NULL; - code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, pEpset, 1); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - setHTasksId(pSinkTaskList, pHSinkTaskList); - } } pStream->totalLevel = numOfPlanLevel + hasExtraSink; +// if(numOfPlanLevel == 3){ +// pDbObj = mndAcquireDb(pMnode, pStream->sourceDb); +// if (pDbObj == NULL) { +// terrno = TSDB_CODE_QRY_INVALID_INPUT; +// return -1; +// } +// +// if(pDbObj->cfg.numOfVgroups >= AGGNUM){ +// +// } +// sdbRelease(pSdb, pDbObj); +// }else if (numOfPlanLevel > 1) { - SStreamTask* pAggTask = NULL; - SStreamTask* pHAggTask = NULL; + SSubplan *plan = getFinalAggSubPlan(pPlan); + if(plan == NULL){ + return terrno; + } + int32_t code = addAggTask(pStream, pMnode, plan, pEpset); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + bindAggSink(pStream, pMnode, pStream->tasks); + if (pStream->conf.fillHistory) { + bindAggSink(pStream, pMnode, pStream->pHTasksList); + } - int32_t code = addAggTask(pStream, pMnode, pPlan, pEpset, &pAggTask, &pHAggTask); + plan = getScanSubPlan(pPlan, 1); + if(plan == NULL){ + return terrno; + } + code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey); if (code != TSDB_CODE_SUCCESS) { return code; } - setSinkTaskUpstreamInfo(pStream->tasks, pAggTask); - if (pHAggTask != NULL) { - setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask); + bindSourceAgg(pStream->tasks); + if (pStream->conf.fillHistory) { + bindSourceAgg(pStream->pHTasksList); + } + } else if (numOfPlanLevel == 1) { + SSubplan *plan = getScanSubPlan(pPlan, 0); + if(plan == NULL){ + return terrno; + } + int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey); + if (code != TSDB_CODE_SUCCESS) { + return code; } - // source level - return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, pEpset, nextWindowSkey); - } else if (numOfPlanLevel == 1) { - return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, pEpset, hasExtraSink, nextWindowSkey); + bindSourceSink(pStream, pMnode, pStream->tasks, hasExtraSink); + if (pStream->conf.fillHistory) { + bindSourceSink(pStream, pMnode, pStream->pHTasksList, hasExtraSink); + } } return 0;