From 9af6076b4f1aa849b434d91f5117faf49de589f8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 14 Jun 2024 14:42:22 +0800 Subject: [PATCH] fix(stream): add the dispatch entry. --- source/libs/stream/src/streamDispatch.c | 32 ++++++++++++++++++------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c10d716881..b17d0206f0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -32,6 +32,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas int32_t numOfBlocks, int64_t dstTaskId, int32_t type); static int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now); static bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now); +static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->msgType = msgType; @@ -306,6 +307,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD } } + addDispatchEntry(&pTask->msgInfo, pTask->outputInfo.fixedDispatcher.nodeId, now, true); pTask->msgInfo.pData = pReqs; } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -328,11 +330,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD // it's a new vnode to receive dispatch msg, so add one if (pReqs[j].blockNum == 0) { SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); - SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now}; - - taosThreadMutexLock(&pTask->msgInfo.lock); - taosArrayPush(pTask->msgInfo.pSendInfo, &entry); - taosThreadMutexUnlock(&pTask->msgInfo.lock); + addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, true); } pReqs[j].blockNum++; @@ -422,6 +420,20 @@ static void setResendInfo(SDispatchEntry* pEntry, int64_t now) { pEntry->retryCount += 1; } +static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock) { + SDispatchEntry entry = {.nodeId = nodeId, .rspTs = -1, .status = 0, .sendTs = now}; + + if (lock) { + taosThreadMutexLock(&pMsgInfo->lock); + } + + taosArrayPush(pMsgInfo->pSendInfo, &entry); + + if (lock) { + taosThreadMutexUnlock(&pMsgInfo->lock); + } +} + static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int64_t now, const char* pMsg) { SStreamDispatchReq* pReq = pTask->msgInfo.pData; @@ -618,8 +630,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S if (pReqs[j].blockNum == 0) { SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); - SDispatchEntry entry = {.nodeId = pDstVgroupInfo->vgId, .rspTs = -1, .status = 0, .sendTs = now}; - taosArrayPush(pTask->msgInfo.pSendInfo, &entry); + addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false); } pReqs[j].blockNum++; @@ -1153,6 +1164,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) { int32_t numOfRsp = 0; bool alreadySet = false; + bool updated = false; taosThreadMutexLock(&pMsgInfo->lock); for(int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) { @@ -1162,7 +1174,8 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3 pEntry->rspTs = now; pEntry->status = code; alreadySet = true; - stDebug("s-task:%s record the rps recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j); + updated = true; + stDebug("s-task:%s record the rsp recv, ts:%"PRId64" code:%d, idx:%d", id, now, code, j); } if (pEntry->rspTs != -1) { @@ -1171,6 +1184,8 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3 } taosThreadMutexUnlock(&pMsgInfo->lock); + ASSERT(updated); + return numOfRsp; } @@ -1417,6 +1432,5 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } streamTrySchedExec(pTask); - return 0; }