From 154bc177a736c4037127773d9a86a1a98bb46173 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 19 Jul 2023 17:33:55 +0800 Subject: [PATCH] memory leak --- source/libs/sync/src/syncPipeline.c | 57 +++++++++++++++-------------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 9ffd790281..532a6955cf 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -638,39 +638,40 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pEntry->index, pEntry->term, role, currentTerm); pNextEntry = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf); - if (pNextEntry != NULL && pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { - sInfo("vgId:%d, to change config at Commit. " - "current entry, index:%" PRId64 ", term:%" PRId64", " - "node, role:%d, current term:%" PRId64 ", restore:%d, " - "cond, next entry index:%" PRId64 ", msgType:%s", - vgId, - pEntry->index, pEntry->term, - role, currentTerm, pNode->restoreFinish, - pNextEntry->index, TMSG_INFO(pNextEntry->originalRpcType)); + if (pNextEntry != NULL) { + if(pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){ + sInfo("vgId:%d, to change config at Commit. " + "current entry, index:%" PRId64 ", term:%" PRId64", " + "node, role:%d, current term:%" PRId64 ", restore:%d, " + "cond, next entry index:%" PRId64 ", msgType:%s", + vgId, + pEntry->index, pEntry->term, + role, currentTerm, pNode->restoreFinish, + pNextEntry->index, TMSG_INFO(pNextEntry->originalRpcType)); - if(syncNodeChangeConfig(pNode, pNextEntry, "Commit") != 0){ - sError("vgId:%d, failed to change config from Commit. index:%" PRId64 ", term:%" PRId64 - ", role:%d, current term:%" PRId64, - vgId, pNextEntry->index, pNextEntry->term, role, currentTerm); - goto _out; - } - - //for 2->1, need to apply config change entry in sync thread, - if(pNode->replicaNum == 1){ - if (syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true) != 0) { - sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 - ", role:%d, current term:%" PRId64, - vgId, pNextEntry->index, pNextEntry->term, role, currentTerm); - goto _out; + if(syncNodeChangeConfig(pNode, pNextEntry, "Commit") != 0){ + sError("vgId:%d, failed to change config from Commit. index:%" PRId64 ", term:%" PRId64 + ", role:%d, current term:%" PRId64, + vgId, pNextEntry->index, pNextEntry->term, role, currentTerm); + goto _out; } - index++; - pBuf->commitIndex = index; + //for 2->1, need to apply config change entry in sync thread, + if(pNode->replicaNum == 1){ + if (syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true) != 0) { + sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 + ", role:%d, current term:%" PRId64, + vgId, pNextEntry->index, pNextEntry->term, role, currentTerm); + goto _out; + } - sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId, - pNextEntry->index, pNextEntry->term, role, currentTerm); + index++; + pBuf->commitIndex = index; + + sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId, + pNextEntry->index, pNextEntry->term, role, currentTerm); + } } - if (!nextInBuf) { syncEntryDestroy(pNextEntry); pNextEntry = NULL;