From 85ec00768729bb3ce151aa6ca42aa42aef1216b1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 17 Dec 2024 11:39:49 +0800 Subject: [PATCH] fix: task release prior to response issue --- source/libs/qworker/src/qworker.c | 1 + source/libs/scheduler/src/schRemote.c | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index fa422b1de4..641fa03f7a 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -569,6 +569,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu if (TSDB_CODE_SUCCESS != input->code) { QW_TASK_ELOG("task already failed at phase %s, code:0x%x", qwPhaseStr(phase), input->code); + ctx->ctrlConnInfo.handle = NULL; (void)qwDropTask(QW_FPARAMS()); QW_ERR_JRET(input->code); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index fd255f53cf..d15ac7a791 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1361,14 +1361,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, taosMemoryFree(msg); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); } else { + if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { + SCH_ERR_JRET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); + } + SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)}; code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)); msg = NULL; SCH_ERR_JRET(code); - - if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { - SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); - } } return TSDB_CODE_SUCCESS;