diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 29e68ce4e8..7151c4a026 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -42,6 +42,7 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" +#include "qworker.h" static void *mndBuildTimerMsg(int32_t *pContLen) { SMTimerReq timerReq = {0}; @@ -375,6 +376,7 @@ void mndPreClose(SMnode *pMnode) { void mndClose(SMnode *pMnode) { if (pMnode != NULL) { mDebug("start to close mnode"); + qWorkerDestroy((void **)&pMnode->pQuery); mndCleanupSteps(pMnode, -1); taosMemoryFreeClear(pMnode->path); taosMemoryFreeClear(pMnode); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 3c6c02ede6..56af7ee1e0 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -132,7 +132,7 @@ typedef struct SQWTaskCtx { int8_t taskType; int8_t explain; int8_t needFetch; - int32_t queryType; + int32_t msgType; int32_t fetchType; int32_t execId; @@ -380,7 +380,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet); int32_t qwAddTaskCtx(QW_FPARAMS_DEF); -int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx); +int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); #ifdef __cplusplus diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 7906617a3f..4e174018f4 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -147,7 +147,7 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int return TSDB_CODE_SUCCESS; } -int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { +int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { if (gQWDebug.tmp) { if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { SEpSet epSet = {0}; @@ -162,16 +162,25 @@ int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { ctx->phase = QW_PHASE_POST_QUERY; qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet); + *rsped = true; return TSDB_CODE_SUCCESS; } if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); + *rsped = true; + return TSDB_CODE_SUCCESS; + } + + if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 3)) { + qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); + *rsped = true; return TSDB_CODE_SUCCESS; } } + *rsped = false; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c763645a07..7e12a3a424 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -428,7 +428,14 @@ _return: if (QW_PHASE_POST_QUERY == phase && ctx) { ctx->queryRsped = true; - qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); + + bool rsped = false; + SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; + qwDbgResponseRedirect(&qwMsg, ctx, &rsped); + if (!rsped) { + qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); + } + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); } @@ -476,8 +483,6 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); - qwDbgResponseRedirect(qwMsg, ctx); - _return: if (ctx) { @@ -505,7 +510,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { ctx->taskType = qwMsg->msgInfo.taskType; ctx->explain = qwMsg->msgInfo.explain; ctx->needFetch = qwMsg->msgInfo.needFetch; - ctx->queryType = qwMsg->msgType; + ctx->msgType = qwMsg->msgType; //QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); @@ -650,7 +655,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); - ctx->queryType = qwMsg->msgType; + ctx->msgType = qwMsg->msgType; SOutputData sOutput = {0}; QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); @@ -702,7 +707,11 @@ _return: } if (code || rsp) { - qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + bool rsped = false; + qwDbgResponseRedirect(qwMsg, ctx, &rsped); + if (!rsped) { + qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + } QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index a46b293965..bfba81b118 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -138,7 +138,17 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3 return TSDB_CODE_SUCCESS; } + if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it + SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); + return TSDB_CODE_SUCCESS; + } + SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId)); + if (NULL == nodeInfo) { // ignore it + SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); + return TSDB_CODE_SUCCESS; + } + nodeInfo->handle = handle; SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId);