From 24559725daf8cc03d5b8b1d137b5de3383e9c9bf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Apr 2024 18:45:30 +0800 Subject: [PATCH] fix(stream): add lock for when set checkpoint dispatch msg. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 6 ++++++ source/libs/stream/src/streamDispatch.c | 7 ++++++- source/libs/stream/src/streamTaskSm.c | 4 ---- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 1b67dce9b0..0f7f74f78b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -850,12 +850,18 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); + taosThreadMutexLock(&pTask->lock); + // clear flag set during do checkpoint, and open inputQ for all upstream tasks if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { + tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d", + pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); streamTaskClearCheckInfo(pTask, true); streamTaskSetStatusReady(pTask); } + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 0af664f1e1..4a16fe869b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -947,9 +947,14 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) { // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); + bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); if (delayDispatch) { - pTask->chkInfo.dispatchCheckpointTrigger = true; + taosThreadMutexLock(&pTask->lock); + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { + pTask->chkInfo.dispatchCheckpointTrigger = true; + } + taosThreadMutexUnlock(&pTask->lock); } clearBufferedDispatchMsg(pTask); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 6aa215586a..cfa94209f6 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -543,8 +543,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { return; } - taosThreadMutexLock(&pTask->lock); - pSM->prev.state = pSM->current; pSM->prev.evt = 0; @@ -552,8 +550,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { pSM->startTs = taosGetTimestampMs(); pSM->pActiveTrans = NULL; taosArrayClear(pSM->pWaitingEventList); - - taosThreadMutexUnlock(&pTask->lock); } STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,