refactor: config change
This commit is contained in:
parent
2dee385502
commit
4e1aa6b5c5
|
@ -88,6 +88,9 @@ typedef struct SReConfigCbMeta {
|
||||||
SyncIndex index;
|
SyncIndex index;
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
SyncTerm currentTerm;
|
SyncTerm currentTerm;
|
||||||
|
SSyncCfg oldCfg;
|
||||||
|
bool isDrop;
|
||||||
|
uint64_t flag;
|
||||||
} SReConfigCbMeta;
|
} SReConfigCbMeta;
|
||||||
|
|
||||||
typedef struct SSyncFSM {
|
typedef struct SSyncFSM {
|
||||||
|
|
|
@ -182,7 +182,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
|
||||||
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
|
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
|
||||||
char* syncNode2Str(const SSyncNode* pSyncNode);
|
char* syncNode2Str(const SSyncNode* pSyncNode);
|
||||||
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
|
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
|
||||||
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig);
|
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop);
|
||||||
|
|
||||||
SSyncNode* syncNodeAcquire(int64_t rid);
|
SSyncNode* syncNodeAcquire(int64_t rid);
|
||||||
void syncNodeRelease(SSyncNode* pNode);
|
void syncNodeRelease(SSyncNode* pNode);
|
||||||
|
|
|
@ -333,7 +333,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
cbMeta.seqNum = pEntry->seqNum;
|
cbMeta.seqNum = pEntry->seqNum;
|
||||||
cbMeta.term = pEntry->term;
|
cbMeta.term = pEntry->term;
|
||||||
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||||
cbMeta.flag = 9;
|
cbMeta.flag = 0x11;
|
||||||
|
|
||||||
bool needExecute = true;
|
bool needExecute = true;
|
||||||
if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) {
|
if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) {
|
||||||
|
@ -347,24 +347,51 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
|
||||||
// config change
|
// config change
|
||||||
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||||
|
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
|
||||||
|
|
||||||
SSyncCfg newSyncCfg;
|
SSyncCfg newSyncCfg;
|
||||||
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
|
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
syncNodeUpdateConfig(ths, &newSyncCfg);
|
// update new config myIndex
|
||||||
|
bool hit = false;
|
||||||
|
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
|
||||||
|
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
|
||||||
|
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
|
||||||
|
newSyncCfg.myIndex = i;
|
||||||
|
hit = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT(hit == true);
|
||||||
|
|
||||||
|
bool isDrop;
|
||||||
|
syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);
|
||||||
|
|
||||||
|
// change isStandBy to normal
|
||||||
|
if (!isDrop) {
|
||||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
syncNodeBecomeLeader(ths);
|
syncNodeBecomeLeader(ths);
|
||||||
} else {
|
} else {
|
||||||
syncNodeBecomeFollower(ths);
|
syncNodeBecomeFollower(ths);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char* sOld = syncCfg2Str(&oldSyncCfg);
|
||||||
|
char* sNew = syncCfg2Str(&newSyncCfg);
|
||||||
|
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
|
||||||
|
taosMemoryFree(sOld);
|
||||||
|
taosMemoryFree(sNew);
|
||||||
|
|
||||||
// maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig
|
|
||||||
if (ths->pFsm->FpReConfigCb != NULL) {
|
if (ths->pFsm->FpReConfigCb != NULL) {
|
||||||
SReConfigCbMeta cbMeta = {0};
|
SReConfigCbMeta cbMeta = {0};
|
||||||
cbMeta.code = 0;
|
cbMeta.code = 0;
|
||||||
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||||
cbMeta.index = pEntry->index;
|
cbMeta.index = pEntry->index;
|
||||||
cbMeta.term = pEntry->term;
|
cbMeta.term = pEntry->term;
|
||||||
|
cbMeta.oldCfg = oldSyncCfg;
|
||||||
|
cbMeta.flag = 0x11;
|
||||||
|
cbMeta.isDrop = isDrop;
|
||||||
ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
|
ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,7 +111,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
cbMeta.seqNum = pEntry->seqNum;
|
cbMeta.seqNum = pEntry->seqNum;
|
||||||
cbMeta.term = pEntry->term;
|
cbMeta.term = pEntry->term;
|
||||||
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
|
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
|
||||||
cbMeta.flag = 7;
|
cbMeta.flag = 0x1;
|
||||||
|
|
||||||
bool needExecute = true;
|
bool needExecute = true;
|
||||||
if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) {
|
if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) {
|
||||||
|
@ -125,24 +125,51 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
// config change
|
// config change
|
||||||
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||||
|
SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
|
||||||
|
|
||||||
SSyncCfg newSyncCfg;
|
SSyncCfg newSyncCfg;
|
||||||
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
|
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
syncNodeUpdateConfig(pSyncNode, &newSyncCfg);
|
// update new config myIndex
|
||||||
|
bool hit = false;
|
||||||
|
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
|
||||||
|
if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
|
||||||
|
pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
|
||||||
|
newSyncCfg.myIndex = i;
|
||||||
|
hit = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT(hit == true);
|
||||||
|
|
||||||
|
bool isDrop;
|
||||||
|
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, &isDrop);
|
||||||
|
|
||||||
|
// change isStandBy to normal
|
||||||
|
if (!isDrop) {
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
syncNodeBecomeLeader(pSyncNode);
|
syncNodeBecomeLeader(pSyncNode);
|
||||||
} else {
|
} else {
|
||||||
syncNodeBecomeFollower(pSyncNode);
|
syncNodeBecomeFollower(pSyncNode);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char* sOld = syncCfg2Str(&oldSyncCfg);
|
||||||
|
char* sNew = syncCfg2Str(&newSyncCfg);
|
||||||
|
sInfo("==config change== 0x1 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
|
||||||
|
taosMemoryFree(sOld);
|
||||||
|
taosMemoryFree(sNew);
|
||||||
|
|
||||||
// maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig
|
|
||||||
if (pSyncNode->pFsm->FpReConfigCb != NULL) {
|
if (pSyncNode->pFsm->FpReConfigCb != NULL) {
|
||||||
SReConfigCbMeta cbMeta = {0};
|
SReConfigCbMeta cbMeta = {0};
|
||||||
cbMeta.code = 0;
|
cbMeta.code = 0;
|
||||||
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
|
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
|
||||||
cbMeta.index = pEntry->index;
|
cbMeta.index = pEntry->index;
|
||||||
cbMeta.term = pEntry->term;
|
cbMeta.term = pEntry->term;
|
||||||
|
cbMeta.oldCfg = oldSyncCfg;
|
||||||
|
cbMeta.flag = 0x1;
|
||||||
|
cbMeta.isDrop = isDrop;
|
||||||
pSyncNode->pFsm->FpReConfigCb(pSyncNode->pFsm, newSyncCfg, cbMeta);
|
pSyncNode->pFsm->FpReConfigCb(pSyncNode->pFsm, newSyncCfg, cbMeta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -951,21 +951,9 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
|
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop) {
|
||||||
bool hit = false;
|
|
||||||
for (int i = 0; i < newConfig->replicaNum; ++i) {
|
|
||||||
if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newConfig->nodeInfo)[i].nodeFqdn) == 0 &&
|
|
||||||
pSyncNode->myNodeInfo.nodePort == (newConfig->nodeInfo)[i].nodePort) {
|
|
||||||
newConfig->myIndex = i;
|
|
||||||
hit = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ASSERT(hit == true);
|
|
||||||
|
|
||||||
pSyncNode->pRaftCfg->cfg = *newConfig;
|
pSyncNode->pRaftCfg->cfg = *newConfig;
|
||||||
int32_t ret = raftCfgPersist(pSyncNode->pRaftCfg);
|
int32_t ret = 0;
|
||||||
ASSERT(ret == 0);
|
|
||||||
|
|
||||||
// init internal
|
// init internal
|
||||||
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
|
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
|
||||||
|
@ -995,9 +983,22 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
|
||||||
voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
|
voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
|
||||||
votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
|
votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
|
||||||
|
|
||||||
pSyncNode->pRaftCfg->isStandBy = 0;
|
// isDrop
|
||||||
raftCfgPersist(pSyncNode->pRaftCfg);
|
*isDrop = true;
|
||||||
|
for (int i = 0; i < newConfig->replicaNum; ++i) {
|
||||||
|
if (strcmp((newConfig->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
|
||||||
|
(newConfig->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
|
||||||
|
*isDrop = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(*isDrop)) {
|
||||||
|
// change isStandBy to normal
|
||||||
|
pSyncNode->pRaftCfg->isStandBy = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
raftCfgPersist(pSyncNode->pRaftCfg);
|
||||||
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
|
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,8 +43,9 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||||
|
|
||||||
if (cbMeta.index > beginIndex) {
|
if (cbMeta.index > beginIndex) {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n",
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
"==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm,
|
||||||
|
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||||
} else {
|
} else {
|
||||||
sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index);
|
sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index);
|
||||||
|
@ -54,15 +55,16 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||||
void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm, cbMeta.index,
|
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm,
|
||||||
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n",
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
"==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm,
|
||||||
|
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue