From 24559725daf8cc03d5b8b1d137b5de3383e9c9bf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Apr 2024 18:45:30 +0800 Subject: [PATCH 1/3] fix(stream): add lock for when set checkpoint dispatch msg. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 6 ++++++ source/libs/stream/src/streamDispatch.c | 7 ++++++- source/libs/stream/src/streamTaskSm.c | 4 ---- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 1b67dce9b0..0f7f74f78b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -850,12 +850,18 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); + taosThreadMutexLock(&pTask->lock); + // clear flag set during do checkpoint, and open inputQ for all upstream tasks if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { + tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d", + pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); streamTaskClearCheckInfo(pTask, true); streamTaskSetStatusReady(pTask); } + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 0af664f1e1..4a16fe869b 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -947,9 +947,14 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) { // this message has been sent successfully, let's try next one. 
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); + bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); if (delayDispatch) { - pTask->chkInfo.dispatchCheckpointTrigger = true; + taosThreadMutexLock(&pTask->lock); + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { + pTask->chkInfo.dispatchCheckpointTrigger = true; + } + taosThreadMutexUnlock(&pTask->lock); } clearBufferedDispatchMsg(pTask); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 6aa215586a..cfa94209f6 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -543,8 +543,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { return; } - taosThreadMutexLock(&pTask->lock); - pSM->prev.state = pSM->current; pSM->prev.evt = 0; @@ -552,8 +550,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { pSM->startTs = taosGetTimestampMs(); pSM->pActiveTrans = NULL; taosArrayClear(pSM->pWaitingEventList); - - taosThreadMutexUnlock(&pTask->lock); } STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, From dcc97472d319535283b96a9b07187c53fb9ae0ad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Apr 2024 21:53:50 +0800 Subject: [PATCH 2/3] fix(stream): add strict check for checkpoint-trigger msg dispatch rsp confirmation flag. 
--- include/libs/stream/tstream.h | 2 ++ source/libs/stream/src/streamDispatch.c | 12 +++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e7c6491b9d..138fad0ddb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -355,6 +355,8 @@ typedef struct SMetaHbInfo SMetaHbInfo; typedef struct SDispatchMsgInfo { SStreamDispatchReq* pData; // current dispatch data int8_t dispatchMsgType; + int64_t checkpointId;// checkpoint id msg + int32_t transId; // transId for current checkpoint int16_t msgType; // dispatch msg type int32_t retryCount; // retry send data count int64_t startTs; // dispatch start time, record total elapsed time for dispatch diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e5709aab6e..44a496811f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -321,6 +321,8 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) { destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask)); } + pMsgInfo->checkpointId = -1; + pMsgInfo->transId = -1; pMsgInfo->pData = NULL; pMsgInfo->dispatchMsgType = 0; } @@ -332,6 +334,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD pTask->msgInfo.dispatchMsgType = pData->type; + if (pData->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + SSDataBlock* p = taosArrayGet(pData->blocks, 0); + pTask->msgInfo.checkpointId = p->info.version; + pTask->msgInfo.transId = p->info.window.ekey; + } + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); @@ -954,7 +962,9 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); if (delayDispatch) { taosThreadMutexLock(&pTask->lock); - if 
(streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { + // we only set the dispatch msg info for current checkpoint trans + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && pTask->chkInfo.checkpointingId == pTask->msgInfo.checkpointId) { + ASSERT(pTask->chkInfo.transId == pTask->msgInfo.transId); pTask->chkInfo.dispatchCheckpointTrigger = true; } taosThreadMutexUnlock(&pTask->lock); From 3619518d54c1e69eed78d16775c3f61a162af43e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Apr 2024 21:57:21 +0800 Subject: [PATCH 3/3] fix(stream): add logs. --- source/libs/stream/src/streamDispatch.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 44a496811f..baf5ebf8cb 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -966,6 +966,11 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && pTask->chkInfo.checkpointingId == pTask->msgInfo.checkpointId) { ASSERT(pTask->chkInfo.transId == pTask->msgInfo.transId); pTask->chkInfo.dispatchCheckpointTrigger = true; + stDebug("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d confirmed", + pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId); + } else { + stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired", + pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId); } taosThreadMutexUnlock(&pTask->lock); }