fix: use sync cfg from vnode
This commit is contained in:
parent
fdb6d2be11
commit
6d1bf00cee
|
@ -1128,7 +1128,12 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
if (pSyncInfo->syncCfg.replicaNum > 0 && pSyncInfo->syncCfg.replicaNum != pSyncNode->pRaftCfg->cfg.replicaNum) {
|
if (pSyncInfo->syncCfg.replicaNum > 0 && pSyncInfo->syncCfg.replicaNum != pSyncNode->pRaftCfg->cfg.replicaNum) {
|
||||||
sInfo("vgId:%d, use sync config from input options", pSyncNode->vgId);
|
sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
|
||||||
|
pSyncNode->pRaftCfg->cfg = pSyncInfo->syncCfg;
|
||||||
|
if (raftCfgPersist(pSyncNode->pRaftCfg) != 0) {
|
||||||
|
sError("vgId:%d, failed to persist raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
|
sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
|
||||||
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
|
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
|
||||||
|
@ -1161,14 +1166,14 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
||||||
// init raft config
|
// init raft config
|
||||||
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
|
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
|
||||||
if (pSyncNode->pRaftCfg == NULL) {
|
if (pSyncNode->pRaftCfg == NULL) {
|
||||||
sError("failed to open raft cfg file. path:%s", pSyncNode->configPath);
|
sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
// init internal
|
// init internal
|
||||||
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
|
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
|
||||||
if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
|
if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
|
||||||
sError("failed to determine my raft member id. vgId:%d", pSyncNode->vgId);
|
sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1183,7 +1188,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
||||||
}
|
}
|
||||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
|
if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
|
||||||
sError("failed to determine raft member id. vgId:%d, peer:%d", pSyncNode->vgId, i);
|
sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1192,7 +1197,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
||||||
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
|
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
|
||||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||||
if (!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
|
if (!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
|
||||||
sError("failed to determine raft member id. vgId:%d, replica:%d", pSyncNode->vgId, i);
|
sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1233,38 +1238,38 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
||||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||||
pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
|
pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath);
|
||||||
if (pSyncNode->pRaftStore == NULL) {
|
if (pSyncNode->pRaftStore == NULL) {
|
||||||
sError("failed to open raft store. path: %s", pSyncNode->raftStorePath);
|
sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
// init TLA+ candidate vars
|
// init TLA+ candidate vars
|
||||||
pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
|
pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
|
||||||
if (pSyncNode->pVotesGranted == NULL) {
|
if (pSyncNode->pVotesGranted == NULL) {
|
||||||
sError("failed to create VotesGranted. vgId:%d", pSyncNode->vgId);
|
sError("vgId:%d, failed to create VotesGranted", pSyncNode->vgId);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
|
pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
|
||||||
if (pSyncNode->pVotesRespond == NULL) {
|
if (pSyncNode->pVotesRespond == NULL) {
|
||||||
sError("failed to create VotesRespond. vgId:%d", pSyncNode->vgId);
|
sError("vgId:%d, failed to create VotesRespond", pSyncNode->vgId);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
// init TLA+ leader vars
|
// init TLA+ leader vars
|
||||||
pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
|
pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode);
|
||||||
if (pSyncNode->pNextIndex == NULL) {
|
if (pSyncNode->pNextIndex == NULL) {
|
||||||
sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId);
|
sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
|
pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode);
|
||||||
if (pSyncNode->pMatchIndex == NULL) {
|
if (pSyncNode->pMatchIndex == NULL) {
|
||||||
sError("failed to create SyncIndexMgr. vgId:%d", pSyncNode->vgId);
|
sError("vgId:%d, failed to create SyncIndexMgr", pSyncNode->vgId);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
// init TLA+ log vars
|
// init TLA+ log vars
|
||||||
pSyncNode->pLogStore = logStoreCreate(pSyncNode);
|
pSyncNode->pLogStore = logStoreCreate(pSyncNode);
|
||||||
if (pSyncNode->pLogStore == NULL) {
|
if (pSyncNode->pLogStore == NULL) {
|
||||||
sError("failed to create SyncLogStore. vgId:%d", pSyncNode->vgId);
|
sError("vgId:%d, failed to create SyncLogStore", pSyncNode->vgId);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1273,7 +1278,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
||||||
SSnapshot snapshot = {0};
|
SSnapshot snapshot = {0};
|
||||||
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sError("failed to get snapshot info. vgId:%d, code:%d", pSyncNode->vgId, code);
|
sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
if (snapshot.lastApplyIndex > commitIndex) {
|
if (snapshot.lastApplyIndex > commitIndex) {
|
||||||
|
@ -1332,7 +1337,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
||||||
// tools
|
// tools
|
||||||
pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
|
pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
|
||||||
if (pSyncNode->pSyncRespMgr == NULL) {
|
if (pSyncNode->pSyncRespMgr == NULL) {
|
||||||
sError("failed to create SyncRespMgr. vgId:%d", pSyncNode->vgId);
|
sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2854,7 +2859,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
//syncNodeEventLog(pSyncNode, "eq peer hb timer");
|
// syncNodeEventLog(pSyncNode, "eq peer hb timer");
|
||||||
|
|
||||||
int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
|
int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
|
||||||
int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
|
int64_t msgLogicClock = atomic_load_64(&pData->logicClock);
|
||||||
|
|
|
@ -355,8 +355,6 @@ char *raftCfg2Str(SRaftCfg *pRaftCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
|
int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
|
||||||
ASSERT(pCfg != NULL);
|
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE);
|
TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
int32_t err = terrno;
|
int32_t err = terrno;
|
||||||
|
|
|
@ -129,7 +129,7 @@ endi
|
||||||
|
|
||||||
print ============= step5: result
|
print ============= step5: result
|
||||||
$i = 0
|
$i = 0
|
||||||
while $i < 1000
|
while $i < 10
|
||||||
$i = $i + 1
|
$i = $i + 1
|
||||||
|
|
||||||
sleep 1000
|
sleep 1000
|
||||||
|
|
Loading…
Reference in New Issue