handle mem leak failure

This commit is contained in:
yihaoDeng 2024-11-01 17:53:06 +08:00
parent daa8341a3e
commit 17077d1a32
1 changed files with 25 additions and 19 deletions

View File

@ -1770,19 +1770,15 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
} }
} }
int32_t doProcessMsgFromServer(void* param) { int32_t doProcessMsFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
AsyncArg* arg = (AsyncArg*)param;
SRpcMsg* pMsg = &arg->msg;
SEpSet* pEpSet = arg->pEpset;
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
if (pMsg->info.ahandle == NULL) { if (pMsg->info.ahandle == NULL) {
tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL"); tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
taosMemoryFree(arg->pEpset);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosMemoryFree(arg); taosMemoryFree(pEpSet);
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
STscObj* pTscObj = NULL; STscObj* pTscObj = NULL;
STraceId* trace = &pMsg->info.traceId; STraceId* trace = &pMsg->info.traceId;
@ -1802,10 +1798,9 @@ int32_t doProcessMsgFromServer(void* param) {
if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) { if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
tscError("doProcessMsgFromServer taosReleaseRef failed"); tscError("doProcessMsgFromServer taosReleaseRef failed");
} }
taosMemoryFree(arg->pEpset);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosMemoryFree(pEpSet);
destroySendMsgInfo(pSendInfo); destroySendMsgInfo(pSendInfo);
taosMemoryFree(arg);
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
} }
pTscObj = pRequest->pTscObj; pTscObj = pRequest->pTscObj;
@ -1844,20 +1839,24 @@ int32_t doProcessMsgFromServer(void* param) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pSendInfo); destroySendMsgInfo(pSendInfo);
taosMemoryFree(arg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doProcessMsgFromServer(void* param) {
AsyncArg* arg = (AsyncArg*)param;
int32_t code = doProcessMsFromServerImpl(&arg->msg, arg->pEpset);
taosMemoryFree(arg);
return code;
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
int32_t code = 0;
SEpSet* tEpSet = NULL; SEpSet* tEpSet = NULL;
if (pEpSet != NULL) { if (pEpSet != NULL) {
tEpSet = taosMemoryCalloc(1, sizeof(SEpSet)); tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
if (NULL == tEpSet) { if (NULL == tEpSet) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
rpcFreeCont(pMsg->pCont); pMsg->code = terrno;
destroySendMsgInfo(pMsg->info.ahandle); goto _exit;
return;
} }
(void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet)); (void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
} }
@ -1879,12 +1878,12 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg)); AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
if (NULL == arg) { if (NULL == arg) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY; code = terrno;
taosMemoryFree(tEpSet); pMsg->code = code;
rpcFreeCont(pMsg->pCont); goto _exit;
destroySendMsgInfo(pMsg->info.ahandle);
return; return;
} }
arg->msg = *pMsg; arg->msg = *pMsg;
arg->pEpset = tEpSet; arg->pEpset = tEpSet;
@ -1895,6 +1894,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
destroySendMsgInfo(pMsg->info.ahandle); destroySendMsgInfo(pMsg->info.ahandle);
taosMemoryFree(arg); taosMemoryFree(arg);
} }
return;
_exit:
tscError("failed to sched msg to tsc since %s", tstrerror(code));
code = doProcessMsFromServerImpl(pMsg, tEpSet);
if (code != 0) {
tscError("failed to sched msg to tsc, tsc ready quit");
}
} }
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) { TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {