fix(stream): set save checkpoint ver.

This commit is contained in:
Haojun Liao 2023-07-14 19:47:19 +08:00
parent 035b199497
commit 725db16af9
2 changed files with 6 additions and 6 deletions

View File

@ -253,9 +253,9 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) {
// save the task
streamMetaSaveTask(pMeta, p);
streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks
qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64
" currentVer:%" PRId64 ", status to be normal, prev:%s",
pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer,
qDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer,
streamGetTaskStatusStr(prev));
}

View File

@ -386,7 +386,7 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
ASSERT((*pVer) < pSubmit->submit.ver);
ASSERT((*pVer) <= pSubmit->submit.ver);
(*pVer) = pSubmit->submit.ver;
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
@ -405,7 +405,7 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks,
pMerged->ver);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
ASSERT((*pVer) < pMerged->ver);
ASSERT((*pVer) <= pMerged->ver);
(*pVer) = pMerged->ver;
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
@ -485,7 +485,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.checkpointVer);
if (ver != pTask->chkInfo.checkpointVer) {
qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver,
qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr, ver,
pTask->chkInfo.checkpointVer);
}