From 46b17aa9f1104c2a3cdb42efe3a7cd79df544dfb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Sep 2023 19:35:12 +0800 Subject: [PATCH] fix(stream): fix race condition. --- source/dnode/snode/src/snode.c | 5 +-- source/libs/stream/src/stream.c | 45 +++++++++++++------------ source/libs/stream/src/streamDispatch.c | 45 +++++++++++++------------ 3 files changed, 49 insertions(+), 46 deletions(-) diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 3c6518bd2f..d6e575d97c 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -37,10 +37,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId); if (pTask) { - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; + SRpcMsg rsp = { .info = pMsg->info, .code = 0 }; streamProcessDispatchMsg(pTask, &req, &rsp); streamMetaReleaseTask(pSnode->pMeta, pTask); rpcFreeCont(pMsg->pCont); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6babe66d0d..e1b9a2fb2f 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -213,36 +213,39 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, } int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, - pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); - int32_t status = 0; + int32_t status = 0; + const char* id = pTask->id.idStr; + + stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id, + pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId); SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); ASSERT(pInfo != NULL); if (!pTask->pMeta->leader) { - stError("s-task:%s task on follower received dispatch msgs, should discard it, not now", pTask->id.idStr); - } - - // upstream task has restarted/leader-follower switch/transferred to other dnodes - if (pReq->stage > pInfo->stage) { - stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64 - ", current:%" PRId64 " dispatch msg rejected", - pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage); + stError("s-task:%s task on follower received dispatch msgs, should discard it, not now", id); status = TASK_INPUT_STATUS__BLOCKED; } else { - if (!pInfo->dataAllowed) { - stWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, - pReq->upstreamTaskId); + if (pReq->stage > pInfo->stage) { + // upstream task has restarted/leader-follower switch/transferred to other dnodes + stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64 + ", current:%" PRId64 " dispatch msg rejected", + id, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage); status = TASK_INPUT_STATUS__BLOCKED; } else { - // This task has received the checkpoint req from the upstream task, from which all the messages should be blocked - if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); - stDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId); - } + if (!pInfo->dataAllowed) { + stWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", id, pReq->upstreamTaskId); + status = TASK_INPUT_STATUS__BLOCKED; + } else { + // This task has received the checkpoint req from the upstream task, from which all the messages should be + // blocked + if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); + } - status = streamTaskAppendInputBlocks(pTask, pReq); + status = streamTaskAppendInputBlocks(pTask, pReq); + } } } @@ -256,7 +259,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S // do send response with the input status int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", pTask->id.idStr, pReq->msgId, tstrerror(code)); + stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code)); return code; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e973cfd4dc..5977d31e0d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1074,27 +1074,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i return TSDB_CODE_INVALID_MSG; } - int32_t leftRsp = 0; - - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - stDebug("s-task:%s waiting rsp:%d", id, pTask->shuffleDispatcher.waitingRspCnt); - - leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - ASSERT(leftRsp >= 0); - - if (leftRsp > 0) { - stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp", - id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp); - } else { - stDebug( - "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", - id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); - } - } else { - stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d", - id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); - } - if (code != TSDB_CODE_SUCCESS) { // dispatch message failed: network error, or node not available. // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp->inputStatus will be set @@ -1112,14 +1091,19 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } else { stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); + taosThreadMutexLock(&pTask->lock); taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); + taosThreadMutexUnlock(&pTask->lock); } } else { // code == 0 if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream + taosThreadMutexLock(&pTask->lock); taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); + taosThreadMutexUnlock(&pTask->lock); + stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); } @@ -1139,6 +1123,25 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } } + int32_t leftRsp = 0; + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + stDebug("s-task:%s waiting rsp:%d", id, pTask->shuffleDispatcher.waitingRspCnt); + leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + ASSERT(leftRsp >= 0); + + if (leftRsp > 0) { + stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp", + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp); + } else { + stDebug( + "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); + } + } else { + stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d", + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); + } + ASSERT(leftRsp >= 0); // all msg rsp already, continue