fix(stream): the checkpoint version can only be updated when generating checkpoint.

This commit is contained in:
Haojun Liao 2023-11-09 17:11:37 +08:00
parent 62f34c757e
commit 13c6d1b23a
4 changed files with 19 additions and 10 deletions

View File

@ -304,6 +304,7 @@ typedef struct SCheckpointInfo {
int64_t startTs; int64_t startTs;
int64_t checkpointId; int64_t checkpointId;
int64_t checkpointVer; // latest checkpointId version int64_t checkpointVer; // latest checkpointId version
int64_t processedVer; // already processed ver, that has generated results version.
int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t failedId; // record the latest failed checkpoint id int64_t failedId; // record the latest failed checkpoint id
} SCheckpointInfo; } SCheckpointInfo;

View File

@ -297,9 +297,12 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
continue; continue;
} }
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId &&
p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
p->chkInfo.checkpointId = p->checkpointingId; p->chkInfo.checkpointId = p->checkpointingId;
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
streamTaskClearCheckInfo(p); streamTaskClearCheckInfo(p);
char* str = NULL; char* str = NULL;

View File

@ -593,7 +593,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
const SStreamQueueItem* pItem = pInput; const SStreamQueueItem* pItem = pInput;
stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type); stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type);
int64_t ver = pTask->chkInfo.checkpointVer; int64_t ver = pTask->chkInfo.processedVer;
doSetStreamInputBlock(pTask, pInput, &ver, id); doSetStreamInputBlock(pTask, pInput, &ver, id);
int64_t resSize = 0; int64_t resSize = 0;
@ -604,13 +604,16 @@ int32_t streamExecForAll(SStreamTask* pTask) {
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MiB(resSize), totalBlocks); SIZE_IN_MiB(resSize), totalBlocks);
// update the currentVer if processing the submit blocks. SCheckpointInfo* pInfo = &pTask->chkInfo;
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer);
if (ver != pTask->chkInfo.checkpointVer) { // update the currentVer if processing the submit blocks.
stDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 ", nextProcessVer:%" PRId64, ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer);
pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer);
pTask->chkInfo.checkpointVer = ver; if (ver != pInfo->processedVer) {
stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
" ckpt:%" PRId64,
pTask->id.idStr, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer);
pInfo->processedVer = ver;
} }
streamFreeQitem(pInput); streamFreeQitem(pInput);

View File

@ -431,8 +431,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMeta = pMeta; pTask->pMeta = pMeta;
pTask->chkInfo.checkpointVer = ver - 1; pTask->chkInfo.checkpointVer = ver - 1; // only update when generating checkpoint
pTask->chkInfo.nextProcessVer = ver; pTask->chkInfo.processedVer = ver - 1; // already processed version
pTask->chkInfo.nextProcessVer = ver; // next processed version
pTask->dataRange.range.maxVer = ver; pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver; pTask->dataRange.range.minVer = ver;
pTask->pMsgCb = pMsgCb; pTask->pMsgCb = pMsgCb;