refactor(sync): add last config index z
This commit is contained in:
parent
fa54663871
commit
dffbec29c7
|
@ -190,15 +190,18 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
|||
if (gRaftDetailLog) {
|
||||
char* s = snapshotSender2Str(pSender);
|
||||
sInfo(
|
||||
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld"
|
||||
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld"
|
||||
"sender:%s",
|
||||
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, s);
|
||||
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
|
||||
pSender->snapshot.lastConfigIndex, s);
|
||||
taosMemoryFree(s);
|
||||
} else {
|
||||
sInfo(
|
||||
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
|
||||
"lastApplyTerm:%lu lastConfigIndex:%ld",
|
||||
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
|
||||
ths->vgId, host, port, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
|
||||
pSender->snapshot.lastConfigIndex);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@
|
|||
#include "syncVoteMgr.h"
|
||||
#include "tref.h"
|
||||
|
||||
bool gRaftDetailLog = false;
|
||||
bool gRaftDetailLog = true;
|
||||
|
||||
static int32_t tsNodeRefId = -1;
|
||||
|
||||
|
@ -311,6 +311,8 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
|
|||
assert(rid == pSyncNode->rid);
|
||||
sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;
|
||||
|
||||
sTrace("sync get snapshot meta: lastConfigIndex:%ld", pSyncNode->pRaftCfg->lastConfigIndex);
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return 0;
|
||||
}
|
||||
|
@ -520,6 +522,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
|||
SRaftCfgMeta meta;
|
||||
meta.isStandBy = pSyncInfo->isStandBy;
|
||||
meta.snapshotEnable = pSyncInfo->snapshotEnable;
|
||||
meta.lastConfigIndex = SYNC_INDEX_INVALID;
|
||||
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
|
||||
assert(ret == 0);
|
||||
|
||||
|
|
|
@ -14,9 +14,9 @@
|
|||
*/
|
||||
|
||||
#include "syncMessage.h"
|
||||
#include "syncRaftCfg.h"
|
||||
#include "syncUtil.h"
|
||||
#include "tcoding.h"
|
||||
#include "syncRaftCfg.h"
|
||||
|
||||
// ---------------------------------------------
|
||||
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
|
||||
|
|
|
@ -15,11 +15,11 @@
|
|||
|
||||
#include "syncSnapshot.h"
|
||||
#include "syncIndexMgr.h"
|
||||
#include "syncRaftCfg.h"
|
||||
#include "syncRaftLog.h"
|
||||
#include "syncRaftStore.h"
|
||||
#include "syncUtil.h"
|
||||
#include "wal.h"
|
||||
#include "syncRaftCfg.h"
|
||||
|
||||
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId);
|
||||
|
||||
|
@ -85,10 +85,17 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
// get current snapshot info
|
||||
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
|
||||
if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
|
||||
/*
|
||||
SSyncRaftEntry *pEntry = NULL;
|
||||
int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex, &pEntry);
|
||||
int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
|
||||
pSender->snapshot.lastConfigIndex, &pEntry);
|
||||
ASSERT(code == 0);
|
||||
ASSERT(pEntry == NULL);
|
||||
ASSERT(pEntry != NULL);
|
||||
*/
|
||||
|
||||
SSyncRaftEntry *pEntry =
|
||||
pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex);
|
||||
ASSERT(pEntry != NULL);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
@ -104,7 +111,6 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
|
||||
}
|
||||
|
||||
|
||||
pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
|
||||
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
||||
++(pSender->privateTerm);
|
||||
|
@ -135,15 +141,18 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
if (gRaftDetailLog) {
|
||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||
sTrace(
|
||||
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld send "
|
||||
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld send "
|
||||
"msg:%s",
|
||||
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
|
||||
taosMemoryFree(msgStr);
|
||||
} else {
|
||||
sTrace("sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld",
|
||||
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
|
||||
sTrace(
|
||||
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld",
|
||||
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
|
||||
}
|
||||
|
||||
syncSnapshotSendDestroy(pMsg);
|
||||
|
@ -270,20 +279,25 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
|||
if (gRaftDetailLog) {
|
||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||
sTrace(
|
||||
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld send "
|
||||
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld send "
|
||||
"msg:%s",
|
||||
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, msgStr);
|
||||
taosMemoryFree(msgStr);
|
||||
} else {
|
||||
sTrace("sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld",
|
||||
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
|
||||
sTrace(
|
||||
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld",
|
||||
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
|
||||
}
|
||||
} else {
|
||||
sTrace("sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu lastConfigIndex:%ld",
|
||||
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
|
||||
sTrace(
|
||||
"sync event vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
|
||||
"lastConfigIndex:%ld",
|
||||
pSender->pSyncNode->vgId, host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex,
|
||||
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
|
||||
}
|
||||
|
||||
syncSnapshotSendDestroy(pMsg);
|
||||
|
@ -569,8 +583,27 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
|||
|
||||
// maybe update lastconfig
|
||||
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
|
||||
// update new config myIndex
|
||||
bool IamInNew = false;
|
||||
SSyncCfg newSyncCfg = pMsg->lastConfig;
|
||||
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
|
||||
if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
|
||||
pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
|
||||
newSyncCfg.myIndex = i;
|
||||
IamInNew = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
bool isDrop;
|
||||
syncNodeUpdateConfig(pSyncNode, &(pMsg->lastConfig), pMsg->lastConfigIndex, &isDrop);
|
||||
if (IamInNew) {
|
||||
sTrace("sync event update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ",
|
||||
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
|
||||
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
|
||||
} else {
|
||||
sTrace("sync event do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld ",
|
||||
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
|
||||
}
|
||||
}
|
||||
|
||||
SSnapshot snapshot;
|
||||
|
|
|
@ -30,7 +30,8 @@ SyncSnapshotSend *createMsg() {
|
|||
pMsg->lastConfig.myIndex = 1;
|
||||
for (int i = 0; i < pMsg->lastConfig.replicaNum; ++i) {
|
||||
((pMsg->lastConfig.nodeInfo)[i]).nodePort = i * 100;
|
||||
snprintf(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn, sizeof(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i);
|
||||
snprintf(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn, sizeof(((pMsg->lastConfig.nodeInfo)[i]).nodeFqdn),
|
||||
"100.200.300.%d", i);
|
||||
}
|
||||
|
||||
pMsg->seq = 44;
|
||||
|
@ -96,7 +97,6 @@ void test5() {
|
|||
}
|
||||
|
||||
int main() {
|
||||
|
||||
gRaftDetailLog = true;
|
||||
|
||||
tsAsyncLog = 0;
|
||||
|
|
Loading…
Reference in New Issue