From aa12038fc49fe35eb34e851c92591dc9a4a6e455 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 10 Dec 2024 17:58:53 +0800 Subject: [PATCH 1/3] fix(stream): record the failure of dispatch msg, and set the update node id. --- include/libs/stream/tstream.h | 1 - source/libs/stream/inc/streamInt.h | 1 + source/libs/stream/src/streamCheckStatus.c | 16 +++++++--------- source/libs/stream/src/streamDispatch.c | 7 ++++++- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 53b8e0e0b9..295aa770a9 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -207,7 +207,6 @@ typedef struct { typedef struct { int32_t nodeId; - SEpSet epset; } SDownstreamTaskEpset; typedef enum { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 8f9e4a311c..8f68116079 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -201,6 +201,7 @@ void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask); void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo); +int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo); EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 64b19e4ed9..118cb1cfb6 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -21,7 +21,6 @@ #define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec static void processDownstreamReadyRsp(SStreamTask* pTask); -static int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); static void rspMonitorFn(void* param, void* tmrId); static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); @@ -226,13 +225,13 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart", id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); - code = addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); + code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); } else { stError( "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " "downstream again, nodeUpdate needed", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); - code = addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); + code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } streamMetaAddFailedTaskSelf(pTask, now); @@ -373,11 +372,10 @@ void processDownstreamReadyRsp(SStreamTask* pTask) { } } -int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { +int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t vgId = pTask->pMeta->vgId; int32_t code = 0; - ; - bool existed = false; + bool existed = false; streamMutexLock(&pTask->lock); @@ -675,8 +673,8 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { SDownstreamStatusInfo* p = NULL; findCheckRspStatus(pInfo, *pTaskId, &p); if (p != NULL) { - code = addIntoNodeUpdateList(pTask, p->vgId); - stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list", + code = streamTaskAddIntoNodeUpdateList(pTask, p->vgId); + stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpdate list", id, vgId, p->taskId, p->vgId); } } @@ -717,7 +715,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { // the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread. // The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution -// of restart in timer thread will result in a dead lock. +// of restart in timer thread will result in a deadlock. int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b4fcf1edc9..c1e4850bfe 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1552,7 +1552,6 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t int32_t* pFailed, const char* id) { int32_t numOfRsp = 0; int32_t numOfFailed = 0; - bool allRsp = false; int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo); @@ -1639,6 +1638,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t notRsp = 0; int32_t numOfFailed = 0; bool triggerDispatchRsp = false; + bool addFailure = false; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; int64_t tmpCheckpointId = -1; int32_t tmpTranId = -1; @@ -1698,6 +1698,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } else { if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { // todo handle the role-changed during checkpoint generation, add test case + addFailure = true; stError( "s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, downstream may become follower or " "restart already, treat it as success", @@ -1745,6 +1746,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code)); } + if (addFailure) { // add failure downstream node id, and start the nodeEp update procedure + streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); + } + // all msg rsp already, continue // we need to re-try send dispatch msg to downstream tasks if (allRsp && (numOfFailed == 0)) { From aa187f1ee3ba93838a5f4783f9b0e00c57709a73 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 12 Dec 2024 09:35:35 +0800 Subject: [PATCH 2/3] fix(stream): check return value. --- source/libs/stream/src/streamDispatch.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c1e4850bfe..968744a0c5 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1747,7 +1747,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } if (addFailure) { // add failure downstream node id, and start the nodeEp update procedure - streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); + // ignore the return error and continue + int32_t unused = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } // all msg rsp already, continue From 779c1bd8add7b18a5df9508b1b11d5ba060dc4b9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 16 Dec 2024 13:53:13 +0800 Subject: [PATCH 3/3] refactor(stream): inject the reject dispatch error in stream to test the effect of fix. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 4 ++-- source/libs/stream/src/streamDispatch.c | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index bb666eb6dd..941e7e53a2 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1377,8 +1377,8 @@ int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, cons return TSDB_CODE_INVALID_PARA; } - pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)), - pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); + pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)); + pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) { mndDestroyVgroupChangeInfo(pInfo); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 968744a0c5..0fc007a1fd 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1872,6 +1872,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } } +#if 0 + // inject errors, and always refuse the upstream dispatch msg and trigger the task nodeEpset update trans. + status = TASK_INPUT_STATUS__REFUSED; +#endif + { // do send response with the input status int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);