fix(stream): fix bug in checkpoint.
This commit is contained in:
parent
47877898d0
commit
8298f30e56
|
@ -368,7 +368,7 @@ typedef struct SStreamMeta {
|
||||||
TdThreadMutex backendMutex;
|
TdThreadMutex backendMutex;
|
||||||
|
|
||||||
// uint64_t checkpointId;
|
// uint64_t checkpointId;
|
||||||
int32_t notCkptReadyTasks;
|
int32_t chkptNotReadyTasks;
|
||||||
SArray* checkpointSaved;
|
SArray* checkpointSaved;
|
||||||
SArray* checkpointInUse;
|
SArray* checkpointInUse;
|
||||||
int32_t checkpointCap;
|
int32_t checkpointCap;
|
||||||
|
|
|
@ -1523,6 +1523,12 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMs
|
||||||
memcpy(pRpcMsg, (SRpcMsg*)pMsg, sizeof(SRpcMsg));
|
memcpy(pRpcMsg, (SRpcMsg*)pMsg, sizeof(SRpcMsg));
|
||||||
taosArrayPush(pTask->pRpcMsgList, &pRpcMsg);
|
taosArrayPush(pTask->pRpcMsgList, &pRpcMsg);
|
||||||
|
|
||||||
|
// todo: when generating checkpoint, no new tasks are allowed to add into current Vnode
|
||||||
|
// set the initial value for generating check point
|
||||||
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
streamProcessCheckpointSourceReq(pMeta, pTask, &req);
|
streamProcessCheckpointSourceReq(pMeta, pTask, &req);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -536,8 +536,10 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
// check for all tasks, and do generate the vnode-wide checkpoint data.
|
// check for all tasks, and do generate the vnode-wide checkpoint data.
|
||||||
// todo extract method
|
// todo extract method
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
int32_t remain = atomic_sub_fetch_32(&pMeta->notCkptReadyTasks, 1);
|
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
|
||||||
if (remain <= 0) { // all tasks are in TASK_STATUS__CK_READY state
|
ASSERT(remain >= 0);
|
||||||
|
|
||||||
|
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
|
||||||
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue