fix: fix crash issue

This commit is contained in:
dapan1121 2022-07-21 09:10:18 +08:00
parent 34fbc93e03
commit db37fef898
1 changed files with 15 additions and 7 deletions

View File

@ -509,7 +509,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == msgSendInfo) {
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
msgSendInfo->paramFreeFp = taosMemoryFree;
@ -535,7 +535,11 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
_return:
if (msgSendInfo) {
destroySendMsgInfo(msgSendInfo);
}
taosMemoryFree(msg);
SCH_RET(code);
}
@ -555,7 +559,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
*fp = schHandleCallback;
break;
case TDMT_SCH_DROP_TASK:
//*fp = schHandleDropCallback;
*fp = schHandleDropCallback;
break;
case TDMT_SCH_QUERY_HEARTBEAT:
*fp = schHandleHbCallback;
@ -843,6 +847,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
int64_t transporterId = 0;
code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
pMsgSendInfo = NULL;
if (code) {
SCH_ERR_JRET(code);
}
@ -919,7 +924,9 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
addr.epSet.numOfEps = 1;
memcpy(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
SCH_ERR_JRET(schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx));
code = schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx);
msg = NULL;
SCH_ERR_JRET(code);
return TSDB_CODE_SUCCESS;
@ -1087,8 +1094,9 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
}
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
SCH_ERR_JRET(
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL)));
schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL;
SCH_ERR_JRET(code);
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) {
SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId));