fix: fix client memory leak
This commit is contained in:
parent
558829dccc
commit
a5c83c7e8d
|
@ -179,7 +179,6 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
|
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
|
||||||
taosMemoryFreeClear(output.dbVgroup);
|
|
||||||
|
|
||||||
tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
|
tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
|
||||||
} else if (output.dbVgroup && output.dbVgroup->vgHash) {
|
} else if (output.dbVgroup && output.dbVgroup->vgHash) {
|
||||||
|
@ -189,12 +188,14 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
if (code1 != TSDB_CODE_SUCCESS) {
|
if (code1 != TSDB_CODE_SUCCESS) {
|
||||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
|
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
|
||||||
tstrerror(code1));
|
tstrerror(code1));
|
||||||
taosMemoryFreeClear(output.dbVgroup);
|
|
||||||
} else {
|
} else {
|
||||||
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
|
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
|
||||||
|
output.dbVgroup = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosMemoryFreeClear(output.dbVgroup);
|
||||||
|
|
||||||
tFreeSUsedbRsp(&usedbRsp);
|
tFreeSUsedbRsp(&usedbRsp);
|
||||||
|
|
||||||
char db[TSDB_DB_NAME_LEN] = {0};
|
char db[TSDB_DB_NAME_LEN] = {0};
|
||||||
|
|
|
@ -642,6 +642,7 @@ void ctgFreeSTableIndex(void *info);
|
||||||
void ctgClearSubTaskRes(SCtgSubRes *pRes);
|
void ctgClearSubTaskRes(SCtgSubRes *pRes);
|
||||||
void ctgFreeQNode(SCtgQNode *node);
|
void ctgFreeQNode(SCtgQNode *node);
|
||||||
void ctgClearHandle(SCatalog* pCtg);
|
void ctgClearHandle(SCatalog* pCtg);
|
||||||
|
void ctgFreeTbCacheImpl(SCtgTbCache *pCache);
|
||||||
|
|
||||||
|
|
||||||
extern SCatalogMgmt gCtgMgmt;
|
extern SCatalogMgmt gCtgMgmt;
|
||||||
|
|
|
@ -647,6 +647,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
|
||||||
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool syncOp = operation->syncOp;
|
||||||
|
char* opName = gCtgCacheOperation[operation->opId].name;
|
||||||
if (operation->syncOp) {
|
if (operation->syncOp) {
|
||||||
tsem_init(&operation->rspSem, 0, 0);
|
tsem_init(&operation->rspSem, 0, 0);
|
||||||
}
|
}
|
||||||
|
@ -664,14 +666,14 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
|
||||||
gCtgMgmt.queue.tail = node;
|
gCtgMgmt.queue.tail = node;
|
||||||
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
|
||||||
|
|
||||||
|
ctgDebug("action [%s] added into queue", opName);
|
||||||
|
|
||||||
CTG_QUEUE_INC();
|
CTG_QUEUE_INC();
|
||||||
CTG_RT_STAT_INC(numOfOpEnqueue, 1);
|
CTG_RT_STAT_INC(numOfOpEnqueue, 1);
|
||||||
|
|
||||||
tsem_post(&gCtgMgmt.queue.reqSem);
|
tsem_post(&gCtgMgmt.queue.reqSem);
|
||||||
|
|
||||||
ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name);
|
if (syncOp) {
|
||||||
|
|
||||||
if (operation->syncOp) {
|
|
||||||
tsem_wait(&operation->rspSem);
|
tsem_wait(&operation->rspSem);
|
||||||
taosMemoryFree(operation);
|
taosMemoryFree(operation);
|
||||||
}
|
}
|
||||||
|
@ -840,6 +842,7 @@ _return:
|
||||||
|
|
||||||
ctgFreeVgInfo(dbInfo);
|
ctgFreeVgInfo(dbInfo);
|
||||||
taosMemoryFreeClear(op->data);
|
taosMemoryFreeClear(op->data);
|
||||||
|
taosMemoryFreeClear(op);
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -852,7 +855,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
|
||||||
SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
|
SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
|
||||||
if (NULL == msg) {
|
if (NULL == msg) {
|
||||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
|
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
|
||||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
char *p = strchr(output->dbFName, '.');
|
char *p = strchr(output->dbFName, '.');
|
||||||
|
@ -871,6 +874,11 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
if (output) {
|
||||||
|
taosMemoryFree(output->tbMeta);
|
||||||
|
taosMemoryFree(output);
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(msg);
|
taosMemoryFreeClear(msg);
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
|
@ -1753,6 +1761,16 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
|
||||||
CTG_CACHE_STAT_DEC(numOfStb, 1);
|
CTG_CACHE_STAT_DEC(numOfStb, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName));
|
||||||
|
if (NULL == pTbCache) {
|
||||||
|
ctgDebug("stb %s already not in cache", msg->stbName);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
|
||||||
|
ctgFreeTbCacheImpl(pTbCache);
|
||||||
|
CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
|
||||||
|
|
||||||
if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
|
if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
|
||||||
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
|
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1780,14 +1798,24 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
|
||||||
SCtgDBCache *dbCache = NULL;
|
SCtgDBCache *dbCache = NULL;
|
||||||
ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
|
ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
|
||||||
if (NULL == dbCache) {
|
if (NULL == dbCache) {
|
||||||
return TSDB_CODE_SUCCESS;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dbCache->dbId != msg->dbId) {
|
if (dbCache->dbId != msg->dbId) {
|
||||||
ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%"PRIx64", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName);
|
ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%"PRIx64", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName);
|
||||||
return TSDB_CODE_SUCCESS;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName));
|
||||||
|
if (NULL == pTbCache) {
|
||||||
|
ctgDebug("tb %s already not in cache", msg->tbName);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
|
||||||
|
ctgFreeTbCacheImpl(pTbCache);
|
||||||
|
CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
|
||||||
|
|
||||||
if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) {
|
if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) {
|
||||||
ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
|
ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
|
@ -2063,6 +2091,8 @@ void* ctgUpdateThreadFunc(void* param) {
|
||||||
|
|
||||||
if (operation->syncOp) {
|
if (operation->syncOp) {
|
||||||
tsem_post(&operation->rspSem);
|
tsem_post(&operation->rspSem);
|
||||||
|
} else {
|
||||||
|
taosMemoryFreeClear(operation);
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_RT_STAT_INC(numOfOpDequeue, 1);
|
CTG_RT_STAT_INC(numOfOpDequeue, 1);
|
||||||
|
|
|
@ -261,6 +261,8 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
taosMemoryFree(pMsg->pData);
|
||||||
|
|
||||||
if (pJob) {
|
if (pJob) {
|
||||||
taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
|
taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,6 +152,7 @@ void ctgFreeStbMetaCache(SCtgDBCache *dbCache) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgFreeTbCacheImpl(SCtgTbCache *pCache) {
|
void ctgFreeTbCacheImpl(SCtgTbCache *pCache) {
|
||||||
|
qDebug("tbMeta freed, p:%p", pCache->pMeta);
|
||||||
taosMemoryFreeClear(pCache->pMeta);
|
taosMemoryFreeClear(pCache->pMeta);
|
||||||
if (pCache->pIndex) {
|
if (pCache->pIndex) {
|
||||||
taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
|
taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
|
||||||
|
@ -831,6 +832,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
|
||||||
if (output->tbMeta) {
|
if (output->tbMeta) {
|
||||||
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
|
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
|
||||||
(*pOutput)->tbMeta = taosMemoryMalloc(metaSize);
|
(*pOutput)->tbMeta = taosMemoryMalloc(metaSize);
|
||||||
|
qDebug("tbMeta cloned, size:%d, p:%p", metaSize, (*pOutput)->tbMeta);
|
||||||
if (NULL == (*pOutput)->tbMeta) {
|
if (NULL == (*pOutput)->tbMeta) {
|
||||||
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
|
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
|
||||||
taosMemoryFreeClear(*pOutput);
|
taosMemoryFreeClear(*pOutput);
|
||||||
|
|
|
@ -41,6 +41,8 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
if (pTask->execNodes) {
|
if (pTask->execNodes) {
|
||||||
taosHashCleanup(pTask->execNodes);
|
taosHashCleanup(pTask->execNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pTask->profile.execTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) {
|
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) {
|
||||||
|
|
Loading…
Reference in New Issue