refactor(sync): add last config index

This commit is contained in:
Minghao Li 2022-06-11 13:03:58 +08:00
parent 6586f78599
commit 6adadc0b26
1 changed files with 15 additions and 11 deletions

View File

@ -1181,13 +1181,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
// isDrop bool IamInOld = false;
*isDrop = true; bool IamInNew = false;
bool IamInOld, IamInNew;
for (int i = 0; i < oldConfig.replicaNum; ++i) { for (int i = 0; i < oldConfig.replicaNum; ++i) {
if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { (oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
*isDrop = false; IamInOld = false;
break; break;
} }
} }
@ -1195,16 +1194,21 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
for (int i = 0; i < newConfig->replicaNum; ++i) { for (int i = 0; i < newConfig->replicaNum; ++i) {
if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 && if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) { (newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
*isDrop = false; IamInNew = false;
break; break;
} }
} }
if (!(*isDrop)) { *isDrop = true;
// change isStandBy to normal if (IamInOld && !IamInNew) {
pSyncNode->pRaftCfg->isStandBy = 0; *isDrop = true;
} else {
*isDrop = false;
} }
if (IamInNew) {
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
}
raftCfgPersist(pSyncNode->pRaftCfg); raftCfgPersist(pSyncNode->pRaftCfg);
if (gRaftDetailLog) { if (gRaftDetailLog) {
@ -1821,19 +1825,19 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
ASSERT(ret == 0); ASSERT(ret == 0);
// update new config myIndex // update new config myIndex
bool hit = false; bool IamInNew = false;
for (int i = 0; i < newSyncCfg.replicaNum; ++i) { for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i; newSyncCfg.myIndex = i;
hit = true; IamInNew = true;
break; break;
} }
} }
bool isDrop; bool isDrop;
if (hit) { // I am in newConfig if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
// change isStandBy to normal // change isStandBy to normal