diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index cbc0ace75d..ef9a7205e1 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -44,9 +44,8 @@ static bool hasCountWindowNode(SPhysiNode* pNode) { } } -static bool countWindowStreamTask(SSubplan* pPlan) { - SPhysiNode* pNode = pPlan->pNode; - return hasCountWindowNode(pNode); +static bool isCountWindowStreamTask(SSubplan* pPlan) { + return hasCountWindowNode((SPhysiNode*)pPlan->pNode); } int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, @@ -342,13 +341,13 @@ static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVe } } -static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan) { - bool hasCountWindowNode = countWindowStreamTask(pPlan); - bool isRelStreamTask = (pTask->hTaskInfo.id.taskId != 0); - if (hasCountWindowNode && isRelStreamTask) { +static void haltInitialTaskStatus(SStreamTask* pTask, SSubplan* pPlan, bool isFillhistoryTask) { + bool hasCountWindowNode = isCountWindowStreamTask(pPlan); + + if (hasCountWindowNode && (!isFillhistoryTask)) { SStreamStatus* pStatus = &pTask->status; - mDebug("s-task:0x%x status is set to %s from %s for count window agg task with fill-history option set", - pTask->id.taskId, streamTaskGetStatusStr(pStatus->taskStatus), streamTaskGetStatusStr(TASK_STATUS__HALT)); + mDebug("s-task:0x%x status set %s from %s for count window agg task with fill-history option set", + pTask->id.taskId, streamTaskGetStatusStr(TASK_STATUS__HALT), streamTaskGetStatusStr(pStatus->taskStatus)); pStatus->taskStatus = TASK_STATUS__HALT; } } @@ -398,15 +397,17 @@ static void setHTasksId(SStreamObj* pStream) { static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey, SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) { - // new stream task SStreamTask* pTask = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } + mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); - haltInitialTaskStatus(pTask, plan); + if (pStream->conf.fillHistory) { + haltInitialTaskStatus(pTask, plan, isFillhistory); + } streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId); @@ -415,6 +416,7 @@ static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStre terrno = code; return terrno; } + return TDB_CODE_SUCCESS; }