diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index f48e4531de..e318529cf8 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -362,18 +362,32 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillh return code; } -static void addNewTaskList(SStreamObj* pStream) { +static int32_t addNewTaskList(SStreamObj* pStream) { SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); + if (pTaskList == NULL) { + mError("failed init task list, code:%s", tstrerror(terrno)); + return terrno; + } + if (taosArrayPush(pStream->tasks, &pTaskList) == NULL) { - mError("failed to put into array"); + mError("failed to put into array, code:%s", tstrerror(terrno)); + return terrno; } if (pStream->conf.fillHistory) { pTaskList = taosArrayInit(0, POINTER_BYTES); + if (pTaskList == NULL) { + mError("failed init task list, code:%s", tstrerror(terrno)); + return terrno; + } + if (taosArrayPush(pStream->pHTasksList, &pTaskList) == NULL) { - mError("failed to put into array"); + mError("failed to put into array, code:%s", tstrerror(terrno)); + return terrno; } } + + return TSDB_CODE_SUCCESS; } // set the history task id @@ -454,10 +468,11 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) { static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) { void* pIter = NULL; - int32_t code = 0; SSdb* pSdb = pMnode->pSdb; - - addNewTaskList(pStream); + int32_t code = addNewTaskList(pStream); + if (code) { + return code; + } while (1) { SVgObj* pVgroup = NULL; @@ -570,8 +585,10 @@ END: } static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) { - int32_t code = 0; - addNewTaskList(pStream); + int32_t code = addNewTaskList(pStream); + if (code) { + return code; + } if (pStream->fixedSinkVgId == 0) { code = doAddShuffleSinkTask(pMnode, pStream, pEpset); @@ -676,8 +693,13 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", numOfPlanLevel, externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan); + pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES); pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES); + if (pStream->tasks == NULL || pStream->pHTasksList == NULL) { + mError("failed to create stream obj, code:%s", tstrerror(terrno)); + return terrno; + } if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { // add extra sink @@ -717,6 +739,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* if (terrno != 0) code = terrno; TAOS_RETURN(code); } + do { SArray** list = taosArrayGetLast(pStream->tasks); float size = (float)taosArrayGetSize(*list); @@ -724,7 +747,10 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* if (cnt <= 1) break; mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt); - addNewTaskList(pStream); + code = addNewTaskList(pStream); + if (code) { + return code; + } for (int j = 0; j < cnt; j++) { code = addAggTask(pStream, pMnode, plan, pEpset, false); @@ -750,7 +776,12 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* mDebug("doScheduleStream add final agg"); SArray** list = taosArrayGetLast(pStream->tasks); size_t size = taosArrayGetSize(*list); - addNewTaskList(pStream); + + code = addNewTaskList(pStream); + if (code) { + return code; + } + code = addAggTask(pStream, pMnode, plan, pEpset, true); if (code != TSDB_CODE_SUCCESS) { TAOS_RETURN(code);