From 1f0d7807accb020ba2317fe70bab9fa8e5d5d2d5 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 21 Jun 2022 19:27:52 +0800 Subject: [PATCH] fix(sync): sender get config from local --- source/libs/sync/src/syncSnapshot.c | 54 +++++++++++++++++------------ 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 68185131d1..3e062d395f 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -73,41 +73,49 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; - // open snapshot reader + // init snapshot and reader ASSERT(pSender->pReader == NULL); pSender->pReader = pReader; pSender->snapshot = snapshot; - /* - // open snapshot reader - ASSERT(pSender->pReader == NULL); - int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader)); - ASSERT(ret == 0); - - // get current snapshot info - pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); - */ - if (pSender->pCurrentBlock != NULL) { taosMemoryFree(pSender->pCurrentBlock); } - pSender->blockLen = 0; if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) { - SSyncRaftEntry *pEntry = - pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex); - ASSERT(pEntry != NULL); + int32_t code = 0; + SSyncRaftEntry *pEntry = NULL; + code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore, + pSender->snapshot.lastConfigIndex, &pEntry); - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); - SSyncCfg lastConfig; - int32_t ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig); - ASSERT(ret == 0); - pSender->lastConfig = lastConfig; + bool getLastConfig = false; + if (code == 0) { + ASSERT(pEntry != NULL); - rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pEntry); + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pEntry, &rpcMsg); + + SSyncCfg lastConfig; + int32_t ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig); + ASSERT(ret == 0); + pSender->lastConfig = lastConfig; + getLastConfig = true; + + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pEntry); + } else { + if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) { + sTrace("vgId:%d sync sender get cfg from local", pSender->pSyncNode->vgId); + pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg; + getLastConfig = true; + } + } + + if (!getLastConfig) { + syncNodeLog3("", pSender->pSyncNode); + ASSERT(0); + } } else { memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));