From 1ffec769b8d10205899d49817222b4dd929fed8b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jun 2024 17:03:14 +0800 Subject: [PATCH] fix(stream): clear the msgId if send success, and handle the race condition problem. --- source/libs/stream/src/streamDispatch.c | 41 +++++++++++++++---------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 789cb5cbcf..83e73e8c88 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -40,6 +40,20 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->contLen = contLen; } +static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) { + pInfo->startTs = taosGetTimestampMs(); + pInfo->rspTs = -1; + pInfo->msgId = msgId; +} + +static void clearDispatchInfo(SDispatchMsgInfo* pInfo) { + pInfo->startTs = -1; + pInfo->msgId = -1; + pInfo->rspTs = -1; +} + +static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; } + static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, int64_t dstTaskId, int32_t type) { pReq->streamId = pTask->id.streamId; @@ -225,12 +239,15 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) { destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask)); } + taosThreadMutexLock(&pMsgInfo->lock); + pMsgInfo->checkpointId = -1; pMsgInfo->transId = -1; pMsgInfo->pData = NULL; pMsgInfo->dispatchMsgType = 0; - taosThreadMutexLock(&pMsgInfo->lock); + clearDispatchInfo(pMsgInfo); + taosArrayClear(pTask->msgInfo.pSendInfo); taosThreadMutexUnlock(&pMsgInfo->lock); } @@ -643,20 +660,6 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S return 0; } -static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) { - pInfo->startTs = taosGetTimestampMs(); - pInfo->rspTs = -1; - pInfo->msgId = msgId; -} - -static void clearDispatchInfo(SDispatchMsgInfo* pInfo) { - pInfo->startTs = -1; - pInfo->msgId = -1; - pInfo->rspTs = -1; -} - -static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; } - int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); @@ -699,7 +702,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { type == STREAM_INPUT__TRANS_STATE); pTask->execInfo.dispatch += 1; + + taosThreadMutexLock(&pTask->msgInfo.lock); initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch); + taosThreadMutexUnlock(&pTask->msgInfo.lock); int32_t code = doBuildDispatchMsg(pTask, pBlock); if (code == 0) { @@ -1222,10 +1228,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; - int32_t msgId = pMsgInfo->msgId; int64_t now = taosGetTimestampMs(); int32_t totalRsp = 0; + taosThreadMutexLock(&pMsgInfo->lock); + int32_t msgId = pMsgInfo->msgId; + taosThreadMutexUnlock(&pMsgInfo->lock); + // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id,