fix(sync): sender get config from local

This commit is contained in:
Minghao Li 2022-06-21 19:27:52 +08:00
parent 4381c3620d
commit 1f0d7807ac
1 changed files with 31 additions and 23 deletions

View File

@ -73,41 +73,49 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
// open snapshot reader // init snapshot and reader
ASSERT(pSender->pReader == NULL); ASSERT(pSender->pReader == NULL);
pSender->pReader = pReader; pSender->pReader = pReader;
pSender->snapshot = snapshot; pSender->snapshot = snapshot;
/*
// open snapshot reader
ASSERT(pSender->pReader == NULL);
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
ASSERT(ret == 0);
// get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
*/
if (pSender->pCurrentBlock != NULL) { if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock); taosMemoryFree(pSender->pCurrentBlock);
} }
pSender->blockLen = 0; pSender->blockLen = 0;
if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) { if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
SSyncRaftEntry *pEntry = int32_t code = 0;
pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex); SSyncRaftEntry *pEntry = NULL;
ASSERT(pEntry != NULL); code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
pSender->snapshot.lastConfigIndex, &pEntry);
SRpcMsg rpcMsg; bool getLastConfig = false;
syncEntry2OriginalRpc(pEntry, &rpcMsg); if (code == 0) {
SSyncCfg lastConfig; ASSERT(pEntry != NULL);
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
ASSERT(ret == 0);
pSender->lastConfig = lastConfig;
rpcFreeCont(rpcMsg.pCont); SRpcMsg rpcMsg;
syncEntryDestory(pEntry); syncEntry2OriginalRpc(pEntry, &rpcMsg);
SSyncCfg lastConfig;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
ASSERT(ret == 0);
pSender->lastConfig = lastConfig;
getLastConfig = true;
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
} else {
if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
sTrace("vgId:%d sync sender get cfg from local", pSender->pSyncNode->vgId);
pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
getLastConfig = true;
}
}
if (!getLastConfig) {
syncNodeLog3("", pSender->pSyncNode);
ASSERT(0);
}
} else { } else {
memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg)); memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));