fix mem leak

This commit is contained in:
yihaoDeng 2024-08-21 16:26:32 +08:00
parent 4e2c93f262
commit 16888801c7
1 changed files with 32 additions and 31 deletions

View File

@ -15,8 +15,8 @@
#include "cJSON.h"
#include "clientInt.h"
#include "clientMonitor.h"
#include "clientLog.h"
#include "clientMonitor.h"
#include "command.h"
#include "scheduler.h"
#include "tdatablock.h"
@ -28,7 +28,7 @@
#include "tref.h"
#include "tsched.h"
#include "tversion.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
static bool stringLengthCheck(const char* str, size_t maxsize) {
@ -68,15 +68,13 @@ bool chkRequestKilled(void* param) {
return killed;
}
void cleanupAppInfo() {
taosHashCleanup(appInfo.pInstMap);
}
void cleanupAppInfo() { taosHashCleanup(appInfo.pInstMap); }
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port, int connType, STscObj** pObj) {
uint16_t port, int connType, STscObj** pObj) {
TSC_ERR_RET(taos_init());
if (!validateUserName(user)) {
TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
@ -126,7 +124,7 @@ int32_t taos_connect_internal(const char* ip, const char* user, const char* pass
}
SAppInstInfo** pInst = NULL;
int32_t code = taosThreadMutexLock(&appInfo.mutex);
int32_t code = taosThreadMutexLock(&appInfo.mutex);
if (TSDB_CODE_SUCCESS != code) {
tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
TSC_ERR_RET(code);
@ -187,14 +185,14 @@ _return:
return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType, pObj);
}
//SAppInstInfo* getAppInstInfo(const char* clusterKey) {
// SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
// if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
// return *ppAppInstInfo;
// } else {
// return NULL;
// }
//}
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
// SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
// if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
// return *ppAppInstInfo;
// } else {
// return NULL;
// }
// }
void freeQueryParam(SSyncQueryParam* param) {
if (param == NULL) return;
@ -432,7 +430,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
return TSDB_CODE_SUCCESS;
}
int32_t qnodeRequired(SRequestObj* pRequest, bool *required) {
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
*required = false;
return TSDB_CODE_SUCCESS;
@ -554,7 +552,7 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr
if (NULL == nodeList) {
return terrno;
}
char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
int32_t dbNum = taosArrayGetSize(pDbVgList);
for (int32_t i = 0; i < dbNum; ++i) {
@ -568,7 +566,7 @@ int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArr
}
for (int32_t j = 0; j < vgNum; ++j) {
SVgroupInfo* pInfo = taosArrayGet(pVg, j);
SVgroupInfo* pInfo = taosArrayGet(pVg, j);
if (NULL == pInfo) {
taosArrayDestroy(nodeList);
return TSDB_CODE_OUT_OF_RANGE;
@ -1022,7 +1020,7 @@ void returnToUser(SRequestObj* pRequest) {
}
}
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock**pBlock) {
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
int64_t lastTs = 0;
TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
@ -1032,7 +1030,7 @@ static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock*
return code;
}
for(int32_t i = 0; i < numOfFields; ++i) {
for (int32_t i = 0; i < numOfFields; ++i) {
SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
code = blockDataAppendColInfo(*pBlock, &colInfoData);
if (TSDB_CODE_SUCCESS != code) {
@ -1054,7 +1052,7 @@ static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock*
lastTs = ts;
}
for(int32_t j = 0; j < numOfFields; ++j) {
for (int32_t j = 0; j < numOfFields; ++j) {
SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
code = colDataSetVal(pColInfoData, i, pRow[j], false);
if (TSDB_CODE_SUCCESS != code) {
@ -1069,7 +1067,7 @@ static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock*
(*pBlock)->info.window.ekey = lastTs;
(*pBlock)->info.rows = numOfRows;
tscDebug("lastKey:%"PRId64" numOfRows:%d from all vgroups", lastTs, numOfRows);
tscDebug("lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
return TSDB_CODE_SUCCESS;
}
@ -1215,7 +1213,7 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
}
break;
case QUERY_EXEC_MODE_SCHEDULE: {
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
if (NULL == pMnodeList) {
code = terrno;
break;
@ -1533,7 +1531,7 @@ int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* p
}
int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
*pTscObj = NULL;
int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
if (TSDB_CODE_SUCCESS != code) {
@ -1560,7 +1558,8 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta
}
int64_t transporterId = 0;
code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, body);
code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId,
body);
if (TSDB_CODE_SUCCESS != code) {
destroyTscObj(*pTscObj);
tscError("failed to send connect msg to server, code:%s", tstrerror(code));
@ -1794,6 +1793,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
rpcFreeCont(pMsg->pCont);
destroySendMsgInfo(pMsg->info.ahandle);
taosMemoryFree(tEpSet);
return;
}
arg->msg = *pMsg;
@ -1820,7 +1820,7 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
}
STscObj* pObj = NULL;
int32_t code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
int32_t code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
if (TSDB_CODE_SUCCESS == code) {
int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
if (NULL == rid) {
@ -1890,7 +1890,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
}
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
SSchedulerReq req = { .syncReq = true, .pFetchRes = (void**)&pResInfo->pData };
SSchedulerReq req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData};
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
if (pRequest->code != TSDB_CODE_SUCCESS) {
@ -2033,7 +2033,7 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
}
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
char* p = (char*)pResultInfo->pData;
char* p = (char*)pResultInfo->pData;
int32_t blockVersion = *(int32_t*)p;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
@ -2298,7 +2298,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
char* pStart = p;
for (int32_t i = 0; i < numOfCols; ++i) {
if(blockVersion == BLOCK_VERSION_1){
if (blockVersion == BLOCK_VERSION_1) {
colLength[i] = htonl(colLength[i]);
}
if (colLength[i] >= dataLen) {
@ -2733,7 +2733,8 @@ void syncQueryFn(void* param, void* res, int32_t code) {
(void)tsem_post(&pParam->sem);
}
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source) {
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
int8_t source) {
if (sql == NULL || NULL == fp) {
terrno = TSDB_CODE_INVALID_PARA;
if (fp) {
@ -2932,7 +2933,7 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
if (TSDB_CODE_SUCCESS != code) {
tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
pRequest->body.fetchFp(param, pRequest, code);
pRequest->body.fetchFp(param, pRequest, code);
}
}