From 619a9f6d08f7933c2ce4b35b10bf50f03b2c28a3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Sep 2023 15:57:55 +0800 Subject: [PATCH] fix(stream): refactor re-try dispatch msg for stream tasks. --- include/libs/stream/tstream.h | 3 +- source/libs/stream/src/streamDispatch.c | 233 ++++++++++++------------ source/libs/stream/src/streamTask.c | 2 + 3 files changed, 122 insertions(+), 116 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b2927d839d..d7ad1ddf08 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -297,7 +297,8 @@ typedef struct SDispatchMsgInfo { int8_t dispatchMsgType; int16_t msgType; // dispatch msg type int32_t retryCount; // retry send data count - int64_t blockingTs; // output blocking timestamp + int64_t startTs; // output blocking timestamp + SArray* pRetryList; // downstream node ids (vgId) whose dispatch failed or hit a blocked inputQ and must be re-sent } SDispatchMsgInfo; typedef struct STaskOutputInfo { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 2775a90abf..a3eda3ceb2 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -370,29 +370,6 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD } pTask->msgInfo.pData = pReqs; -// *pDispatchReq = pReqs; - -// stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroup(s), msgId:%d", pTask->id.idStr, -// pTask->info.selfChildId, numOfBlocks, numOfVgroups, msgId); -// -// for (int32_t i = 0; i < numOfVgroups; i++) { -// if (pReqs[i].blockNum > 0) { -// SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); -// stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, -// pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId); -// -// code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet); -// if (code < 0) { -// destroyDispatchMsg(pReqs, 
numOfVgroups); -// return code; -// } -// } -// } -// -// stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, msgId); -// code = 0; -// -// *pDispatchReq = pReqs; } stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->taskExecInfo.dispatchCount); @@ -441,6 +418,8 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch static void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; + const char* id = pTask->id.idStr; + int32_t msgId = pTask->taskExecInfo.dispatchCount; if (streamTaskShouldStop(&pTask->status)) { int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); @@ -450,11 +429,53 @@ static void doRetryDispatchData(void* param, void* tmrId) { ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); - int32_t code = sendDispatchMsg(pTask, pTask->msgInfo.pData); + int32_t code = 0; + { + SStreamDispatchReq *pReq = pTask->msgInfo.pData; + + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t numOfVgroups = taosArrayGetSize(vgInfo); + + + int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList); + stDebug("s-task:%s (child taskId:%d) re-try shuffle-dispatch blocks to %d vgroup(s), msgId:%d", + id, pTask->info.selfChildId, numOfFailed, msgId); + + for (int32_t i = 0; i < numOfFailed; i++) { + int32_t vgId = *(int32_t*) taosArrayGet(pTask->msgInfo.pRetryList, i); + + for(int32_t j = 0; j < numOfVgroups; ++j) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); + if (pVgInfo->vgId == vgId) { + stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, + pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId); + + code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet); + if (code < 0) { + break; + } + } + } + } + + stDebug("s-task:%s complete re-try shuffle-dispatch 
blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfFailed, msgId); + } else { + int32_t vgId = pTask->fixedEpDispatcher.nodeId; + SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet; + int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; + + stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id, + pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); + + code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet); + } + } + if (code != TSDB_CODE_SUCCESS) { if (!streamTaskShouldStop(&pTask->status)) { - stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); - atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); +// stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); +// atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); if (streamTaskShouldPause(&pTask->status)) { streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS * 10); } else { @@ -471,7 +492,9 @@ static void doRetryDispatchData(void* param, void* tmrId) { } void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { - stWarn("s-task:%s dispatch data in %" PRId64 "ms, in timer", pTask->id.idStr, waitDuration); + stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d", pTask->id.idStr, waitDuration, + pTask->taskExecInfo.dispatchCount); + if (pTask->launchTaskTimer != NULL) { taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); } else { @@ -1011,12 +1034,14 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); pTask->msgInfo.pData = NULL; - if (pTask->msgInfo.blockingTs != 0) { - int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; + if (pTask->msgInfo.startTs != 0) { + int64_t now = taosGetTimestampMs(); + + int64_t el = now - 
pTask->msgInfo.startTs; stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", pTask->id.idStr, downstreamId, el); - pTask->msgInfo.blockingTs = 0; + pTask->msgInfo.startTs = now; // put data into inputQ of current task is also allowed pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; } @@ -1033,12 +1058,28 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i const char* id = pTask->id.idStr; int32_t msgId = pTask->taskExecInfo.dispatchCount; + int32_t leftRsp = 0; + + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + if (leftRsp > 0) { + stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp", + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp); + } else { + stDebug( + "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); + } + } else { + stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d", + id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); + } + if (code != TSDB_CODE_SUCCESS) { // dispatch message failed: network error, or node not available. // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp->inputStatus will be set // flag. Here we need to retry dispatch this message to downstream task immediately. handle the case the failure // happened too fast. 
- // todo handle the shuffle dispatch failure if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); @@ -1048,98 +1089,60 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id); // streamProcessCheckpointReadyMsg(pTask); // } - - // we should set the correct finish flag to make sure the shuffle dispatch will be executed completed. - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - int32_t left = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - if (left > 0) { // do nothing - stError("s-task:%s add the shuffle dispatch counter to complete the dispatch process", id); - } else { - stError("s-task:%s the last rsp is failed, ignore it and continue, roll-back will discard this msg", id); - handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); - } - } } else { - stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, retry", id, msgId, + stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); - SStreamDispatchReq* pDispatchMsg = pTask->msgInfo.pData; + taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); + } - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t numOfVgroups = taosArrayGetSize(vgInfo); - for(int32_t i = 0; i < numOfVgroups; ++i) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - if (pVgInfo->vgId == pRsp->downstreamNodeId) { - stDebug("s-task:%s (child taskId:%d) re-send blocks:%d to vgId:%d", pTask->id.idStr, - pTask->info.selfChildId, 
pDispatchMsg[i].blockNum, pVgInfo->vgId); - code = doSendDispatchMsg(pTask, &pDispatchMsg[i], pVgInfo->vgId, &pVgInfo->epSet); - } - } - } else { - sendDispatchMsg(pTask, pTask->msgInfo.pData); + } else { // code == 0 + if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; + // block the input of current task, to push pressure to upstream + taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); + stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); + } + + // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state + if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { + stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId); + ASSERT(pTask->info.fillHistory == 1); + + code = streamTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens } - } - return TSDB_CODE_SUCCESS; + // now ready for next data output + atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); + return TSDB_CODE_SUCCESS; + } } - // there are other dispatch message not response yet - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - if (leftRsp > 0) { - stDebug( - "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d " - "rsp", - id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp); - return 0; - } else { - stDebug("s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", id, - msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); + 
ASSERT(leftRsp >= 0); + + // all msg rsp already, continue + if (leftRsp == 0) { + ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); + stDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status); + + // we need to re-try send dispatch msg to downstream tasks + int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList); + if (numOfFailed > 0) { + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, numOfFailed); + } + + pTask->msgInfo.retryCount++; + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", + pTask->id.idStr, pTask->msgInfo.retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); + + streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); + } else { // this message has been sent successfully, let's try next one. + pTask->msgInfo.retryCount = 0; + handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); } - } else { - stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d", - id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code); - } - - // transtate msg has been sent to downstream successfully. 
let's transfer the fill-history task state - if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { - stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId); - ASSERT(pTask->info.fillHistory == 1); - - code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens - } - - // now ready for next data output - atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); - return TSDB_CODE_SUCCESS; - } - - pTask->msgInfo.retryCount = 0; - ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); - - stDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status); - - // the input queue of the (down stream) task that receive the output data is full, - // so the TASK_INPUT_STATUS_BLOCKED is rsp - if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { - pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream - - double el = 0; - if (pTask->msgInfo.blockingTs == 0) { - pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time - } else { - el = (taosGetTimestampMs() - pTask->msgInfo.blockingTs) / 1000.0; - } - - int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); - stError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 - " wait for %dms and retry dispatch data, total wait:%.2fSec ref:%d", - id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, el, ref); - streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); - } else { - // this message has been sent successfully, let's try next one. 
- handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); } return 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8d651c43a0..34d1f7a9c9 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -381,6 +381,7 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->pUpstreamInfoList = NULL; } + pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList); taosMemoryFree(pTask->pTokenBucket); taosThreadMutexDestroy(&pTask->lock); taosMemoryFree(pTask); @@ -410,6 +411,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->dataRange.range.maxVer = ver; pTask->dataRange.range.minVer = ver; pTask->pMsgCb = pMsgCb; + pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); pTask->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket)); if (pTask->pTokenBucket == NULL) {