Merge branch '3.0' into test3.0/lihui
This commit is contained in:
commit
b25442a2fc
|
@ -200,6 +200,7 @@ const char* syncGetMyRoleStr(int64_t rid);
|
||||||
SyncTerm syncGetMyTerm(int64_t rid);
|
SyncTerm syncGetMyTerm(int64_t rid);
|
||||||
SyncGroupId syncGetVgId(int64_t rid);
|
SyncGroupId syncGetVgId(int64_t rid);
|
||||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||||
|
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
||||||
bool syncEnvIsStart();
|
bool syncEnvIsStart();
|
||||||
const char* syncStr(ESyncState state);
|
const char* syncStr(ESyncState state);
|
||||||
|
|
|
@ -198,6 +198,10 @@ int32_t mndInitSync(SMnode *pMnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// decrease election timer
|
||||||
|
setElectTimerMS(pMgmt->sync, 600);
|
||||||
|
setHeartbeatTimerMS(pMgmt->sync, 300);
|
||||||
|
|
||||||
mDebug("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
|
mDebug("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -261,7 +265,8 @@ bool mndIsMaster(SMnode *pMnode) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
|
|
||||||
if (!syncIsReady(pMgmt->sync)) {
|
if (!syncIsReady(pMgmt->sync)) {
|
||||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
// get terrno from syncIsReady
|
||||||
|
// terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -144,11 +144,15 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
|
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
|
||||||
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
||||||
SEpSet newEpSet = {0};
|
SEpSet newEpSet = {0};
|
||||||
|
syncGetRetryEpSet(pVnode->sync, &newEpSet);
|
||||||
|
|
||||||
|
/*
|
||||||
syncGetEpSet(pVnode->sync, &newEpSet);
|
syncGetEpSet(pVnode->sync, &newEpSet);
|
||||||
SEp *pEp = &newEpSet.eps[newEpSet.inUse];
|
SEp *pEp = &newEpSet.eps[newEpSet.inUse];
|
||||||
if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
|
if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
|
||||||
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
|
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
|
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
|
||||||
newEpSet.inUse);
|
newEpSet.inUse);
|
||||||
|
|
|
@ -105,9 +105,16 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
|
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
|
||||||
sTrace("syncNodeRequestVote pSyncNode:%p ", pSyncNode);
|
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
do {
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
|
||||||
|
sDebug("vgId:%d, send sync-request-vote to %s:%d, {term:%lu, last-index:%ld, last-term:%lu}", pSyncNode->vgId, host,
|
||||||
|
port, pMsg->term, pMsg->lastLogTerm, pMsg->lastLogIndex);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
|
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
|
||||||
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
|
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
|
||||||
|
|
|
@ -370,6 +370,15 @@ bool syncIsReady(int64_t rid) {
|
||||||
ASSERT(rid == pSyncNode->rid);
|
ASSERT(rid == pSyncNode->rid);
|
||||||
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish;
|
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish;
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
|
||||||
|
// if false, set error code
|
||||||
|
if (false == b) {
|
||||||
|
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||||
|
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
|
}
|
||||||
|
}
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,12 +489,30 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
|
||||||
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
||||||
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
||||||
(pEpSet->numOfEps)++;
|
(pEpSet->numOfEps)++;
|
||||||
|
sInfo("vgId:%d sync get epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||||
sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
|
||||||
}
|
}
|
||||||
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
|
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
|
||||||
|
sInfo("vgId:%d sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
|
||||||
|
|
||||||
sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
memset(pEpSet, 0, sizeof(*pEpSet));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ASSERT(rid == pSyncNode->rid);
|
||||||
|
pEpSet->numOfEps = 0;
|
||||||
|
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
|
||||||
|
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
|
||||||
|
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
|
||||||
|
(pEpSet->numOfEps)++;
|
||||||
|
sInfo("vgId:%d sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
|
||||||
|
}
|
||||||
|
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
|
||||||
|
sInfo("vgId:%d sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
}
|
}
|
||||||
|
|
|
@ -224,8 +224,8 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
|
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
|
||||||
sDebug(
|
sDebug(
|
||||||
"vgId:%d, send sync-append-entries to %s:%d, term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, commit:%ld, "
|
"vgId:%d, send sync-append-entries to %s:%d, {term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, commit:%ld, "
|
||||||
"datalen:%d",
|
"datalen:%d}",
|
||||||
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
|
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
|
||||||
pMsg->commitIndex, pMsg->dataLen);
|
pMsg->commitIndex, pMsg->dataLen);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
|
@ -99,15 +99,20 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
|
||||||
int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
// print log
|
|
||||||
char logBuf[128] = {0};
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVote, currentTerm:%lu", ths->pRaftStore->currentTerm);
|
|
||||||
syncRequestVoteLog2(logBuf, pMsg);
|
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||||
sInfo("recv SyncRequestVote maybe replica already dropped");
|
do {
|
||||||
return ret;
|
char logBuf[256];
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"recv sync-request-vote from %s:%d, term:%lu, lindex:%ld, lterm:%lu, maybe replica already dropped",
|
||||||
|
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
|
||||||
|
syncNodeEventLog(ths, logBuf);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybe update term
|
// maybe update term
|
||||||
|
@ -135,10 +140,22 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
|
||||||
pReply->term = ths->pRaftStore->currentTerm;
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
pReply->voteGranted = grant;
|
pReply->voteGranted = grant;
|
||||||
|
|
||||||
|
// trace log
|
||||||
|
do {
|
||||||
|
char logBuf[256];
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"recv sync-request-vote from %s:%d, term:%lu, lindex:%ld, lterm:%lu, reply-grant:%d", host, port,
|
||||||
|
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
|
||||||
|
syncNodeEventLog(ths, logBuf);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
|
syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
|
||||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
syncRequestVoteReplyDestroy(pReply);
|
syncRequestVoteReplyDestroy(pReply);
|
||||||
|
|
||||||
return ret;
|
return 0;
|
||||||
}
|
}
|
Loading…
Reference in New Issue