memory leak

This commit is contained in:
dmchen 2023-07-19 08:32:16 +08:00
parent 3711fdd865
commit 93766f8c96
2 changed files with 42 additions and 57 deletions

View File

@ -1299,6 +1299,7 @@ int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
SMsgHead *pHead = taosMemoryMalloc(totallen);
if (pHead == NULL) {
taosMemoryFree(pReq);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@ -1307,13 +1308,14 @@ int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
pHead->vgId = htonl(pNewVgroup->vgId);
memcpy((void*)(pHead + 1), pReq, contLen);
taosMemoryFree(pReq);
action.pCont = pHead;
action.contLen = totallen;
action.msgType = TDMT_SYNC_CONFIG_CHANGE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
taosMemoryFree(pHead);
return -1;
}

View File

@ -2508,64 +2508,53 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
}
SSyncLogReplMgr oldLogReplMgrs[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
SSyncLogReplMgr *pOldLogReplMgrs[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
for(int i = 0; i < oldtotalReplicaNum; i++){
oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
pOldLogReplMgrs[i] = ths->logReplMgrs[i];
}
for(int i = 0; i < ths->totalReplicaNum; ++i){
ths->logReplMgrs[i] = syncLogReplCreate();
if(ths->logReplMgrs[i] == NULL){
return -1;
}
syncNodeLogReplDestroy(ths);
syncNodeLogReplInit(ths);
for(int i = 0; i < ths->totalReplicaNum; ++i){
for(int j = 0; j < oldtotalReplicaNum; j++){
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
*(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
}
}
ths->logReplMgrs[i]->peerId = i;
}
}
for(int i = 0; i < oldtotalReplicaNum; i++){
syncLogReplDestroy(pOldLogReplMgrs[i]);
}
for(int i = 0; i < ths->totalReplicaNum; ++i){
sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")" , ths->vgId, i,
ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
}
//6.rebuild sender
for(int i = 0; i < oldtotalReplicaNum; ++i){
sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64,
ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
}
SSyncSnapshotSender oldSender[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
SSyncSnapshotSender *pOldSender[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
for(int i = 0; i < oldtotalReplicaNum; i++){
oldSender[i] = *(ths->senders[i]);
pOldSender[i] = ths->senders[i];
}
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
if (ths->senders[i] != NULL) {
sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);
for(int i = 0; i < ths->totalReplicaNum; i++){
ths->senders[i] = snapshotSenderCreate(ths, i);
for(int j = 0; j < oldtotalReplicaNum; j++){
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])){
*(ths->senders[i]) = oldSender[j];
if (snapshotSenderIsStart(ths->senders[i])) {
snapshotSenderStop(ths->senders[i], false);
}
}
snapshotSenderDestroy(ths->senders[i]);
ths->senders[i] = NULL;
}
}
for(int i = 0; i < oldtotalReplicaNum; i++){
snapshotSenderDestroy(pOldSender[i]);
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
SSyncSnapshotSender* pSender = snapshotSenderCreate(ths, i);
if (pSender == NULL) return -1;
ths->senders[i] = pSender;
sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
}
for(int i = 0; i < ths->totalReplicaNum; i++){
@ -2651,19 +2640,15 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
return -1;
}
SSyncCfg *cfg = taosMemoryMalloc(sizeof(SSyncCfg));
if(cfg == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
syncBuildConfigFromReq(&req, cfg);
SSyncCfg cfg = {0};
syncBuildConfigFromReq(&req, &cfg);
if(cfg->changeVersion <= ths->raftCfg.cfg.changeVersion){
if(cfg.changeVersion <= ths->raftCfg.cfg.changeVersion){
sInfo("vgId:%d, skip conf change entry since lower version. "
"this entry, index:%" PRId64 ", term:%" PRId64 ", totalReplicaNum:%d, changeVersion:%d; "
"current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64", changeVersion:%d",
ths->vgId,
pEntry->index, pEntry->term, cfg->totalReplicaNum, cfg->changeVersion,
pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion,
ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
return 0;
}
@ -2674,7 +2659,7 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
"node, rNum:%d, pNum:%d, trNum:%d, "
"buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), "
"cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
ths->vgId, str, pEntry->index - 1, cfg->totalReplicaNum, cfg->changeVersion,
ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion,
ths->replicaNum, ths->peersNum, ths->totalReplicaNum,
ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex,
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
@ -2685,22 +2670,22 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
"node, rNum:%d, pNum:%d, trNum:%d, "
"buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), "
"cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
ths->vgId, str, pEntry->index, pEntry->term, cfg->totalReplicaNum, cfg->changeVersion,
ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion,
ths->replicaNum, ths->peersNum, ths->totalReplicaNum,
ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex,
pEntry->index -1, ths->commitIndex, ths->pLogBuf->commitIndex);
}
syncNodeLogConfigInfo(ths, cfg, "before config change");
syncNodeLogConfigInfo(ths, &cfg, "before config change");
int32_t oldTotalReplicaNum = ths->totalReplicaNum;
if(cfg->totalReplicaNum == 1 || cfg->totalReplicaNum == 2){//remove replica
if(cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2){//remove replica
bool incfg = false;
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
&& ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort){
for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0
&& ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){
incfg = true;
break;
}
@ -2711,7 +2696,7 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
//no need to change myNodeInfo
if(syncNodeRebuildPeerAndCfg(ths, cfg) != 0){
if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){
return -1;
};
@ -2749,17 +2734,17 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
sInfo("vgId:%d, begin change replica type", ths->vgId);
//change myNodeInfo
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
&& ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort){
if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){
for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0
&& ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){
if(cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){
ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
}
}
}
//change peer and cfg
syncNodeChangePeerAndCfgToVoter(ths, cfg);
syncNodeChangePeerAndCfgToVoter(ths, &cfg);
//change other
syncNodeChangeToVoter(ths);
@ -2779,7 +2764,7 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
//no need to change myNodeInfo
//change peer and cfg
if(syncNodeRebuildPeerAndCfg(ths, cfg) != 0){
if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){
return -1;
};
@ -2800,17 +2785,15 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
ths->raftCfg.lastConfigIndex = pEntry->index;
ths->raftCfg.cfg.lastIndex = pEntry->index;
ths->raftCfg.cfg.changeVersion = cfg->changeVersion;
ths->raftCfg.cfg.changeVersion = cfg.changeVersion;
syncNodeLogConfigInfo(ths, cfg, "after config change");
syncNodeLogConfigInfo(ths, &cfg, "after config change");
if(syncWriteCfgFile(ths) != 0){
sError("vgId:%d, failed to create sync cfg file", ths->vgId);
return -1;
};
taosMemoryFree(cfg);
return 0;
}