From 93766f8c965e7dea3ea9c3304e96d909ff67ff67 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 19 Jul 2023 08:32:16 +0800 Subject: [PATCH] memory leak --- source/dnode/mnode/impl/src/mndVgroup.c | 4 +- source/libs/sync/src/syncMain.c | 95 ++++++++++--------------- 2 files changed, 42 insertions(+), 57 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 81ea033dba..0adb460c9b 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1299,6 +1299,7 @@ int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SMsgHead *pHead = taosMemoryMalloc(totallen); if (pHead == NULL) { + taosMemoryFree(pReq); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -1307,13 +1308,14 @@ int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, pHead->vgId = htonl(pNewVgroup->vgId); memcpy((void*)(pHead + 1), pReq, contLen); + taosMemoryFree(pReq); action.pCont = pHead; action.contLen = totallen; action.msgType = TDMT_SYNC_CONFIG_CHANGE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); + taosMemoryFree(pHead); return -1; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d4faf271ca..1c27c4110f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2508,64 +2508,53 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum } SSyncLogReplMgr oldLogReplMgrs[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0}; - SSyncLogReplMgr *pOldLogReplMgrs[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0}; for(int i = 0; i < oldtotalReplicaNum; i++){ oldLogReplMgrs[i] = *(ths->logReplMgrs[i]); - pOldLogReplMgrs[i] = ths->logReplMgrs[i]; } - for(int i = 0; i < ths->totalReplicaNum; ++i){ - ths->logReplMgrs[i] = syncLogReplCreate(); - if(ths->logReplMgrs[i] == NULL){ - return -1; - } + syncNodeLogReplDestroy(ths); + syncNodeLogReplInit(ths); + for(int i = 0; i < ths->totalReplicaNum; ++i){ for(int j = 0; j < oldtotalReplicaNum; j++){ if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) { *(ths->logReplMgrs[i]) = oldLogReplMgrs[j]; } - } - - ths->logReplMgrs[i]->peerId = i; + } } - for(int i = 0; i < oldtotalReplicaNum; i++){ - syncLogReplDestroy(pOldLogReplMgrs[i]); - } - for(int i = 0; i < ths->totalReplicaNum; ++i){ sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")" , ths->vgId, i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex, ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex); } - //6.rebuild sender for(int i = 0; i < oldtotalReplicaNum; ++i){ sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime) } - SSyncSnapshotSender oldSender[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0}; - SSyncSnapshotSender *pOldSender[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0}; - for(int i = 0; i < oldtotalReplicaNum; i++){ - oldSender[i] = *(ths->senders[i]); - pOldSender[i] = ths->senders[i]; - } + for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { + if (ths->senders[i] != NULL) { + sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]); - for(int i = 0; i < ths->totalReplicaNum; i++){ - ths->senders[i] = snapshotSenderCreate(ths, i); - - for(int j = 0; j < oldtotalReplicaNum; j++){ - if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])){ - *(ths->senders[i]) = oldSender[j]; + if (snapshotSenderIsStart(ths->senders[i])) { + snapshotSenderStop(ths->senders[i], false); } - } + + snapshotSenderDestroy(ths->senders[i]); + ths->senders[i] = NULL; + } } - for(int i = 0; i < oldtotalReplicaNum; i++){ - snapshotSenderDestroy(pOldSender[i]); + for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { + SSyncSnapshotSender* pSender = snapshotSenderCreate(ths, i); + if (pSender == NULL) return -1; + + ths->senders[i] = pSender; + sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender); } for(int i = 0; i < ths->totalReplicaNum; i++){ @@ -2651,19 +2640,15 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){ return -1; } - SSyncCfg *cfg = taosMemoryMalloc(sizeof(SSyncCfg)); - if(cfg == NULL){ - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - syncBuildConfigFromReq(&req, cfg); + SSyncCfg cfg = {0}; + syncBuildConfigFromReq(&req, &cfg); - if(cfg->changeVersion <= ths->raftCfg.cfg.changeVersion){ + if(cfg.changeVersion <= ths->raftCfg.cfg.changeVersion){ sInfo("vgId:%d, skip conf change entry since lower version. " "this entry, index:%" PRId64 ", term:%" PRId64 ", totalReplicaNum:%d, changeVersion:%d; " "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64", changeVersion:%d", ths->vgId, - pEntry->index, pEntry->term, cfg->totalReplicaNum, cfg->changeVersion, + pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion); return 0; } @@ -2674,7 +2659,7 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){ "node, rNum:%d, pNum:%d, trNum:%d, " "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), " "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)", - ths->vgId, str, pEntry->index - 1, cfg->totalReplicaNum, cfg->changeVersion, + ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType)); @@ -2685,22 +2670,22 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){ "node, rNum:%d, pNum:%d, trNum:%d, " "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), " "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")", - ths->vgId, str, pEntry->index, pEntry->term, cfg->totalReplicaNum, cfg->changeVersion, + ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index -1, ths->commitIndex, ths->pLogBuf->commitIndex); } - syncNodeLogConfigInfo(ths, cfg, "before config change"); + syncNodeLogConfigInfo(ths, &cfg, "before config change"); int32_t oldTotalReplicaNum = ths->totalReplicaNum; - if(cfg->totalReplicaNum == 1 || cfg->totalReplicaNum == 2){//remove replica + if(cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2){//remove replica bool incfg = false; - for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){ - if(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 - && ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort){ + for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){ + if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 + && ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){ incfg = true; break; } @@ -2711,7 +2696,7 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){ //no need to change myNodeInfo - if(syncNodeRebuildPeerAndCfg(ths, cfg) != 0){ + if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){ return -1; }; @@ -2749,17 +2734,17 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){ sInfo("vgId:%d, begin change replica type", ths->vgId); //change myNodeInfo - for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){ - if(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 - && ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort){ - if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){ + for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){ + if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 + && ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){ + if(cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){ ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER; } } } //change peer and cfg - syncNodeChangePeerAndCfgToVoter(ths, cfg); + syncNodeChangePeerAndCfgToVoter(ths, &cfg); //change other syncNodeChangeToVoter(ths); @@ -2779,7 +2764,7 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){ //no need to change myNodeInfo //change peer and cfg - if(syncNodeRebuildPeerAndCfg(ths, cfg) != 0){ + if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){ return -1; }; @@ -2800,17 +2785,15 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){ ths->raftCfg.lastConfigIndex = pEntry->index; ths->raftCfg.cfg.lastIndex = pEntry->index; - ths->raftCfg.cfg.changeVersion = cfg->changeVersion; + ths->raftCfg.cfg.changeVersion = cfg.changeVersion; - syncNodeLogConfigInfo(ths, cfg, "after config change"); + syncNodeLogConfigInfo(ths, &cfg, "after config change"); if(syncWriteCfgFile(ths) != 0){ sError("vgId:%d, failed to create sync cfg file", ths->vgId); return -1; }; - taosMemoryFree(cfg); - return 0; }