From dcc97472d319535283b96a9b07187c53fb9ae0ad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Apr 2024 21:53:50 +0800 Subject: [PATCH] fix(stream): add strict check for checkpoint-trigger msg dispatach rsp confirmation flag. --- include/libs/stream/tstream.h | 2 ++ source/libs/stream/src/streamDispatch.c | 12 +++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e7c6491b9d..138fad0ddb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -355,6 +355,8 @@ typedef struct SMetaHbInfo SMetaHbInfo; typedef struct SDispatchMsgInfo { SStreamDispatchReq* pData; // current dispatch data int8_t dispatchMsgType; + int64_t checkpointId;// checkpoint id msg + int32_t transId; // transId for current checkpoint int16_t msgType; // dispatch msg type int32_t retryCount; // retry send data count int64_t startTs; // dispatch start time, record total elapsed time for dispatch diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e5709aab6e..44a496811f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -321,6 +321,8 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) { destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask)); } + pMsgInfo->checkpointId = -1; + pMsgInfo->transId = -1; pMsgInfo->pData = NULL; pMsgInfo->dispatchMsgType = 0; } @@ -332,6 +334,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD pTask->msgInfo.dispatchMsgType = pData->type; + if (pData->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + SSDataBlock* p = taosArrayGet(pData->blocks, 0); + pTask->msgInfo.checkpointId = p->info.version; + pTask->msgInfo.transId = p->info.window.ekey; + } + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); @@ -954,7 +962,9 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); if (delayDispatch) { taosThreadMutexLock(&pTask->lock); - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { + // we only set the dispatch msg info for current checkpoint trans + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && pTask->chkInfo.checkpointingId == pTask->msgInfo.checkpointId) { + ASSERT(pTask->chkInfo.transId == pTask->msgInfo.transId); pTask->chkInfo.dispatchCheckpointTrigger = true; } taosThreadMutexUnlock(&pTask->lock);