From 16888801c75709de2a9c1a9c07a7c8bb05e0091c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 21 Aug 2024 16:26:32 +0800 Subject: [PATCH] fix mem leak --- source/client/src/clientImpl.c | 63 +++++++++++++++++----------------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 563d70ab08..6b8bf88030 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -15,8 +15,8 @@ #include "cJSON.h" #include "clientInt.h" -#include "clientMonitor.h" #include "clientLog.h" +#include "clientMonitor.h" #include "command.h" #include "scheduler.h" #include "tdatablock.h" @@ -28,7 +28,7 @@ #include "tref.h" #include "tsched.h" #include "tversion.h" -static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); +static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo); static bool stringLengthCheck(const char* str, size_t maxsize) { @@ -68,15 +68,13 @@ bool chkRequestKilled(void* param) { return killed; } -void cleanupAppInfo() { - taosHashCleanup(appInfo.pInstMap); -} +void cleanupAppInfo() { taosHashCleanup(appInfo.pInstMap); } static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj); int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, - uint16_t port, int connType, STscObj** pObj) { + uint16_t port, int connType, STscObj** pObj) { TSC_ERR_RET(taos_init()); if (!validateUserName(user)) { TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH); @@ -126,7 +124,7 @@ int32_t taos_connect_internal(const char* ip, const char* user, const char* pass } SAppInstInfo** pInst = NULL; - int32_t code = taosThreadMutexLock(&appInfo.mutex); + int32_t code = taosThreadMutexLock(&appInfo.mutex); if (TSDB_CODE_SUCCESS != code) { tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code))); TSC_ERR_RET(code); @@ -187,14 +185,14 @@ _return: return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType, pObj); } -//SAppInstInfo* getAppInstInfo(const char* clusterKey) { -// SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey)); -// if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) { -// return *ppAppInstInfo; -// } else { -// return NULL; -// } -//} +// SAppInstInfo* getAppInstInfo(const char* clusterKey) { +// SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey)); +// if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) { +// return *ppAppInstInfo; +// } else { +// return NULL; +// } +// } void freeQueryParam(SSyncQueryParam* param) { if (param == NULL) return; @@ -432,7 +430,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) { return TSDB_CODE_SUCCESS; } -int32_t qnodeRequired(SRequestObj* pRequest, bool *required) { +int32_t qnodeRequired(SRequestObj* pRequest, bool* required) { if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) { *required = false; return TSDB_CODE_SUCCESS; @@ -554,7 +552,7 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr if (NULL == nodeList) { return terrno; } - char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client"; + char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client"; int32_t dbNum = taosArrayGetSize(pDbVgList); for (int32_t i = 0; i < dbNum; ++i) { @@ -568,7 +566,7 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr } for (int32_t j = 0; j < vgNum; ++j) { - SVgroupInfo* pInfo = taosArrayGet(pVg, j); + SVgroupInfo* pInfo = taosArrayGet(pVg, j); if (NULL == pInfo) { taosArrayDestroy(nodeList); return TSDB_CODE_OUT_OF_RANGE; @@ -1022,7 +1020,7 @@ void returnToUser(SRequestObj* pRequest) { } } -static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock**pBlock) { +static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) { int64_t lastTs = 0; TAOS_FIELD* pResFields = taos_fetch_fields(pRes); int32_t numOfFields = taos_num_fields(pRes); @@ -1032,7 +1030,7 @@ static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock* return code; } - for(int32_t i = 0; i < numOfFields; ++i) { + for (int32_t i = 0; i < numOfFields; ++i) { SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1); code = blockDataAppendColInfo(*pBlock, &colInfoData); if (TSDB_CODE_SUCCESS != code) { @@ -1054,7 +1052,7 @@ static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock* lastTs = ts; } - for(int32_t j = 0; j < numOfFields; ++j) { + for (int32_t j = 0; j < numOfFields; ++j) { SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j); code = colDataSetVal(pColInfoData, i, pRow[j], false); if (TSDB_CODE_SUCCESS != code) { @@ -1069,7 +1067,7 @@ static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock* (*pBlock)->info.window.ekey = lastTs; (*pBlock)->info.rows = numOfRows; - tscDebug("lastKey:%"PRId64" numOfRows:%d from all vgroups", lastTs, numOfRows); + tscDebug("lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows); return TSDB_CODE_SUCCESS; } @@ -1215,7 +1213,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue } break; case QUERY_EXEC_MODE_SCHEDULE: { - SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); + SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); if (NULL == pMnodeList) { code = terrno; break; @@ -1533,7 +1531,7 @@ int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* p } int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, - SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) { + SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) { *pTscObj = NULL; int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj); if (TSDB_CODE_SUCCESS != code) { @@ -1560,7 +1558,8 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta } int64_t transporterId = 0; - code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, body); + code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, + body); if (TSDB_CODE_SUCCESS != code) { destroyTscObj(*pTscObj); tscError("failed to send connect msg to server, code:%s", tstrerror(code)); @@ -1794,6 +1793,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { pMsg->code = TSDB_CODE_OUT_OF_MEMORY; rpcFreeCont(pMsg->pCont); destroySendMsgInfo(pMsg->info.ahandle); + taosMemoryFree(tEpSet); return; } arg->msg = *pMsg; @@ -1820,7 +1820,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons } STscObj* pObj = NULL; - int32_t code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj); + int32_t code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj); if (TSDB_CODE_SUCCESS == code) { int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t)); if (NULL == rid) { @@ -1890,7 +1890,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) } SReqResultInfo* pResInfo = &pRequest->body.resInfo; - SSchedulerReq req = { .syncReq = true, .pFetchRes = (void**)&pResInfo->pData }; + SSchedulerReq req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData}; pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req); if (pRequest->code != TSDB_CODE_SUCCESS) { @@ -2033,7 +2033,7 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) { } static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) { - char* p = (char*)pResultInfo->pData; + char* p = (char*)pResultInfo->pData; int32_t blockVersion = *(int32_t*)p; // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column @@ -2298,7 +2298,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 char* pStart = p; for (int32_t i = 0; i < numOfCols; ++i) { - if(blockVersion == BLOCK_VERSION_1){ + if (blockVersion == BLOCK_VERSION_1) { colLength[i] = htonl(colLength[i]); } if (colLength[i] >= dataLen) { @@ -2733,7 +2733,8 @@ void syncQueryFn(void* param, void* res, int32_t code) { (void)tsem_post(&pParam->sem); } -void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source) { +void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, + int8_t source) { if (sql == NULL || NULL == fp) { terrno = TSDB_CODE_INVALID_PARA; if (fp) { @@ -2932,7 +2933,7 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req); if (TSDB_CODE_SUCCESS != code) { tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId); - pRequest->body.fetchFp(param, pRequest, code); + pRequest->body.fetchFp(param, pRequest, code); } }