refact: refact function syncUtilNodeInfo2RaftId
This commit is contained in:
parent
1f1a2d9f6c
commit
6470e5f6e3
|
@ -486,7 +486,7 @@ int32_t mndInitSync(SMnode *pMnode) {
|
|||
|
||||
int32_t code = 0;
|
||||
(void)tsem_init(&pMgmt->syncSem, 0, 0);
|
||||
pMgmt->sync = syncOpen(&syncInfo, true);
|
||||
pMgmt->sync = syncOpen(&syncInfo, 1); // always check
|
||||
if (pMgmt->sync <= 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
mError("failed to open sync since %s", tstrerror(code));
|
||||
|
|
|
@ -1113,37 +1113,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
|||
goto _error;
|
||||
}
|
||||
|
||||
// init internal
|
||||
pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
|
||||
if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pSyncNode->arbTerm = -1;
|
||||
(void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
|
||||
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
|
||||
sInfo("vgId:%d, arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
|
||||
|
||||
// init peersNum, peers, peersId
|
||||
pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
|
||||
int32_t j = 0;
|
||||
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||
if (i != pSyncNode->raftCfg.cfg.myIndex) {
|
||||
pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
|
||||
syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
|
||||
j++;
|
||||
}
|
||||
}
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
||||
// init replicaNum, replicasId
|
||||
pSyncNode->replicaNum = pSyncNode->raftCfg.cfg.replicaNum;
|
||||
pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum;
|
||||
|
@ -1155,6 +1124,27 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
|||
}
|
||||
}
|
||||
|
||||
// init internal
|
||||
pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex];
|
||||
pSyncNode->myRaftId = pSyncNode->replicasId[pSyncNode->raftCfg.cfg.myIndex];
|
||||
|
||||
// init peersNum, peers, peersId
|
||||
pSyncNode->peersNum = pSyncNode->raftCfg.cfg.totalReplicaNum - 1;
|
||||
int32_t j = 0;
|
||||
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||
if (i != pSyncNode->raftCfg.cfg.myIndex) {
|
||||
pSyncNode->peersNodeInfo[j] = pSyncNode->raftCfg.cfg.nodeInfo[i];
|
||||
pSyncNode->peersId[j] = pSyncNode->replicasId[i];
|
||||
syncUtilNodeInfo2EpSet(&pSyncNode->peersNodeInfo[j], &pSyncNode->peersEpset[j]);
|
||||
j++;
|
||||
}
|
||||
}
|
||||
|
||||
pSyncNode->arbTerm = -1;
|
||||
(void)taosThreadMutexInit(&pSyncNode->arbTokenMutex, NULL);
|
||||
syncUtilGenerateArbToken(pSyncNode->myNodeInfo.nodeId, pSyncInfo->vgId, pSyncNode->arbToken);
|
||||
sInfo("vgId:%d, generate arb token:%s", pSyncNode->vgId, pSyncNode->arbToken);
|
||||
|
||||
// init raft algorithm
|
||||
pSyncNode->pFsm = pSyncInfo->pFsm;
|
||||
pSyncInfo->pFsm = NULL;
|
||||
|
|
|
@ -43,13 +43,11 @@ void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
|
|||
|
||||
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
|
||||
uint32_t ipv4 = 0xFFFFFFFF;
|
||||
sDebug("vgId:%d, resolve sync addr from fqdn, dnode:%d cluster:%" PRId64 " fqdn:%s port:%u", vgId, pInfo->nodeId,
|
||||
pInfo->clusterId, pInfo->nodeFqdn, pInfo->nodePort);
|
||||
sDebug("vgId:%d, resolve sync addr from fqdn, ep:%s:%u", vgId, pInfo->nodeFqdn, pInfo->nodePort);
|
||||
for (int32_t i = 0; i < tsResolveFQDNRetryTime; i++) {
|
||||
int32_t code = taosGetIpv4FromFqdn(pInfo->nodeFqdn, &ipv4);
|
||||
if (code) {
|
||||
sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s, wait one second", vgId, pInfo->nodeId,
|
||||
pInfo->nodeFqdn);
|
||||
sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s, retry", vgId, pInfo->nodeId, pInfo->nodeFqdn);
|
||||
taosSsleep(1);
|
||||
} else {
|
||||
break;
|
||||
|
@ -57,7 +55,7 @@ bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId*
|
|||
}
|
||||
|
||||
if (ipv4 == 0xFFFFFFFF || ipv4 == 1) {
|
||||
sError("vgId:%d, failed to resolve sync addr, fqdn:%s", vgId, pInfo->nodeFqdn);
|
||||
sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s", vgId, pInfo->nodeId, pInfo->nodeFqdn);
|
||||
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
||||
return false;
|
||||
}
|
||||
|
@ -67,8 +65,8 @@ bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId*
|
|||
raftId->addr = SYNC_ADDR(pInfo);
|
||||
raftId->vgId = vgId;
|
||||
|
||||
sInfo("vgId:%d, sync addr:%" PRIu64 " is resolved, dnode:%d cluster:%" PRId64 " fqdn:%s port:%u ip:%s ipv4:%u", vgId,
|
||||
raftId->addr, pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, pInfo->nodePort, ipbuf, ipv4);
|
||||
sInfo("vgId:%d, sync addr:%" PRIu64 " is resolved, ep:%s:%u ip:%s ipv4:%u dnode:%d cluster:%" PRId64, vgId,
|
||||
raftId->addr, pInfo->nodeFqdn, pInfo->nodePort, ipbuf, ipv4, pInfo->nodeId, pInfo->clusterId);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue