From a3eea6fe1c76ec068592e3fc907b1e342b0802ac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Jan 2025 14:11:15 +0800 Subject: [PATCH] fix(stream): fix race condition. --- source/libs/stream/src/streamDispatch.c | 139 +++++++++++--------- source/libs/stream/src/streamStartHistory.c | 2 - 2 files changed, 76 insertions(+), 65 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ee2d337ff2..3d979f3d11 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -527,6 +527,76 @@ static void cleanupInMonitor(int32_t taskId, int64_t taskRefId, void* param) { streamTaskFreeRefId(param); } +static int32_t sendFailedDispatchData(SStreamTask* pTask, int64_t now) { + int32_t code = 0; + const char* id = pTask->id.idStr; + SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; + + streamMutexLock(&pMsgInfo->lock); + + int32_t msgId = pMsgInfo->msgId; + SStreamDispatchReq* pReq = pTask->msgInfo.pData; + + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, pTask->info.selfChildId, + msgId); + + int32_t numOfRetry = 0; + for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) { + SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i); + if (pEntry == NULL) { + continue; + } + + if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) { + continue; + } + + // downstream not rsp yet beyond threshold that is 10s + if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data + doSendFailedDispatch(pTask, pEntry, now, "timeout"); + numOfRetry += 1; + continue; + } + + // downstream inputQ is closed + if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) { + doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked"); + numOfRetry += 1; + continue; + } + + // handle other errors + if (pEntry->status != TSDB_CODE_SUCCESS) { + doSendFailedDispatch(pTask, pEntry, now, "downstream error"); + numOfRetry += 1; + } + } + + stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfRetry, + msgId); + } else { + int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId; + SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; + int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; + + int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo); + SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0); + if (pEntry != NULL) { + setResendInfo(pEntry, now); + code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet); + + stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id, + pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code)); + } else { + stError("s-task:%s invalid index 0, size:%d", id, s); + } + } + + streamMutexUnlock(&pMsgInfo->lock); + return code; +} + static void doMonitorDispatchData(void* param, void* tmrId) { int32_t code = 0; int64_t now = taosGetTimestampMs(); @@ -590,65 +660,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) { return; } - { - SStreamDispatchReq* pReq = pTask->msgInfo.pData; - - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, - pTask->info.selfChildId, msgId); - - int32_t numOfRetry = 0; - for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) { - SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i); - if (pEntry == NULL) { - continue; - } - - if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) { - continue; - } - - // downstream not rsp yet beyond threshold that is 10s - if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data - doSendFailedDispatch(pTask, pEntry, now, "timeout"); - numOfRetry += 1; - continue; - } - - // downstream inputQ is closed - if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) { - doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked"); - numOfRetry += 1; - continue; - } - - // handle other errors - if (pEntry->status != TSDB_CODE_SUCCESS) { - doSendFailedDispatch(pTask, pEntry, now, "downstream error"); - numOfRetry += 1; - } - } - - stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, - numOfRetry, msgId); - } else { - int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId; - SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; - int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - - int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo); - SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0); - if (pEntry != NULL) { - setResendInfo(pEntry, now); - code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet); - - stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id, - pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code)); - } else { - stError("s-task:%s invalid index 0, size:%d", id, s); - } - } - } + code = sendFailedDispatchData(pTask, now); if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); @@ -880,7 +892,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { code = sendDispatchMsg(pTask, pTask->msgInfo.pData); - // todo: secure the timerActive and start timer in after lock pTask->lock + // todo: start timer in after lock pTask->lock streamMutexLock(&pTask->lock); bool shouldStop = streamTaskShouldStop(pTask); streamMutexUnlock(&pTask->lock); @@ -890,7 +902,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } else { streamMutexLock(&pTask->msgInfo.lock); if (pTask->msgInfo.inMonitor == 0) { -// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start dispatch monitor tmr in %dms, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, tstrerror(code)); streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); @@ -1873,8 +1884,10 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S status = streamTaskAppendInputBlocks(pTask, pReq); pInfo->lastMsgId = pReq->msgId; } else { - stWarn("s-task:%s duplicate msgId:%d from upstream:0x%x, from vgId:%d already recv msgId:%" PRId64, id, - pReq->msgId, pReq->upstreamTaskId, pReq->srcVgId, pInfo->lastMsgId); + stWarn( + "s-task:%s duplicate msgId:%d from upstream:0x%x discard and return succ, from vgId:%d already recv " + "msgId:%" PRId64, + id, pReq->msgId, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->lastMsgId); status = TASK_INPUT_STATUS__NORMAL; // still return success } } diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 54a8929123..4eb2e2e4e1 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -281,7 +281,6 @@ void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, SStreamMeta* pMeta = pTask->pMeta; SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; -// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); if (code) { @@ -300,7 +299,6 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; if (streamTaskShouldStop(pTask)) { // record the failure -// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId, pInfo->hTaskId.taskId);