diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 2a8040cc55..88d326a5c4 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -222,13 +222,12 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgrou int64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory); + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } - epsetAssign(&(pTask)->info.mnodeEpset, pEpset); mDebug("doAddSinkTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); pTask->info.nodeId = pVgroup->vgId; @@ -279,25 +278,60 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* return TDB_CODE_SUCCESS; } -static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, int64_t firstWindowSkey, +static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) { + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + SVgroupVer* pVer = taosArrayGet(pList, i); + if (pVer->vgId == vgId) { + return pVer->ver; + } + } + + mError("failed to find the vgId:%d for extract last version", vgId); + return -1; +} + +static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) { + int64_t latestVer = getVgroupLastVer(pVerList, vgId); + if (latestVer < 0) { + latestVer = 0; + } + + // set the correct ts, which is the last key of queried table. + SDataRange* pRange = &pTask->dataRange; + STimeWindow* pWindow = &pRange->window; + + if (pTask->info.fillHistory) { + pWindow->skey = INT64_MIN; + pWindow->ekey = skey - 1; + + pRange->range.minVer = 0; + pRange->range.maxVer = latestVer; + mDebug("add fill-history source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, + pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer); + } else { + pWindow->skey = skey; + pWindow->ekey = INT64_MAX; + + pRange->range.minVer = latestVer + 1; + pRange->range.maxVer = INT64_MAX; + + mDebug("add source task 0x%x timeWindow:%" PRId64 "-%" PRId64 " verRange:%" PRId64 "-%" PRId64, + pTask->id.taskId, pWindow->skey, pWindow->ekey, pRange->range.minVer, pRange->range.maxVer); + } +} + +static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam) { uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); - SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, + SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory); if (pTask == NULL) { return NULL; } - epsetAssign(&pTask->info.mnodeEpset, pEpset); - STimeWindow* pWindow = &pTask->dataRange.window; - - pWindow->skey = INT64_MIN; - pWindow->ekey = firstWindowSkey - 1; - mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pWindow->skey, pWindow->ekey); - return pTask; } @@ -331,15 +365,17 @@ static void setHTasksId(SStreamObj* pStream) { } static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, - int64_t nextWindowSkey, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam ){ + int64_t skey, SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam ){ // new stream task - SStreamTask* pTask = buildSourceTask(pStream, pEpset, nextWindowSkey, isFillhistory, useTriggerParam); + SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam); if(pTask == NULL){ terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); + streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId); + int32_t code = mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup); if(code != 0){ terrno = code; @@ -380,7 +416,7 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){ } static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, - SEpSet* pEpset, int64_t nextWindowSkey, bool useTriggerParam) { + SEpSet* pEpset, int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) { addNewTaskList(pStream); void* pIter = NULL; @@ -397,14 +433,14 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream continue; } - int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVgroup, false, useTriggerParam); + int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam); if(code != 0){ sdbRelease(pSdb, pVgroup); return code; } if (pStream->conf.fillHistory) { - code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVgroup, true, useTriggerParam); + code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, true, useTriggerParam); if(code != 0){ sdbRelease(pSdb, pVgroup); return code; @@ -425,7 +461,7 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); - SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, isFillhistory, + SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory); if (pAggTask == NULL) { @@ -433,7 +469,6 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil return NULL; } - epsetAssign(&pAggTask->info.mnodeEpset, pEpset); return pAggTask; } @@ -566,7 +601,8 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { mDebug("bindTwoLevel task list(%d-%d) to taskId:%s", begin, end - 1, (*(pDownTask))->id.idStr); } -static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey, SEpSet* pEpset) { +static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey, + SArray* pVerList) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); bool hasExtraSink = false; @@ -600,7 +636,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* if (plan == NULL) { return terrno; } - int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, numOfPlanLevel == 1); + int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, numOfPlanLevel == 1); if (code != TSDB_CODE_SUCCESS) { return code; }