diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 678ec7503f..654ac890d5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -368,7 +368,7 @@ typedef struct SStreamMeta { TdThreadMutex backendMutex; // uint64_t checkpointId; - int32_t notCkptReadyTasks; + int32_t chkptNotReadyTasks; SArray* checkpointSaved; SArray* checkpointInUse; int32_t checkpointCap; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 431aee9c5e..34ac0540d8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1523,6 +1523,12 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMs memcpy(pRpcMsg, (SRpcMsg*)pMsg, sizeof(SRpcMsg)); taosArrayPush(pTask->pRpcMsgList, &pRpcMsg); + // todo: when generating checkpoint, no new tasks are allowed to add into current Vnode + // set the initial value for generating check point + taosWLockLatch(&pMeta->lock); + pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList); + taosWUnLockLatch(&pMeta->lock); + streamProcessCheckpointSourceReq(pMeta, pTask, &req); streamMetaReleaseTask(pMeta, pTask); return code; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3d8c14d1f2..621a4b57c7 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -536,8 +536,10 @@ int32_t streamTryExec(SStreamTask* pTask) { // check for all tasks, and do generate the vnode-wide checkpoint data. // todo extract method SStreamMeta* pMeta = pTask->pMeta; - int32_t remain = atomic_sub_fetch_32(&pMeta->notCkptReadyTasks, 1); - if (remain <= 0) { // all tasks are in TASK_STATUS__CK_READY state + int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); + ASSERT(remain >= 0); + + if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state streamBackendDoCheckpoint(pMeta, pTask->checkpointingId); }