diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e59a9e1533..4ae7ea02b3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -253,9 +253,9 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) { // save the task streamMetaSaveTask(pMeta, p); streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks - qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64 - " currentVer:%" PRId64 ", status to be normal, prev:%s", - pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer, + qDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64 + ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", + pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer, streamGetTaskStatusStr(prev)); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 08dc59dfd2..a07b775cb8 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -386,7 +386,7 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_ qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); - ASSERT((*pVer) < pSubmit->submit.ver); + ASSERT((*pVer) <= pSubmit->submit.ver); (*pVer) = pSubmit->submit.ver; } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { @@ -405,7 +405,7 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_ qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks, pMerged->ver); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); - ASSERT((*pVer) < pMerged->ver); + ASSERT((*pVer) <= pMerged->ver); (*pVer) = pMerged->ver; } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { @@ -485,7 +485,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.checkpointVer); if (ver != pTask->chkInfo.checkpointVer) { - qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver, + qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr, ver, pTask->chkInfo.checkpointVer); }