fix mem leak

This commit is contained in:
yihaoDeng 2024-09-02 17:35:10 +08:00
parent 93568271e1
commit 198486977e
6 changed files with 29 additions and 33 deletions

View File

@ -16,10 +16,10 @@
#include "catalog.h"
#include "clientInt.h"
#include "clientLog.h"
#include "scheduler.h"
#include "trpc.h"
#include "tglobal.h"
#include "clientMonitor.h"
#include "scheduler.h"
#include "tglobal.h"
#include "trpc.h"
typedef struct {
union {
@ -244,11 +244,9 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
goto _return;
}
TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog,
(rsp->useDbRsp->db[0] == 'i') ?
TSDB_PERFORMANCE_SCHEMA_DB :
TSDB_INFORMATION_SCHEMA_DB,
rsp->useDbRsp->uid, vgInfo));
TSC_ERR_JRET(catalogUpdateDBVgInfo(
pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB,
rsp->useDbRsp->uid, vgInfo));
}
}
}
@ -556,7 +554,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
}
}
taosHashRelease(pAppHbMgr->activeInfo, pReq);
return TSDB_CODE_SUCCESS;
@ -609,8 +606,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
}
pInst->monitorParas = pRsp.monitorParas;
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", pInst->clusterId,
pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
if (rspNum) {
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
@ -1108,7 +1105,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
if (clientHbMgr.appHbHash) {
code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
if (TSDB_CODE_SUCCESS != code) {
tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId,
tstrerror(code));
return code;
}
}
@ -1261,7 +1259,7 @@ int32_t hbGatherAppInfo(void) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) continue;
int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL == pApp) {
(void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
@ -1303,8 +1301,7 @@ static void *hbThreadFunc(void *param) {
return NULL;
}
if (sz > 1 && !clientHbMgr.appHbHash) {
clientHbMgr.appHbHash =
taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
if (NULL == clientHbMgr.appHbHash) {
tscError("taosHashInit failed");
return NULL;
@ -1324,13 +1321,13 @@ static void *hbThreadFunc(void *param) {
continue;
}
SClientHbBatchReq *pReq = NULL;
int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq);
int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq);
if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
terrno = code ? code : TSDB_CODE_OUT_OF_RANGE;
tFreeClientHbBatchReq(pReq);
continue;
}
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
if (tlen == -1) {
tFreeClientHbBatchReq(pReq);
break;
@ -1368,9 +1365,8 @@ static void *hbThreadFunc(void *param) {
pInfo->requestObjRefId = 0;
SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
int64_t transporterId = 0;
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo)) {
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) {
tscWarn("failed to async send msg to server");
}
tFreeClientHbBatchReq(pReq);
@ -1389,7 +1385,7 @@ static void *hbThreadFunc(void *param) {
}
static int32_t hbCreateThread() {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
TdThreadAttr thAttr;
TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
@ -1467,9 +1463,9 @@ int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMg
TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock));
if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
(void)taosThreadMutexUnlock(&clientHbMgr.lock);
goto _return;
code = TSDB_CODE_OUT_OF_MEMORY;
(void)taosThreadMutexUnlock(&clientHbMgr.lock);
goto _return;
}
(*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;
TSC_ERR_JRET(taosThreadMutexUnlock(&clientHbMgr.lock));

View File

@ -1567,9 +1567,8 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta
return code;
}
int64_t transporterId = 0;
code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId,
body);
// int64_t transporterId = 0;
code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body);
if (TSDB_CODE_SUCCESS != code) {
destroyTscObj(*pTscObj);
tscError("failed to send connect msg to server, code:%s", tstrerror(code));

View File

@ -530,8 +530,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob,
pMsgSendInfo->msgInfo.handle = NULL;
pMsgSendInfo->msgType = msgType;
int64_t transporterId = 0;
code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo);
code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, NULL, pMsgSendInfo);
pMsgSendInfo = NULL;
if (code) {
ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));

View File

@ -586,7 +586,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
}
memcpy(*pDst, pSrc, sizeof(*pSrc));
(*pDst)->vgArray = NULL;
if (pSrc->vgHash) {
(*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
HASH_ENTRY_LOCK);

View File

@ -996,8 +996,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
pTask->lastMsgType = msgType;
}
int64_t transporterId = 0;
code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
code = asyncSendMsgToServerExt(trans->pTrans, epSet, NULL, pMsgSendInfo, persistHandle, ctx);
pMsgSendInfo = NULL;
if (code) {
SCH_ERR_JRET(code);

View File

@ -1496,9 +1496,10 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
}
} else {
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
int32_t code = taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem));
int32_t code =
taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem));
if (code != 0) {
tError("failed to put fail-fast item to cache, reason:%s", tstrerror(code));
tError("failed to put fail-fast item to cache, reason:%s", tstrerror(code));
}
}
}
@ -2980,7 +2981,9 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S
QUEUE_PUSH(&exh->q, &pCliMsg->seqq);
taosWUnLockLatch(&exh->latch);
tDebug("msg refId: %" PRId64 "", handle);
(void)transReleaseExHandle(transGetRefMgt(), handle);
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0;
} else {