fix: task release prior to response issue

This commit is contained in:
dapan1121 2024-12-17 11:39:49 +08:00
parent 2464f1ba68
commit 85ec007687
2 changed files with 5 additions and 4 deletions

View File

@ -569,6 +569,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
if (TSDB_CODE_SUCCESS != input->code) {
QW_TASK_ELOG("task already failed at phase %s, code:0x%x", qwPhaseStr(phase), input->code);
ctx->ctrlConnInfo.handle = NULL;
(void)qwDropTask(QW_FPARAMS());
QW_ERR_JRET(input->code);

View File

@ -1361,14 +1361,14 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
taosMemoryFree(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
} else {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
SCH_ERR_JRET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
}
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL;
SCH_ERR_JRET(code);
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
}
}
return TSDB_CODE_SUCCESS;