enh(sync): raft config change
This commit is contained in:
parent
3e65caccaa
commit
956ab45a30
|
@ -35,6 +35,7 @@ typedef struct SSyncIndexMgr {
|
||||||
} SSyncIndexMgr;
|
} SSyncIndexMgr;
|
||||||
|
|
||||||
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode);
|
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode);
|
||||||
|
void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode);
|
||||||
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr);
|
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr);
|
||||||
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr);
|
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr);
|
||||||
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index);
|
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index);
|
||||||
|
|
|
@ -342,6 +342,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
syncNodeUpdateConfig(ths, &newSyncCfg);
|
syncNodeUpdateConfig(ths, &newSyncCfg);
|
||||||
|
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
syncNodeBecomeLeader(ths);
|
||||||
|
} else {
|
||||||
|
syncNodeBecomeFollower(ths);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
|
|
@ -120,6 +120,11 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
syncNodeUpdateConfig(pSyncNode, &newSyncCfg);
|
syncNodeUpdateConfig(pSyncNode, &newSyncCfg);
|
||||||
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
syncNodeBecomeLeader(pSyncNode);
|
||||||
|
} else {
|
||||||
|
syncNodeBecomeFollower(pSyncNode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
|
|
@ -31,6 +31,13 @@ SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
|
||||||
return pSyncIndexMgr;
|
return pSyncIndexMgr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) {
|
||||||
|
pSyncIndexMgr->replicas = &(pSyncNode->replicasId);
|
||||||
|
pSyncIndexMgr->replicaNum = pSyncNode->replicaNum;
|
||||||
|
pSyncIndexMgr->pSyncNode = pSyncNode;
|
||||||
|
syncIndexMgrClear(pSyncIndexMgr);
|
||||||
|
}
|
||||||
|
|
||||||
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
|
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
|
||||||
if (pSyncIndexMgr != NULL) {
|
if (pSyncIndexMgr != NULL) {
|
||||||
taosMemoryFree(pSyncIndexMgr);
|
taosMemoryFree(pSyncIndexMgr);
|
||||||
|
|
|
@ -905,6 +905,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
|
||||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||||
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
|
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
|
||||||
|
syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
|
||||||
|
|
||||||
|
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncNode* syncNodeAcquire(int64_t rid) {
|
SSyncNode* syncNodeAcquire(int64_t rid) {
|
||||||
|
|
|
@ -220,7 +220,7 @@ int main(int argc, char** argv) {
|
||||||
assert(pSyncNode != NULL);
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
if (isConfigChange) {
|
if (isConfigChange) {
|
||||||
configChange(rid, replicaNum, myIndex);
|
configChange(rid, 3, myIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
//---------------------------
|
//---------------------------
|
||||||
|
|
Loading…
Reference in New Issue