diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 3de7a55820..ac01890b9b 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -86,6 +86,8 @@ END: int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { STaskOutputInfo* pInfo = &pTask->outputInfo; + mDebug("mndSetSinkTaskInfo to sma or table, taskId:%s", pTask->id.idStr); + if (pStream->smaId != 0) { pInfo->type = TASK_OUTPUT__SMA; pInfo->smaSink.smaId = pStream->smaId; @@ -196,7 +198,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } -static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, +static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) { int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); @@ -208,12 +210,27 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, } epsetAssign(&(pTask)->info.mnodeEpset, pEpset); + mDebug("doAddSinkTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); - pTask->info.nodeId = vgId; + pTask->info.nodeId = pVgroup->vgId; pTask->info.epSet = mndGetVgroupEpset(pMnode, pVgroup); return mndSetSinkTaskInfo(pStream, pTask); } +static int32_t doAddSinkTaskToVg(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset, SVgObj* vgObj){ + int32_t code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, false); + if (code != 0) { + return code; + } + if(pStream->conf.fillHistory){ + code = doAddSinkTask(pStream, pMnode, vgObj, pEpset, true); + if (code != 0) { + return code; + } + } + return TDB_CODE_SUCCESS; +} + // create sink node for each vgroup. static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset) { SSdb* pSdb = pMnode->pSdb; @@ -231,18 +248,12 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* continue; } - int32_t code = doAddSinkTask(pStream, pMnode, pVgroup->vgId, pVgroup, pEpset, false); + int32_t code = doAddSinkTaskToVg(pMnode, pStream, pEpset, pVgroup); if(code != 0){ sdbRelease(pSdb, pVgroup); return code; } - if(pStream->conf.fillHistory){ - code = doAddSinkTask(pStream, pMnode, pVgroup->vgId, pVgroup, pEpset, true); - if(code != 0){ - sdbRelease(pSdb, pVgroup); - return code; - } - } + sdbRelease(pSdb, pVgroup); } @@ -271,14 +282,20 @@ static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, return pTask; } -static SArray* addNewTaskList(SArray* pTasksList) { +static void addNewTaskList(SStreamObj* pStream){ SArray* pTaskList = taosArrayInit(0, POINTER_BYTES); - taosArrayPush(pTasksList, &pTaskList); - return pTaskList; + taosArrayPush(pStream->tasks, &pTaskList); + if (pStream->conf.fillHistory) { + pTaskList = taosArrayInit(0, POINTER_BYTES); + taosArrayPush(pStream->pHTasksList, &pTaskList); + } } // set the history task id -static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { +static void setHTasksId(SStreamObj* pStream) { + SArray* pTaskList = *(SArray**)taosArrayGetLast(pStream->tasks); + SArray* pHTaskList = *(SArray**)taosArrayGetLast(pStream->pHTasksList); + for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) { SStreamTask** pStreamTask = taosArrayGet(pTaskList, i); SStreamTask** pHTask = taosArrayGet(pHTaskList, i); @@ -303,6 +320,8 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } + mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); + int32_t code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); if(code != 0){ terrno = code; @@ -345,12 +364,6 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t nextWindowSkey) { // create exec stream task, since only one level, the exec task is also the source task - SArray* pTaskList = addNewTaskList(pStream->tasks); - SArray* pHTaskList = NULL; - if (pStream->conf.fillHistory) { - pHTaskList = addNewTaskList(pStream->pHTasksList); - } - void* pIter = NULL; SSdb* pSdb = pMnode->pSdb; while (1) { @@ -383,7 +396,7 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream } if (pStream->conf.fillHistory) { - setHTasksId(pTaskList, pHTaskList); + setHTasksId(pStream); } return TSDB_CODE_SUCCESS; @@ -403,12 +416,24 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil return pAggTask; } -static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset){ - SStreamTask* pTask = buildAggTask(pStream, pEpset, false); +static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup, SSnodeObj* pSnode, bool isFillhistory){ + int32_t code = 0; + SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory); if (pTask == NULL) { return terrno; } + if (pSnode != NULL) { + code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode); + mDebug("doAddAggTask taskId:%s, snode id:%d, isFillHistory:%d", pTask->id.idStr, pSnode->id, isFillhistory); + } else { + code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); + mDebug("doAddAggTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); + } + return code; +} + +static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset){ SVgObj* pVgroup = NULL; SSnodeObj* pSnode = NULL; int32_t code = 0; @@ -421,34 +446,18 @@ static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); } - if (pSnode != NULL) { - code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode); - } else { - code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); - } + code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false); if(code != 0){ goto END; } if (pStream->conf.fillHistory) { - pTask = buildAggTask(pStream, pEpset, true); - if (pTask == NULL) { - code = terrno; - goto END; - } - - if (pSnode != NULL) { - code = mndAssignStreamTaskToSnode(pMnode, pTask, plan, pSnode); - } else { - code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); - } + code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true); if(code != 0){ goto END; } - SArray** pAggTaskList = taosArrayGetLast(pStream->tasks); - SArray** pHAggTaskList = taosArrayGetLast(pStream->pHTasksList); - setHTasksId(*pAggTaskList, *pHAggTaskList); + setHTasksId(pStream); } END: @@ -460,43 +469,24 @@ static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, return code; } -static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset) { - addNewTaskList(pStream->tasks); - if (pStream->conf.fillHistory) { - addNewTaskList(pStream->pHTasksList); - } - return doAddAggTask(pStream, pMnode, plan, pEpset); -} - static int32_t addSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* pEpset){ - SArray* pSinkTaskList = addNewTaskList(pStream->tasks); - - SArray* pHSinkTaskList = NULL; - if (pStream->conf.fillHistory) { - pHSinkTaskList = addNewTaskList(pStream->pHTasksList); - } - int32_t code = 0; + addNewTaskList(pStream); + if (pStream->fixedSinkVgId == 0) { code = doAddShuffleSinkTask(pMnode, pStream, pEpset); if (code != 0) { return code; } } else { - code = doAddSinkTask(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, pEpset, false); + code = doAddSinkTaskToVg(pMnode, pStream, pEpset, &pStream->fixedSinkVg); if (code != 0) { return code; } - if(pStream->conf.fillHistory){ - code = doAddSinkTask(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, pEpset, true); - if (code != 0) { - return code; - } - } } if (pStream->conf.fillHistory) { - setHTasksId(pSinkTaskList, pHSinkTaskList); + setHTasksId(pStream); } return TDB_CODE_SUCCESS; } @@ -507,6 +497,7 @@ static void bindTaskToSinkTask(SStreamObj* pStream, SMnode* pMnode, SArray* pSin SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, k); streamTaskSetUpstreamInfo(pSinkTask, task); } + mDebug("bindTaskToSinkTask taskId:%s to sink task list", task->id.idStr); } static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) { @@ -516,6 +507,7 @@ static void bindAggSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks) { for(int i = 0; i < taosArrayGetSize(*pAggTaskList); i++){ SStreamTask* pAggTask = taosArrayGetP(*pAggTaskList, i); bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pAggTask); + mDebug("bindAggSink taskId:%s to sink task list", pAggTask->id.idStr); } } @@ -525,6 +517,8 @@ static void bindSourceSink(SStreamObj* pStream, SMnode* pMnode, SArray* tasks, b for(int i = 0; i < taosArrayGetSize(pSourceTaskList); i++){ SStreamTask* pSourceTask = taosArrayGetP(pSourceTaskList, i); + mDebug("bindSourceSink taskId:%s to sink task list", pSourceTask->id.idStr); + if (hasExtraSink) { bindTaskToSinkTask(pStream, pMnode, pSinkTaskList, pSourceTask); } else { @@ -548,6 +542,7 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask); streamTaskSetUpstreamInfo(*pDownTask, pUpTask); } + mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end, (*(pDownTask))->id.idStr); } static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) { @@ -564,6 +559,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* bool multiTarget = (pDbObj->cfg.numOfVgroups > 1); sdbRelease(pSdb, pDbObj); + mDebug("doScheduleStream numOfPlanLevel:%d, exDb:%d, multiTarget:%d, fix vgId:%d, physicalPlan:%s", + numOfPlanLevel, externalTargetDB, multiTarget, pStream->fixedSinkVgId, pStream->physicalPlan); pStream->tasks = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES); pStream->pHTasksList = taosArrayInit(numOfPlanLevel + 1, POINTER_BYTES); @@ -606,13 +603,11 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* size_t cnt = (int)(size/tsStreamAggCnt + 0.5); if(cnt <= 1) break; - addNewTaskList(pStream->tasks); - if (pStream->conf.fillHistory) { - addNewTaskList(pStream->pHTasksList); - } + mDebug("doScheduleStream add middle agg, size:%d, cnt:%d", (int)size, (int)cnt); + addNewTaskList(pStream); for(int j = 0; j < cnt; j++){ - code = doAddAggTask(pStream, pMnode, plan, pEpset); + code = addAggTask(pStream, pMnode, plan, pEpset); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -630,8 +625,10 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* return terrno; } + mDebug("doScheduleStream add final agg"); SArray** list = taosArrayGetLast(pStream->tasks); size_t size = taosArrayGetSize(*list); + addNewTaskList(pStream); code = addAggTask(pStream, pMnode, plan, pEpset); if (code != TSDB_CODE_SUCCESS) { return code;