refactor(sync): update replica index in snapshot sender
This commit is contained in:
parent
9cd54c5cc9
commit
5268b5233a
|
@ -561,7 +561,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
|
||||||
stub.createTime = taosGetTimestampMs();
|
stub.createTime = taosGetTimestampMs();
|
||||||
stub.rpcMsg = *pMsg;
|
stub.rpcMsg = *pMsg;
|
||||||
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
|
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
|
||||||
sDebug("vgId:%d, sync event propose, type:%s seq:%" PRIu64 " handle:%p", pSyncNode->vgId, TMSG_INFO(pMsg->msgType),
|
sDebug("vgId:%d sync event propose, type:%s seq:%" PRIu64 " handle:%p", pSyncNode->vgId, TMSG_INFO(pMsg->msgType),
|
||||||
seqNum, pMsg->info.handle);
|
seqNum, pMsg->info.handle);
|
||||||
|
|
||||||
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
|
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
|
||||||
|
@ -1244,7 +1244,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
|
||||||
SRaftId oldReplicasId[TSDB_MAX_REPLICA];
|
SRaftId oldReplicasId[TSDB_MAX_REPLICA];
|
||||||
memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
|
memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
|
||||||
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
|
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
|
||||||
memcpy(oldSenders, pSyncNode->senders, sizeof(oldSenders));
|
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
oldSenders[i] = (pSyncNode->senders)[i];
|
||||||
|
sDebug("vgId:%d sync event save senders %d, %p", pSyncNode->vgId, i, oldSenders[i]);
|
||||||
|
if (gRaftDetailLog) {
|
||||||
|
;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// init internal
|
// init internal
|
||||||
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
|
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
|
||||||
|
@ -1286,22 +1292,24 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
|
||||||
// reset new
|
// reset new
|
||||||
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
|
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||||
// reset sender
|
// reset sender
|
||||||
|
bool reset = false;
|
||||||
for (int j = 0; j < TSDB_MAX_REPLICA; ++j) {
|
for (int j = 0; j < TSDB_MAX_REPLICA; ++j) {
|
||||||
if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
|
if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
|
||||||
char host[128];
|
char host[128];
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
|
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
|
||||||
sDebug("vgId:%d sync event reset sender for %lu, %s:%d", pSyncNode->vgId, (pSyncNode->replicasId)[i].addr, host,
|
sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p", pSyncNode->vgId,
|
||||||
port);
|
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
|
||||||
(pSyncNode->senders)[i] = oldSenders[j];
|
(pSyncNode->senders)[i] = oldSenders[j];
|
||||||
oldSenders[j] = NULL;
|
oldSenders[j] = NULL;
|
||||||
}
|
reset = true;
|
||||||
|
|
||||||
// reset replicaIndex
|
// reset replicaIndex
|
||||||
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
|
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
|
||||||
(pSyncNode->senders)[i]->replicaIndex = i;
|
(pSyncNode->senders)[i]->replicaIndex = i;
|
||||||
|
sDebug("vgId:%d sync event udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", pSyncNode->vgId,
|
||||||
sDebug("vgId:%d sync event udpate replicaIndex from %d to %d", pSyncNode->vgId, oldreplicaIndex, i);
|
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1309,7 +1317,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
|
||||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
if ((pSyncNode->senders)[i] == NULL) {
|
if ((pSyncNode->senders)[i] == NULL) {
|
||||||
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
|
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
|
||||||
sDebug("vgId:%d sync event create new sender replicaIndex:%d", pSyncNode->vgId, i);
|
sDebug("vgId:%d sync event create new sender %p replicaIndex:%d", pSyncNode->vgId, (pSyncNode->senders)[i], i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1317,8 +1325,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
|
||||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
if (oldSenders[i] != NULL) {
|
if (oldSenders[i] != NULL) {
|
||||||
snapshotSenderDestroy(oldSenders[i]);
|
snapshotSenderDestroy(oldSenders[i]);
|
||||||
|
sDebug("vgId:%d sync event delete old sender %p replicaIndex:%d", pSyncNode->vgId, oldSenders[i], i);
|
||||||
oldSenders[i] = NULL;
|
oldSenders[i] = NULL;
|
||||||
sDebug("vgId:%d sync event delete old sender replicaIndex:%d", pSyncNode->vgId, i);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1378,8 +1386,8 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
sDebug("vgId:%d sync event become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy,
|
sDebug("vgId:%d sync event become follower, isStandBy:%d, replicaNum:%d, %s", pSyncNode->vgId,
|
||||||
debugStr);
|
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr);
|
||||||
|
|
||||||
// maybe clear leader cache
|
// maybe clear leader cache
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
@ -1413,8 +1421,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
|
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
|
||||||
//
|
//
|
||||||
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
sDebug("vgId:%d sync event become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy,
|
sDebug("vgId:%d sync event become leader, isStandBy:%d, replicaNum:%d %s", pSyncNode->vgId,
|
||||||
debugStr);
|
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr);
|
||||||
|
|
||||||
// state change
|
// state change
|
||||||
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
||||||
|
@ -2028,14 +2036,18 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
||||||
|
|
||||||
// change isStandBy to normal
|
// change isStandBy to normal
|
||||||
if (!isDrop) {
|
if (!isDrop) {
|
||||||
|
char tmpbuf[128];
|
||||||
|
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum);
|
||||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
syncNodeBecomeLeader(ths, "config change");
|
syncNodeBecomeLeader(ths, tmpbuf);
|
||||||
} else {
|
} else {
|
||||||
syncNodeBecomeFollower(ths, "config change");
|
syncNodeBecomeFollower(ths, tmpbuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
syncNodeBecomeFollower(ths, "config change2");
|
char tmpbuf[128];
|
||||||
|
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum);
|
||||||
|
syncNodeBecomeFollower(ths, tmpbuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (gRaftDetailLog) {
|
if (gRaftDetailLog) {
|
||||||
|
|
|
@ -588,6 +588,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
|
|
||||||
// maybe update lastconfig
|
// maybe update lastconfig
|
||||||
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
|
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
|
||||||
|
int32_t oldReplicaNum = pSyncNode->replicaNum;
|
||||||
|
|
||||||
// update new config myIndex
|
// update new config myIndex
|
||||||
bool IamInNew = false;
|
bool IamInNew = false;
|
||||||
SSyncCfg newSyncCfg = pMsg->lastConfig;
|
SSyncCfg newSyncCfg = pMsg->lastConfig;
|
||||||
|
@ -614,10 +616,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
|
|
||||||
// change isStandBy to normal
|
// change isStandBy to normal
|
||||||
if (!isDrop) {
|
if (!isDrop) {
|
||||||
|
char tmpbuf[128];
|
||||||
|
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d", oldReplicaNum, newSyncCfg.replicaNum);
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
syncNodeBecomeLeader(pSyncNode, "config change3");
|
syncNodeBecomeLeader(pSyncNode, tmpbuf);
|
||||||
} else {
|
} else {
|
||||||
syncNodeBecomeFollower(pSyncNode, "config change3");
|
syncNodeBecomeFollower(pSyncNode, tmpbuf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue