From 7f61dbc8cc8c250929d0f3462ebb27accf7ffa6f Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sat, 4 Jun 2022 11:45:55 +0800 Subject: [PATCH] fix rpc connection fail issue --- source/libs/scheduler/inc/schedulerInt.h | 2 +- source/libs/scheduler/src/schJob.c | 4 ++-- source/libs/scheduler/src/schRemote.c | 3 ++- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 592eec3592..c8fee1e532 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -365,7 +365,7 @@ int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *p int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SSchResInfo *pRes); int32_t schFetchRows(SSchJob *pJob); int32_t schAsyncFetchRows(SSchJob *pJob); -int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx); +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx); int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 6e1319d5f4..3e23c395c9 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -396,8 +396,8 @@ int32_t schUpdateTaskExecNode(SSchTask *pTask, void *handle, int32_t execIdx) { return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, void *handle, int32_t execIdx) { - if (msgType == TDMT_SCH_LINK_BROKEN) { +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx) { + if (dropExecNode) { SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execIdx)); } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 88caf7438e..b336ce8c76 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -372,7 +372,8 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in goto _return; } - SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, msgType, pMsg->handle, pParam->execIdx)); + bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || rspCode == TSDB_CODE_RPC_NETWORK_UNAVAIL); + SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx)); SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));