fix(stream): the checkpoint version can only be updated when generating checkpoint.
This commit is contained in:
parent
45ab92a02d
commit
97772e9aab
|
@ -304,6 +304,7 @@ typedef struct SCheckpointInfo {
|
|||
int64_t startTs;
|
||||
int64_t checkpointId;
|
||||
int64_t checkpointVer; // latest checkpointId version
|
||||
int64_t processedVer; // already processed ver, that has generated results version.
|
||||
int64_t nextProcessVer; // current offset in WAL, not serialize it
|
||||
int64_t failedId; // record the latest failed checkpoint id
|
||||
} SCheckpointInfo;
|
||||
|
|
|
@ -297,9 +297,12 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
|||
continue;
|
||||
}
|
||||
|
||||
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
|
||||
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId &&
|
||||
p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
|
||||
|
||||
p->chkInfo.checkpointId = p->checkpointingId;
|
||||
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
|
||||
|
||||
streamTaskClearCheckInfo(p);
|
||||
|
||||
char* str = NULL;
|
||||
|
|
|
@ -593,7 +593,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
const SStreamQueueItem* pItem = pInput;
|
||||
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type);
|
||||
|
||||
int64_t ver = pTask->chkInfo.checkpointVer;
|
||||
int64_t ver = pTask->chkInfo.processedVer;
|
||||
doSetStreamInputBlock(pTask, pInput, &ver, id);
|
||||
|
||||
int64_t resSize = 0;
|
||||
|
@ -604,13 +604,16 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
|
||||
SIZE_IN_MiB(resSize), totalBlocks);
|
||||
|
||||
// update the currentVer if processing the submit blocks.
|
||||
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer);
|
||||
SCheckpointInfo* pInfo = &pTask->chkInfo;
|
||||
|
||||
if (ver != pTask->chkInfo.checkpointVer) {
|
||||
stDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 ", nextProcessVer:%" PRId64,
|
||||
pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer);
|
||||
pTask->chkInfo.checkpointVer = ver;
|
||||
// update the currentVer if processing the submit blocks.
|
||||
ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer);
|
||||
|
||||
if (ver != pInfo->processedVer) {
|
||||
stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
|
||||
" ckpt:%" PRId64,
|
||||
pTask->id.idStr, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
|
||||
pInfo->processedVer = ver;
|
||||
}
|
||||
|
||||
streamFreeQitem(pInput);
|
||||
|
|
|
@ -431,8 +431,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
|
||||
pTask->pMeta = pMeta;
|
||||
|
||||
pTask->chkInfo.checkpointVer = ver - 1;
|
||||
pTask->chkInfo.nextProcessVer = ver;
|
||||
pTask->chkInfo.checkpointVer = ver - 1; // only update when generating checkpoint
|
||||
pTask->chkInfo.processedVer = ver - 1; // already processed version
|
||||
|
||||
pTask->chkInfo.nextProcessVer = ver; // next processed version
|
||||
pTask->dataRange.range.maxVer = ver;
|
||||
pTask->dataRange.range.minVer = ver;
|
||||
pTask->pMsgCb = pMsgCb;
|
||||
|
|
Loading…
Reference in New Issue