From cbda61c7f96c0a3f1d983d8684f437997743b2af Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jul 2023 09:53:08 +0800 Subject: [PATCH 1/4] fix(stream): fix error in checkpointing. --- source/libs/stream/src/streamCheckpoint.c | 1 - source/libs/stream/src/streamExec.c | 34 ++++++++++++++++------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index cee3438280..e421a5d671 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -204,7 +204,6 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, } int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq) { - int32_t code; int64_t checkpointId = pReq->checkpointId; int32_t childId = pReq->childId; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b3e5dc2475..5f85abb6b6 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -301,11 +301,11 @@ int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId) { qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); SCheckpointInfo* pCkInfo = &pTask->chkInfo; - qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkpointId:%" PRId64 - " -> %" PRId64, - pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->keptCheckpointId, checkpointId); +// qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkpointId:%" PRId64 +// " -> %" PRId64, +// pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->keptCheckpointId, checkpointId); pCkInfo->keptCheckpointId = checkpointId; - pCkInfo->version = dataVer; +// pCkInfo->version = dataVer; return TSDB_CODE_SUCCESS; } @@ -418,6 +418,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { int64_t st = taosGetTimestampMs(); qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize); + int64_t currentVer = pTask->chkInfo.currentVer; { // set input @@ -433,6 +434,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); + ASSERT(currentVer < pSubmit->submit.ver); + currentVer = pSubmit->submit.ver; } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; @@ -445,8 +448,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks); + qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, + numOfBlocks, pMerged->ver); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); + ASSERT(currentVer < pMerged->ver); + currentVer = pMerged->ver; } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); @@ -466,6 +472,13 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, resSize / 1048576.0, totalBlocks); + // update the currentVer if processing the submit blocks. + if(currentVer > pTask->chkInfo.currentVer) { + qDebug("s-task:%s update currentVer from %" PRId64 " to %" PRId64, pTask->id.idStr, + pTask->chkInfo.currentVer, currentVer); + pTask->chkInfo.currentVer = currentVer; + } + int32_t type = pInput->type; streamFreeQitem(pInput); @@ -524,15 +537,16 @@ int32_t streamTryExec(SStreamTask* pTask) { if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); - qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%"PRId64, pMeta->vgId); + qDebug("vgId:%d do vnode wide checkpoint completed, checkpointId:%"PRId64, pMeta->vgId); } if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - code = updateCheckPointInfo(pTask, pTask->checkpointingId); - if (code != TSDB_CODE_SUCCESS) { - return code; - } +// code = updateCheckPointInfo(pTask, pTask->checkpointingId); +// if (code != TSDB_CODE_SUCCESS) { +// return code; +// } } + // send check point response to upstream task if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); From 62a763ead3cb23b9845fc1e15a6fc0ae1d6b5527 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jul 2023 11:22:48 +0800 Subject: [PATCH 2/4] fix(stream): fix invalid assert. --- source/libs/stream/src/streamCheckpoint.c | 1 - source/libs/stream/src/streamExec.c | 7 ------- 2 files changed, 8 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e421a5d671..0557936a83 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -121,7 +121,6 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i pTask->checkpointingId = checkpointId; } - ASSERT(pTask->checkpointingId == checkpointId); return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c882b719bc..5b03db6f0c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -541,13 +541,6 @@ int32_t streamTryExec(SStreamTask* pTask) { pTask->checkpointingId); } - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { -// code = updateCheckPointInfo(pTask, pTask->checkpointingId); -// if (code != TSDB_CODE_SUCCESS) { -// return code; -// } - } - // send check point response to upstream task if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); From 571cccd7376978c21f93e73e4eaf8e9cade3c72b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jul 2023 11:41:37 +0800 Subject: [PATCH 3/4] refactor: do some internal refactor. --- source/libs/stream/src/streamExec.c | 112 ++++++++++++++++------------ 1 file changed, 63 insertions(+), 49 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5b03db6f0c..ae53e0cb65 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -378,6 +378,56 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +// set input +static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pCurrentVer, const char* id) { + void* pExecutor = pTask->exec.pExecutor; + + const SStreamQueueItem* pItem = pInput; + if (pItem->type == STREAM_INPUT__GET_RES) { + const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput; + qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); + + } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; + qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, + pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); + ASSERT((*pCurrentVer) < pSubmit->submit.ver); + (*pCurrentVer) = pSubmit->submit.ver; + + } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { + const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; + + SArray* pBlockList = pBlock->blocks; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); + + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; + + SArray* pBlockList = pMerged->submits; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, + numOfBlocks, pMerged->ver); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); + ASSERT((*pCurrentVer) < pMerged->ver); + (*pCurrentVer) = pMerged->ver; + + } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { + const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; + qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); + + } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { + const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput; + qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT); + + } else { + ASSERT(0); + } +} + /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. @@ -418,52 +468,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { } int64_t st = taosGetTimestampMs(); - qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize); + + const SStreamQueueItem* pItem = pInput; + qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type); + int64_t currentVer = pTask->chkInfo.currentVer; - - { - // set input - void* pExecutor = pTask->exec.pExecutor; - - const SStreamQueueItem* pItem = pInput; - if (pItem->type == STREAM_INPUT__GET_RES) { - const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput; - qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; - qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); - qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, - pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); - ASSERT(currentVer < pSubmit->submit.ver); - currentVer = pSubmit->submit.ver; - } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; - - SArray* pBlockList = pBlock->blocks; - int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer); - qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { - const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; - - SArray* pBlockList = pMerged->submits; - int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, - numOfBlocks, pMerged->ver); - qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); - ASSERT(currentVer < pMerged->ver); - currentVer = pMerged->ver; - } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { - const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; - qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { - const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput; - qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT); - } else { - ASSERT(0); - } - } + doSetStreamInputBlock(pTask, pInput, ¤tVer, id); int64_t resSize = 0; int32_t totalBlocks = 0; @@ -523,7 +533,6 @@ int32_t streamTryExec(SStreamTask* pTask) { return -1; } - // todo the task should be commit here atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); @@ -551,17 +560,22 @@ int32_t streamTryExec(SStreamTask* pTask) { if (code == TSDB_CODE_SUCCESS) { taosWLockLatch(&pTask->pMeta->lock); + ASSERT(pTask->chkInfo.keptCheckpointId < pTask->checkpointingId); + pTask->chkInfo.keptCheckpointId = pTask->checkpointingId; + streamMetaSaveTask(pTask->pMeta, pTask); if (streamMetaCommit(pTask->pMeta) < 0) { taosWUnLockLatch(&pTask->pMeta->lock); - qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); + qError("s-task:%s failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", ver:%" PRId64 + ", since %s", + pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version, terrstr()); return -1; } else { taosWUnLockLatch(&pTask->pMeta->lock); - qDebug("s-task:%s commit after checkpoint generating", pTask->id.idStr); } - qInfo("vgId:%d s-task:%s commit task status after checkpoint completed", pMeta->vgId, pTask->id.idStr); + qInfo("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64, + pMeta->vgId, pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version); } else { // todo: let's retry send rsp to upstream/mnode } From 09b764494d56c079e3d0d84d91881461fb688a7a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jul 2023 14:03:43 +0800 Subject: [PATCH 4/4] fix(stream): set correct checkpoint version when starting stream tasks. --- include/libs/stream/tstream.h | 2 +- source/dnode/snode/src/snode.c | 1 - source/dnode/vnode/src/tq/tq.c | 10 ++--- source/libs/stream/src/streamCheckpoint.c | 2 +- source/libs/stream/src/streamExec.c | 46 +++++++++-------------- 5 files changed, 23 insertions(+), 38 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4581d48d1a..c1a4afa566 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -259,7 +259,7 @@ typedef struct SStreamId { typedef struct SCheckpointInfo { int64_t keptCheckpointId; - int64_t version; // offset in WAL + int64_t version; // latest checkpointId version int64_t currentVer; // current offset in WAL, not serialize it } SCheckpointInfo; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 3e0b725e2c..39f0a81d49 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -75,7 +75,6 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; - pTask->chkInfo.version = ver; pTask->pMeta = pSnode->pMeta; pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6a08239524..2e6169dca3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -757,7 +757,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; - pTask->chkInfo.version = ver; pTask->chkInfo.currentVer = ver; pTask->dataRange.range.maxVer = ver; @@ -855,14 +854,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } streamSetupScheduleTrigger(pTask); + SCheckpointInfo* pChkInfo = &pTask->chkInfo; - tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 + tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms", - vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel, - pTask->info.fillHistory, pTask->triggerParam); + vgId, pTask->id.idStr, pChkInfo->keptCheckpointId, pChkInfo->version, pChkInfo->currentVer, + pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); - // next valid version will add one - pTask->chkInfo.version += 1; return 0; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 0557936a83..4a7e571011 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -118,7 +118,6 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num); if (old == 0) { qDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num); - pTask->checkpointingId = checkpointId; } return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1); @@ -207,6 +206,7 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe int32_t childId = pReq->childId; // set the task status + pTask->checkpointingId = checkpointId; pTask->status.taskStatus = TASK_STATUS__CK; ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ae53e0cb65..d4b6f0927d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -295,21 +295,6 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { } #endif -int32_t updateCheckPointInfo(SStreamTask* pTask, int64_t checkpointId) { - int64_t ckId = 0; - int64_t dataVer = 0; - qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); - - SCheckpointInfo* pCkInfo = &pTask->chkInfo; -// qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 ", checkpointId:%" PRId64 -// " -> %" PRId64, -// pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->keptCheckpointId, checkpointId); - pCkInfo->keptCheckpointId = checkpointId; -// pCkInfo->version = dataVer; - - return TSDB_CODE_SUCCESS; -} - static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { // wait for the stream task to be idle int64_t st = taosGetTimestampMs(); @@ -379,7 +364,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { } // set input -static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pCurrentVer, const char* id) { +static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pVer, const char* id) { void* pExecutor = pTask->exec.pExecutor; const SStreamQueueItem* pItem = pInput; @@ -393,8 +378,8 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_ qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); - ASSERT((*pCurrentVer) < pSubmit->submit.ver); - (*pCurrentVer) = pSubmit->submit.ver; + ASSERT((*pVer) < pSubmit->submit.ver); + (*pVer) = pSubmit->submit.ver; } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; @@ -412,8 +397,8 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_ qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks, pMerged->ver); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); - ASSERT((*pCurrentVer) < pMerged->ver); - (*pCurrentVer) = pMerged->ver; + ASSERT((*pVer) < pMerged->ver); + (*pVer) = pMerged->ver; } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; @@ -472,8 +457,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { const SStreamQueueItem* pItem = pInput; qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type); - int64_t currentVer = pTask->chkInfo.currentVer; - doSetStreamInputBlock(pTask, pInput, ¤tVer, id); + int64_t ver = pTask->chkInfo.version; + doSetStreamInputBlock(pTask, pInput, &pTask->chkInfo.version, id); int64_t resSize = 0; int32_t totalBlocks = 0; @@ -484,10 +469,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { resSize / 1048576.0, totalBlocks); // update the currentVer if processing the submit blocks. - if(currentVer > pTask->chkInfo.currentVer) { - qDebug("s-task:%s update currentVer from %" PRId64 " to %" PRId64, pTask->id.idStr, - pTask->chkInfo.currentVer, currentVer); - pTask->chkInfo.currentVer = currentVer; + ASSERT(pTask->chkInfo.version <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.version); + + if(ver != pTask->chkInfo.version) { + qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver, + pTask->chkInfo.version); } int32_t type = pInput->type; @@ -546,7 +532,7 @@ int32_t streamTryExec(SStreamTask* pTask) { if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); - qDebug("vgId:%d do vnode wide checkpoint completed, checkpoint id:%" PRId64 "", pMeta->vgId, + qDebug("vgId:%d do vnode wide checkpoint completed, checkpointId:%" PRId64, pMeta->vgId, pTask->checkpointingId); } @@ -574,8 +560,10 @@ int32_t streamTryExec(SStreamTask* pTask) { taosWUnLockLatch(&pTask->pMeta->lock); } - qInfo("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64, - pMeta->vgId, pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version); + qInfo("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64 + " currentVer:%" PRId64, + pMeta->vgId, pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version, + pTask->chkInfo.currentVer); } else { // todo: let's retry send rsp to upstream/mnode }