more
This commit is contained in:
parent
72a5ceeed8
commit
d2ce48f501
|
@ -512,7 +512,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
msgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
msgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
|
||||
|
||||
SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
|
||||
|
@ -540,7 +540,7 @@ _return:
|
|||
}
|
||||
|
||||
taosMemoryFree(msg);
|
||||
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
@ -680,7 +680,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
|||
param->pTrans = pJob->conn.pTrans;
|
||||
|
||||
pMsgSendInfo->param = param;
|
||||
pMsgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
pMsgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
pMsgSendInfo->fp = fp;
|
||||
|
||||
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
|
||||
|
@ -800,7 +800,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) {
|
|||
pDst->param = NULL;
|
||||
|
||||
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
|
||||
pDst->paramFreeFp = taosMemoryFree;
|
||||
pDst->paramFreeFp = taosMemoryFree;
|
||||
|
||||
*dst = pDst;
|
||||
|
||||
|
@ -1093,14 +1093,29 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
break;
|
||||
}
|
||||
|
||||
#if 1
|
||||
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
||||
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
msg = NULL;
|
||||
SCH_ERR_JRET(code);
|
||||
|
||||
|
||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
|
||||
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
|
||||
}
|
||||
#else
|
||||
if (TDMT_VND_SUBMIT != msgType) {
|
||||
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
||||
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
msg = NULL;
|
||||
SCH_ERR_JRET(code);
|
||||
|
||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
|
||||
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));
|
||||
}
|
||||
} else {
|
||||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
}
|
||||
#endif
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
Loading…
Reference in New Issue