refactor(sync): add interface: get retry epset
This commit is contained in:
parent
8a8e42a923
commit
87defc2790
|
@ -200,6 +200,7 @@ const char* syncGetMyRoleStr(int64_t rid);
|
|||
SyncTerm syncGetMyTerm(int64_t rid);
|
||||
SyncGroupId syncGetVgId(int64_t rid);
|
||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
||||
bool syncEnvIsStart();
|
||||
const char* syncStr(ESyncState state);
|
||||
|
|
|
@ -144,11 +144,15 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
|
||||
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
||||
SEpSet newEpSet = {0};
|
||||
syncGetRetryEpSet(pVnode->sync, &newEpSet);
|
||||
|
||||
/*
|
||||
syncGetEpSet(pVnode->sync, &newEpSet);
|
||||
SEp *pEp = &newEpSet.eps[newEpSet.inUse];
|
||||
if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
|
||||
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
|
||||
}
|
||||
*/
|
||||
|
||||
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
|
||||
newEpSet.inUse);
|
||||
|
|
|
@ -489,12 +489,30 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
|
|||
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
||||
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
||||
(pEpSet->numOfEps)++;
|
||||
|
||||
sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||
sInfo("vgId:%d sync get epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||
}
|
||||
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
|
||||
sInfo("vgId:%d sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
|
||||
|
||||
sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse);
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
}
|
||||
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
memset(pEpSet, 0, sizeof(*pEpSet));
|
||||
return;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
pEpSet->numOfEps = 0;
|
||||
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
||||
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
||||
(pEpSet->numOfEps)++;
|
||||
sInfo("vgId:%d sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||
}
|
||||
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
|
||||
sInfo("vgId:%d sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue