diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 06d99ae856..a4d89dcdcc 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -207,7 +207,6 @@ typedef struct { typedef struct { int32_t nodeId; - SEpSet epset; } SDownstreamTaskEpset; typedef enum { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index a625ca81c9..f3d3a85211 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1430,7 +1430,7 @@ int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, cons return TSDB_CODE_INVALID_PARA; } - pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)), + pInfo->pUpdateNodeList = taosArrayInit(4, sizeof(SNodeUpdateInfo)); pInfo->pDBMap = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index e4f921f04a..fb45727f92 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -200,6 +200,7 @@ void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask); void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo); +int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo); EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 64b19e4ed9..118cb1cfb6 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -21,7 +21,6 @@ #define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec static void processDownstreamReadyRsp(SStreamTask* pTask); -static int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); static void rspMonitorFn(void* param, void* tmrId); static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); @@ -226,13 +225,13 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart", id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); - code = addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); + code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); } else { stError( "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " "downstream again, nodeUpdate needed", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); - code = addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); + code = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } streamMetaAddFailedTaskSelf(pTask, now); @@ -373,11 +372,10 @@ void processDownstreamReadyRsp(SStreamTask* pTask) { } } -int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { +int32_t streamTaskAddIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t vgId = pTask->pMeta->vgId; int32_t code = 0; - ; - bool existed = false; + bool existed = false; streamMutexLock(&pTask->lock); @@ -675,8 +673,8 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { SDownstreamStatusInfo* p = NULL; findCheckRspStatus(pInfo, *pTaskId, &p); if (p != NULL) { - code = addIntoNodeUpdateList(pTask, p->vgId); - stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list", + code = streamTaskAddIntoNodeUpdateList(pTask, p->vgId); + stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpdate list", id, vgId, p->taskId, p->vgId); } } @@ -717,7 +715,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { // the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread. // The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution -// of restart in timer thread will result in a dead lock. +// of restart in timer thread will result in a deadlock. int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ad9be63674..fa16cace25 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1552,7 +1552,6 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t int32_t* pFailed, const char* id) { int32_t numOfRsp = 0; int32_t numOfFailed = 0; - bool allRsp = false; int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo); @@ -1639,6 +1638,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t notRsp = 0; int32_t numOfFailed = 0; bool triggerDispatchRsp = false; + bool addFailure = false; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; int64_t tmpCheckpointId = -1; int32_t tmpTranId = -1; @@ -1698,6 +1698,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } else { if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { // todo handle the role-changed during checkpoint generation, add test case + addFailure = true; stError( "s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, downstream may become follower or " "restart already, treat it as success", @@ -1745,6 +1746,11 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code)); } + if (addFailure) { // add failure downstream node id, and start the nodeEp update procedure + // ignore the return error and continue + int32_t unused = streamTaskAddIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); + } + // all msg rsp already, continue // we need to re-try send dispatch msg to downstream tasks if (allRsp && (numOfFailed == 0)) { @@ -1866,6 +1872,11 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S } } +#if 0 + // inject errors, and always refuse the upstream dispatch msg and trigger the task nodeEpset update trans. + status = TASK_INPUT_STATUS__REFUSED; +#endif + { // do send response with the input status int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);