From 02d1583725496bce0937ff305a5d21b431830155 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Sep 2020 11:09:16 +0800 Subject: [PATCH] [td-1523] fix memory leaks and refactor some functions. --- src/client/inc/tscUtil.h | 2 ++ src/client/src/tscAsync.c | 5 +---- src/client/src/tscPrepare.c | 6 ++---- src/client/src/tscServer.c | 23 ++++++----------------- src/client/src/tscSql.c | 7 ++----- src/client/src/tscStream.c | 4 +--- src/client/src/tscSub.c | 4 +--- src/client/src/tscSystem.c | 12 +++--------- src/client/src/tscUtil.c | 24 ++++++++++++++---------- src/util/src/tcache.c | 2 +- 10 files changed, 33 insertions(+), 56 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 79a792ab65..0323434a99 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -258,6 +258,8 @@ void tscDoQuery(SSqlObj* pSql); */ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd); +void registerSqlObj(SSqlObj* pSql); + SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql); void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex); diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 09610575f6..c5d622e245 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -51,10 +51,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const pSql->fp = fp; pSql->fetchFp = fp; - uint64_t handle = (uint64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); - - T_REF_INC(pSql->pTscObj); + registerSqlObj(pSql); pSql->sqlstr = calloc(1, sqlLen + 1); if (pSql->sqlstr == NULL) { diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index c4ca6793ff..cdbd8685df 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -545,10 +545,8 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { pSql->cmd.numOfParams = 0; pSql->cmd.batchSize = 0; - - uint64_t handle = (uint64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); - T_REF_INC(pSql->pTscObj); + + registerSqlObj(pSql); int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8eaa406bce..b26abf2145 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1987,15 +1987,11 @@ static void createHBObj(STscObj* pObj) { pSql->param = pObj; pSql->pTscObj = pObj; pSql->signature = pSql; + + registerSqlObj(pSql); + tscDebug("%p HB is allocated, pObj:%p", pSql, pObj); + pObj->pHb = pSql; - - tscAddSubqueryInfo(&pObj->pHb->cmd); - - int64_t ad = (int64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &ad, sizeof(int64_t), &pSql, sizeof(int64_t), 2 * 60 * 1000); - T_REF_INC(pObj); - - tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); } int tscProcessConnectRsp(SSqlObj *pSql) { @@ -2170,11 +2166,7 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf pNew->signature = pNew; pNew->cmd.command = TSDB_SQL_META; - T_REF_INC(pNew->pTscObj); - - // TODO add test case on x86 platform - uint64_t adr = (uint64_t) pNew; - pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000); + registerSqlObj(pNew); tscAddSubqueryInfo(&pNew->cmd); @@ -2301,10 +2293,7 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { } pNewQueryInfo->numOfTables = pQueryInfo->numOfTables; - T_REF_INC(pNew->pTscObj); - - uint64_t p = (uint64_t) pNew; - pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000); + registerSqlObj(pNew); tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables); pNew->fp = tscTableMetaCallBack; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index e1a07fdcfe..347e3cb508 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -156,10 +156,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con *taos = pObj; } - T_REF_INC(pSql->pTscObj); - - uint64_t key = (uint64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); + registerSqlObj(pSql); tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg); return pSql; @@ -270,7 +267,7 @@ void taos_close(TAOS *taos) { pHb->pRpcCtx = NULL; } - tscDebug("%p, HB is freed", pHb); + tscDebug("%p HB is freed", pHb); taos_free_result(pHb); } diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 35cd09a033..81b8cf7359 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -510,9 +510,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { return; } - uint64_t handle = (uint64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); - T_REF_INC(pSql->pTscObj); + registerSqlObj(pSql); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 760c5f5a51..7913e0fa03 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -152,9 +152,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* goto fail; } - uint64_t handle = (uint64_t) pSql; - pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000); - T_REF_INC(pSql->pTscObj); + registerSqlObj(pSql); code = tsParseSql(pSql, false); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 82ce1d3679..620fe13a9f 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -122,11 +122,8 @@ void taos_init_imp(void) { tscInitMsgsFp(); int queueSize = tsMaxConnections*2; - if (tscEmbedded == 0) { - tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 2.0); - } else { - tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / 4.0); - } + double factor = (tscEmbedded == 0)? 2.0:4.0; + tscNumOfThreads = (int)(tsNumOfCores * tsNumOfThreadsPerCore / factor); if (tscNumOfThreads < 2) tscNumOfThreads = 2; @@ -140,11 +137,8 @@ void taos_init_imp(void) { if(0 == tscEmbedded){ taosTmrReset(tscCheckDiskUsage, 10, NULL, tscTmr, &tscCheckDiskUsageTmr); } - - int64_t refreshTime = tsTableMetaKeepTimer; - refreshTime = refreshTime > 10 ? 10 : refreshTime; - refreshTime = refreshTime < 10 ? 10 : refreshTime; + int64_t refreshTime = 10; // 10 seconds by default if (tscMetaCache == NULL) { tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta"); tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime/2, false, tscFreeSqlObjInCache, "sqlObj"); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7f445344b1..5d439927e2 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -424,12 +424,12 @@ void tscFreeSqlObj(SSqlObj* pSql) { free(pSql); - tscDebug("%p free sqlObj completed", p); - int32_t ref = T_REF_DEC(pTscObj); assert(ref >= 0); + tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref); if (ref == 0) { + tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj); tscCloseTscObj(pTscObj); } } @@ -1783,6 +1783,16 @@ void tscResetForNextRetrieve(SSqlRes* pRes) { pRes->numOfRows = 0; } +void registerSqlObj(SSqlObj* pSql) { + int64_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec + + int32_t ref = T_REF_INC(pSql->pTscObj); + tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref); + + uint64_t p = (uint64_t) pSql; + pSql->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &p, sizeof(uint64_t), DEFAULT_LIFE_TIME); +} + SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) { SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj)); if (pNew == NULL) { @@ -1822,10 +1832,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0); tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL); - - T_REF_INC(pNew->pTscObj); - uint64_t p = (uint64_t) pNew; - pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000); + registerSqlObj(pNew); return pNew; } @@ -2063,10 +2070,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex); } - T_REF_INC(pNew->pTscObj); - - uint64_t p = (uint64_t) pNew; - pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10); + registerSqlObj(pNew); return pNew; _error: diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 49b9996cf4..2637699adb 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -97,7 +97,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo int32_t size = (int32_t)taosHashGetSize(pCacheObj->pHashTable); assert(size > 0); - uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, num:%d size:%" PRId64 "bytes", + uDebug("cache:%s, key:%p, %p is destroyed from cache, size:%dbytes, totalNum:%d size:%" PRId64 "bytes", pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize); if (pCacheObj->freeFp) {