From 0048bd8cb634b027d1c504196dcd901791cc350b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 15 Jul 2022 18:09:27 +0800 Subject: [PATCH] fix: fix redirect issue --- source/client/src/clientImpl.c | 2 +- source/libs/qworker/src/qwDbg.c | 4 ++-- source/libs/qworker/src/qworker.c | 6 ++---- source/libs/scheduler/inc/schInt.h | 3 ++- source/libs/scheduler/src/schRemote.c | 2 +- source/libs/scheduler/src/schTask.c | 6 ++++-- 6 files changed, 12 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5e620d1060..90d16d17a5 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1276,7 +1276,7 @@ int32_t doProcessMsgFromServer(void* param) { assert(pMsg->info.ahandle != NULL); STscObj* pTscObj = NULL; - tscDebug("processMsgFromServer message: %s, code: %s", TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code)); + tscDebug("processMsgFromServer handle %p, message: %s, code: %s", pMsg->info.handle, TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code)); if (pSendInfo->requestObjRefId != 0) { SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index cd54c5e5f9..7906617a3f 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -188,8 +188,8 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t msgType) { return; } - //FETCH OR QUERY USE DIFFERENT CONNINFO - qwBuildAndSendErrorRsp(msgType + 1, ctx->dataConnInfo, TSDB_CODE_RPC_BROKEN_LINK); + SRpcHandleInfo *pConn = ((msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo : &ctx->ctrlConnInfo); + qwBuildAndSendErrorRsp(msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK); qwDropTask(QW_FPARAMS()); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a671585014..c763645a07 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -426,7 +426,7 @@ _return: qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC); } - if (QW_PHASE_POST_QUERY == phase) { + if (QW_PHASE_POST_QUERY == phase && ctx) { ctx->queryRsped = true; qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); @@ -730,11 +730,9 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(ctx)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); - } else if (QW_GET_PHASE(ctx) > 0) { + } else { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropped = true; - } else { - // task not started } if (!dropped) { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index bc0270635d..4979a41f17 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -58,7 +58,7 @@ typedef enum { #define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 #define SCH_MAX_TASK_TIMEOUT_USEC 60000000 -#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA +#define SCH_MAX_CANDIDATE_EP_NUM (TSDB_MAX_REPLICA + 100) @@ -211,6 +211,7 @@ typedef struct SSchTask { int32_t maxExecTimes; // task max exec times int32_t maxRetryTimes; // task max retry times int32_t retryTimes; // task retry times + bool waitRetry; // wait for retry int32_t execId; // task current execute index SSchLevel *level; // level SRWLatch planLock; // task update plan lock diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index ec07ee85fd..41d9f46a87 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1010,7 +1010,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); persistHandle = true; - //SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); + SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); break; } case TDMT_SCH_FETCH: diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9483ecd6eb..a46b293965 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -125,8 +125,8 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_ SCH_TASK_DLOG("execId %d removed from execNodeList", execId); } - if (execId != pTask->execId) { // ignore it - SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId); + if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it + SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); } @@ -335,6 +335,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 return TSDB_CODE_SUCCESS; } + pTask->waitRetry = true; schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask)); @@ -790,6 +791,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); pTask->execId++; pTask->retryTimes++; + pTask->waitRetry = false; SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes);