refactor(sync): sync snapshot
This commit is contained in:
parent
29b97fa0ad
commit
98a2451c15
|
@ -469,7 +469,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
// delete confict entries
|
// delete confict entries
|
||||||
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
|
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
sInfo("syncNodeMakeLogSame, from %ld to %ld", delBegin, delEnd);
|
sInfo("sync event log truncate, from %ld to %ld", delBegin, delEnd);
|
||||||
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
|
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -556,8 +556,6 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
}
|
}
|
||||||
ASSERT(pMsg->dataLen >= 0);
|
ASSERT(pMsg->dataLen >= 0);
|
||||||
|
|
||||||
bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);
|
|
||||||
|
|
||||||
// candidate to follower
|
// candidate to follower
|
||||||
//
|
//
|
||||||
// operation:
|
// operation:
|
||||||
|
@ -582,7 +580,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
// I have snapshot, have log, log <= snapshot, preIndex > myLastIndex
|
// I have snapshot, have log, log <= snapshot, preIndex > myLastIndex
|
||||||
//
|
//
|
||||||
// condition3:
|
// condition3:
|
||||||
// I have snapshot, preIndex <= snapshot.lastApplyIndex
|
// I have snapshot, preIndex < snapshot.lastApplyIndex
|
||||||
|
//
|
||||||
|
// condition4:
|
||||||
|
// I have snapshot, preIndex == snapshot.lastApplyIndex, no data
|
||||||
//
|
//
|
||||||
// operation:
|
// operation:
|
||||||
// match snapshot.lastApplyIndex - 1;
|
// match snapshot.lastApplyIndex - 1;
|
||||||
|
@ -598,15 +599,16 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex);
|
condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex);
|
||||||
bool condition2 = condition0 && (ths->pLogStore->syncLogLastIndex(ths->pLogStore) <= snapshot.lastApplyIndex) &&
|
bool condition2 = condition0 && (ths->pLogStore->syncLogLastIndex(ths->pLogStore) <= snapshot.lastApplyIndex) &&
|
||||||
(pMsg->prevLogIndex > myLastIndex);
|
(pMsg->prevLogIndex > myLastIndex);
|
||||||
bool condition3 = condition0 && (pMsg->prevLogIndex <= snapshot.lastApplyIndex);
|
bool condition3 = condition0 && (pMsg->prevLogIndex < snapshot.lastApplyIndex);
|
||||||
bool condition = condition1 || condition2 || condition3;
|
bool condition4 = condition0 && (pMsg->prevLogIndex == snapshot.lastApplyIndex) && (pMsg->dataLen == 0);
|
||||||
|
bool condition = condition1 || condition2 || condition3 || condition4;
|
||||||
|
|
||||||
if (condition) {
|
if (condition) {
|
||||||
sTrace(
|
sTrace(
|
||||||
"recv SyncAppendEntries, fake match, myLastIndex:%ld, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, "
|
"recv SyncAppendEntries, fake match, myLastIndex:%ld, syncLogBeginIndex:%ld, syncLogEndIndex:%ld, "
|
||||||
"condition1:%d, condition2:%d, condition3:%d",
|
"condition1:%d, condition2:%d, condition3:%d, condition4:%d",
|
||||||
myLastIndex, ths->pLogStore->syncLogBeginIndex(ths->pLogStore),
|
myLastIndex, ths->pLogStore->syncLogBeginIndex(ths->pLogStore),
|
||||||
ths->pLogStore->syncLogEndIndex(ths->pLogStore), condition1, condition2, condition3);
|
ths->pLogStore->syncLogEndIndex(ths->pLogStore), condition1, condition2, condition3, condition4);
|
||||||
|
|
||||||
// prepare response msg
|
// prepare response msg
|
||||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
|
@ -615,7 +617,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
pReply->term = ths->pRaftStore->currentTerm;
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
|
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
|
||||||
pReply->success = true;
|
pReply->success = true;
|
||||||
pReply->matchIndex = snapshot.lastApplyIndex - 1;
|
pReply->matchIndex = snapshot.lastApplyIndex;
|
||||||
|
|
||||||
// send response
|
// send response
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
@ -627,6 +629,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
}
|
}
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
// calculate logOK here, before will coredump, due to fake match
|
||||||
|
bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);
|
||||||
|
|
||||||
// not match
|
// not match
|
||||||
//
|
//
|
||||||
// condition1:
|
// condition1:
|
||||||
|
|
|
@ -178,7 +178,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
snapshotSenderStart(pSender);
|
snapshotSenderStart(pSender);
|
||||||
|
|
||||||
char* s = snapshotSender2Str(pSender);
|
char* s = snapshotSender2Str(pSender);
|
||||||
sInfo("snapshot send, start sender first time, sender:%s", s);
|
sInfo("snapshot send start sender first time, sender:%s", s);
|
||||||
taosMemoryFree(s);
|
taosMemoryFree(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -205,6 +205,19 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
|
||||||
|
|
||||||
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
{
|
||||||
|
pCfg->myIndex = myIndex;
|
||||||
|
pCfg->replicaNum = replicaNum;
|
||||||
|
|
||||||
|
for (int i = 0; i < replicaNum; ++i) {
|
||||||
|
pCfg->nodeInfo[i].nodePort = gPorts[i];
|
||||||
|
taosGetFqdn(pCfg->nodeInfo[i].nodeFqdn);
|
||||||
|
// snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
if (isStandBy) {
|
if (isStandBy) {
|
||||||
pCfg->myIndex = 0;
|
pCfg->myIndex = 0;
|
||||||
pCfg->replicaNum = 1;
|
pCfg->replicaNum = 1;
|
||||||
|
@ -222,6 +235,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int64_t rid = syncOpen(&syncInfo);
|
int64_t rid = syncOpen(&syncInfo);
|
||||||
assert(rid > 0);
|
assert(rid > 0);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue