diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index eab3ecf04e..9d32912ece 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -304,6 +304,7 @@ typedef struct SCheckpointInfo { int64_t startTs; int64_t checkpointId; int64_t checkpointVer; // latest checkpointId version + int64_t processedVer; // already processed ver, that has generated results version. int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t failedId; // record the latest failed checkpoint id } SCheckpointInfo; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 81840aaeb7..48b6486e05 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -297,9 +297,12 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { continue; } - ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); + ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId && + p->chkInfo.checkpointVer <= p->chkInfo.processedVer); p->chkInfo.checkpointId = p->checkpointingId; + p->chkInfo.checkpointVer = p->chkInfo.processedVer; + streamTaskClearCheckInfo(p); char* str = NULL; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 701cc76086..94cd2f76f3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -593,7 +593,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { const SStreamQueueItem* pItem = pInput; stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type); - int64_t ver = pTask->chkInfo.checkpointVer; + int64_t ver = pTask->chkInfo.processedVer; doSetStreamInputBlock(pTask, pInput, &ver, id); int64_t resSize = 0; @@ -604,13 +604,16 @@ int32_t streamExecForAll(SStreamTask* pTask) { stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, SIZE_IN_MiB(resSize), totalBlocks); - // update the currentVer if processing the submit blocks. - ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer); + SCheckpointInfo* pInfo = &pTask->chkInfo; - if (ver != pTask->chkInfo.checkpointVer) { - stDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 ", nextProcessVer:%" PRId64, - pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer); - pTask->chkInfo.checkpointVer = ver; + // update the currentVer if processing the submit blocks. + ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer); + + if (ver != pInfo->processedVer) { + stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64 + " ckpt:%" PRId64, + pTask->id.idStr, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); + pInfo->processedVer = ver; } streamFreeQitem(pInput); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a7fb590d1b..24228c0307 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -431,8 +431,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; - pTask->chkInfo.checkpointVer = ver - 1; - pTask->chkInfo.nextProcessVer = ver; + pTask->chkInfo.checkpointVer = ver - 1; // only update when generating checkpoint + pTask->chkInfo.processedVer = ver - 1; // already processed version + + pTask->chkInfo.nextProcessVer = ver; // next processed version pTask->dataRange.range.maxVer = ver; pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb;