diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 47900d540c..f98a69097b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -860,6 +860,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer); + + ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer); } return 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index cf606f2fc9..959d1382a3 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -458,12 +458,49 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x free task completed", taskId); } +static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) { + SCheckpointInfo* pChkInfo = &pTask->chkInfo; + SDataRange* pRange = &pTask->dataRange; + + // only set the version info for stream tasks without fill-history task + if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) { + pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint + pChkInfo->processedVer = ver - 1; // already processed version + pChkInfo->nextProcessVer = ver; // next processed version + + pRange->range.maxVer = ver; + pRange->range.minVer = ver; + } else { + // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task + // is set at the mnode. + if (pTask->info.fillHistory == 1) { + pChkInfo->checkpointVer = pRange->range.maxVer; + pChkInfo->processedVer = pRange->range.maxVer; + pChkInfo->nextProcessVer = pRange->range.maxVer + 1; + } else { + pChkInfo->checkpointVer = pRange->range.minVer - 1; + pChkInfo->processedVer = pRange->range.minVer - 1; + pChkInfo->nextProcessVer = pRange->range.minVer; + + { // for compatible purpose, remove it later + if (pRange->range.minVer == 0) { + pChkInfo->checkpointVer = 0; + pChkInfo->processedVer = 0; + pChkInfo->nextProcessVer = 1; + stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr); + } + } + } + } +} + int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; + pTask->inputq.queue = streamQueueOpen(512 << 10); pTask->outputq.queue = streamQueueOpen(512 << 10); if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) { @@ -481,41 +518,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } pTask->execInfo.created = taosGetTimestampMs(); - SCheckpointInfo* pChkInfo = &pTask->chkInfo; - SDataRange* pRange = &pTask->dataRange; - - // only set the version info for stream tasks without fill-history task - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) { - pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint - pChkInfo->processedVer = ver - 1; // already processed version - pChkInfo->nextProcessVer = ver; // next processed version - - pRange->range.maxVer = ver; - pRange->range.minVer = ver; - } else { - // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task - // is set at the mnode. - if (pTask->info.fillHistory == 1) { - pChkInfo->checkpointVer = pRange->range.maxVer; - pChkInfo->processedVer = pRange->range.maxVer; - pChkInfo->nextProcessVer = pRange->range.maxVer + 1; - } else { - pChkInfo->checkpointVer = pRange->range.minVer - 1; - pChkInfo->processedVer = pRange->range.minVer - 1; - pChkInfo->nextProcessVer = pRange->range.minVer; - - { // for compatible purpose, remove it later - if (pRange->range.minVer == 0) { - pChkInfo->checkpointVer = 0; - pChkInfo->processedVer = 0; - pChkInfo->nextProcessVer = 1; - stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr); - } - } - } - } - } + setInitialVersionInfo(pTask, ver); pTask->pMeta = pMeta; pTask->pMsgCb = pMsgCb;