From 5057749790944407a2ef3f68d7ef5e7ca815df48 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 Jul 2024 10:18:58 +0800 Subject: [PATCH] fix(stream): fix race condition for dispatch msg. --- source/libs/stream/src/streamDispatch.c | 56 +++++++++++++++---------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 617adaa016..1959180a3e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1162,10 +1162,10 @@ void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) { static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId, int32_t downstreamNodeId) { stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); - bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); - clearBufferedDispatchMsg(pTask); - int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; + bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); + + clearBufferedDispatchMsg(pTask); // put data into inputQ of current task is also allowed if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { @@ -1189,13 +1189,24 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId return 0; } -static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) { +static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp, const char* id) { int32_t numOfRsp = 0; bool alreadySet = false; bool updated = false; + bool allRsp = false; + *pNotRsp = 0; taosThreadMutexLock(&pMsgInfo->lock); - for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) { + int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo); + + for(int32_t i = 0; i < numOfDispatchBranch; ++i) { + SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i); + if (pEntry->rspTs != -1) { + numOfRsp += 1; + } + } + + for (int32_t j = 0; j < numOfDispatchBranch; ++j) { SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j); if (pEntry->nodeId == vgId) { ASSERT(!alreadySet); @@ -1203,18 +1214,20 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3 pEntry->status = code; alreadySet = true; updated = true; - stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d", id, now, code, j); - } - - if (pEntry->rspTs != -1) { numOfRsp += 1; + + stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j, + numOfRsp, numOfDispatchBranch); } } - taosThreadMutexUnlock(&pMsgInfo->lock); - ASSERT(updated); + *pNotRsp = numOfDispatchBranch - numOfRsp; + allRsp = (numOfRsp == numOfDispatchBranch); - return numOfRsp; + taosThreadMutexUnlock(&pMsgInfo->lock); + + ASSERT(updated); + return allRsp; } bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now) { @@ -1240,7 +1253,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t vgId = pTask->pMeta->vgId; SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; int64_t now = taosGetTimestampMs(); - int32_t totalRsp = 0; + bool allRsp = false; + int32_t notRsp = 0; taosThreadMutexLock(&pMsgInfo->lock); int32_t msgId = pMsgInfo->msgId; @@ -1269,18 +1283,18 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); - totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id); } else { stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); - totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, ¬Rsp, id); } } else { // code == 0 if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream - totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, ¬Rsp, id); stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } else { @@ -1292,7 +1306,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); } - totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id); + allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id); { bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); @@ -1317,13 +1331,11 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } } - int32_t notRsp = taosArrayGetSize(pMsgInfo->pSendInfo) - totalRsp; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - if (notRsp > 0) { + if (!allRsp) { stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, " - "waiting " - "for %d rsp", + "waiting for %d rsp", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), notRsp); } else { stDebug( @@ -1337,7 +1349,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } // all msg rsp already, continue - if (notRsp == 0) { + if (allRsp) { ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); // we need to re-try send dispatch msg to downstream tasks