fix mem leak

This commit is contained in:
yihaoDeng 2023-08-26 21:54:36 +08:00
parent 7d237d4310
commit 998dac0ffb
2 changed files with 53 additions and 6 deletions

View File

@ -397,7 +397,7 @@ typedef struct SStreamMeta {
SMetaHbInfo hbInfo;
int32_t closedTask;
int32_t chkptNotReadyTasks;
int64_t rid;
int64_t rid;
int64_t chkpId;
SArray* chkpSaved;

View File

@ -30,16 +30,26 @@ int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0;
int32_t streamMetaId = 0;
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
static void metaHbToMnode(void* param, void* tmrId);
static void streamMetaClear(SStreamMeta* pMeta);
typedef struct {
TdThreadMutex mutex;
SHashObj* pTable;
} SGStreamMetaMgt;
SGStreamMetaMgt gStreamMetaMgt;
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
static void metaHbToMnode(void* param, void* tmrId);
static void streamMetaClear(SStreamMeta* pMeta);
void streamMetaCloseImpl(void* arg);
void streamMetaCloseImpl(void* arg);
static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup);
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
taosThreadMutexInit(&(gStreamMetaMgt.mutex), NULL);
gStreamMetaMgt.pTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
}
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
@ -47,6 +57,35 @@ void streamMetaCleanup() {
taosCloseRef(streamBackendId);
taosCloseRef(streamBackendCfWrapperId);
taosCloseRef(streamMetaId);
taosThreadMutexDestroy(&gStreamMetaMgt.mutex);
void* pIter = taosHashIterate(gStreamMetaMgt.pTable, NULL);
while (pIter) {
SArray* list = *(SArray**)pIter;
for (int i = 0; i < taosArrayGetSize(list); i++) {
void* rid = taosArrayGetP(list, i);
taosMemoryFree(rid);
}
taosArrayDestroy(list);
pIter = taosHashIterate(gStreamMetaMgt.pTable, pIter);
}
taosHashCleanup(gStreamMetaMgt.pTable);
}
int32_t streamMetaAddRidToGlobalMgt(int64_t vgId, int64_t* rid) {
taosThreadMutexLock(&gStreamMetaMgt.mutex);
void* p = taosHashGet(gStreamMetaMgt.pTable, &vgId, sizeof(vgId));
if (p == NULL) {
SArray* list = taosArrayInit(8, sizeof(void*));
taosArrayPush(list, &rid);
taosHashPut(gStreamMetaMgt.pTable, &vgId, sizeof(vgId), &list, sizeof(void*));
} else {
SArray* list = *(SArray**)p;
taosArrayPush(list, &rid);
}
taosThreadMutexUnlock(&gStreamMetaMgt.mutex);
return 0;
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
@ -102,6 +141,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
*pRid = pMeta->rid;
streamMetaAddRidToGlobalMgt(pMeta->vgId, pRid);
pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->hbInfo.tickCounter = 0;
pMeta->hbInfo.stopFlag = 0;
@ -214,6 +255,12 @@ void streamMetaClear(SStreamMeta* pMeta) {
void streamMetaClose(SStreamMeta* pMeta) {
qDebug("start to close stream meta");
// int64_t rid = *(int64_t*)pMeta->pRid;
// if (taosTmrStop(pMeta->hbInfo.hbTmr)) {
// taosMemoryFree(pMeta->pRid);
// } else {
// // do nothing, stop by timer thread
// }
taosRemoveRef(streamMetaId, pMeta->rid);
}
@ -641,7 +688,7 @@ void metaHbToMnode(void* param, void* tmrId) {
SStreamHbMsg hbMsg = {0};
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) {
taosMemoryFree(param);
// taosMemoryFree(param);
return;
}