refactor retry

This commit is contained in:
Yihao Deng 2024-01-19 06:20:16 +00:00
parent 9d82fd8c78
commit 6c732a14a3
2 changed files with 260 additions and 268 deletions

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndMnode.h" #include "mndMnode.h"
#include "audit.h"
#include "mndCluster.h" #include "mndCluster.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndPrivilege.h" #include "mndPrivilege.h"
@ -22,7 +23,6 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "tmisce.h" #include "tmisce.h"
#include "audit.h"
#define MNODE_VER_NUMBER 2 #define MNODE_VER_NUMBER 2
#define MNODE_RESERVE_SIZE 64 #define MNODE_RESERVE_SIZE 64
@ -168,7 +168,7 @@ static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER) SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
if(sver >=2){ if (sver >= 2) {
SDB_GET_INT32(pRaw, dataPos, &pObj->role, _OVER) SDB_GET_INT32(pRaw, dataPos, &pObj->role, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->lastIndex, _OVER) SDB_GET_INT64(pRaw, dataPos, &pObj->lastIndex, _OVER)
} }
@ -241,6 +241,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
} }
void *pIter = NULL; void *pIter = NULL;
pEpSet->inUse = 0;
while (1) { while (1) {
SMnodeObj *pObj = NULL; SMnodeObj *pObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
@ -250,7 +251,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
if (mndIsLeader(pMnode)) { if (mndIsLeader(pMnode)) {
pEpSet->inUse = pEpSet->numOfEps; pEpSet->inUse = pEpSet->numOfEps;
} else { } else {
pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes; // pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes;
} }
} }
if (pObj->pDnode != NULL) { if (pObj->pDnode != NULL) {
@ -320,8 +321,8 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p
return 0; return 0;
} }
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeTypeReq *pAlterMnodeTypeReq,
SDAlterMnodeTypeReq *pAlterMnodeTypeReq, SEpSet *pAlterMnodeTypeEpSet) { SEpSet *pAlterMnodeTypeEpSet) {
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
void *pReq = taosMemoryMalloc(contLen); void *pReq = taosMemoryMalloc(contLen);
tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq); tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq);
@ -396,13 +397,12 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
if (pIter == NULL) break; if (pIter == NULL) break;
if(pMObj->role == TAOS_SYNC_ROLE_VOTER){ if (pMObj->role == TAOS_SYNC_ROLE_VOTER) {
createReq.replicas[numOfReplicas].id = pMObj->id; createReq.replicas[numOfReplicas].id = pMObj->id;
createReq.replicas[numOfReplicas].port = pMObj->pDnode->port; createReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
memcpy(createReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(createReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
numOfReplicas++; numOfReplicas++;
} } else {
else{
createReq.learnerReplicas[numOfLearnerReplicas].id = pMObj->id; createReq.learnerReplicas[numOfLearnerReplicas].id = pMObj->id;
createReq.learnerReplicas[numOfLearnerReplicas].port = pMObj->pDnode->port; createReq.learnerReplicas[numOfLearnerReplicas].port = pMObj->pDnode->port;
memcpy(createReq.learnerReplicas[numOfLearnerReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(createReq.learnerReplicas[numOfLearnerReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
@ -441,18 +441,17 @@ int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
if (pIter == NULL) break; if (pIter == NULL) break;
if(pMObj->id == pDnode->id) { if (pMObj->id == pDnode->id) {
sdbRelease(pSdb, pMObj); sdbRelease(pSdb, pMObj);
continue; continue;
} }
if(pMObj->role == TAOS_SYNC_ROLE_VOTER){ if (pMObj->role == TAOS_SYNC_ROLE_VOTER) {
createReq.replicas[createReq.replica].id = pMObj->id; createReq.replicas[createReq.replica].id = pMObj->id;
createReq.replicas[createReq.replica].port = pMObj->pDnode->port; createReq.replicas[createReq.replica].port = pMObj->pDnode->port;
memcpy(createReq.replicas[createReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(createReq.replicas[createReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
createReq.replica++; createReq.replica++;
} } else {
else{
createReq.learnerReplicas[createReq.learnerReplica].id = pMObj->id; createReq.learnerReplicas[createReq.learnerReplica].id = pMObj->id;
createReq.learnerReplicas[createReq.learnerReplica].port = pMObj->pDnode->port; createReq.learnerReplicas[createReq.learnerReplica].port = pMObj->pDnode->port;
memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
@ -490,13 +489,12 @@ static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, S
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
if (pIter == NULL) break; if (pIter == NULL) break;
if(pMObj->role == TAOS_SYNC_ROLE_VOTER){ if (pMObj->role == TAOS_SYNC_ROLE_VOTER) {
alterReq.replicas[alterReq.replica].id = pMObj->id; alterReq.replicas[alterReq.replica].id = pMObj->id;
alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port; alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port;
memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
alterReq.replica++; alterReq.replica++;
} } else {
else{
alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id; alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id;
alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port; alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port;
memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
@ -534,18 +532,17 @@ int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, S
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
if (pIter == NULL) break; if (pIter == NULL) break;
if(pMObj->id == pDnode->id) { if (pMObj->id == pDnode->id) {
sdbRelease(pSdb, pMObj); sdbRelease(pSdb, pMObj);
continue; continue;
} }
if(pMObj->role == TAOS_SYNC_ROLE_VOTER){ if (pMObj->role == TAOS_SYNC_ROLE_VOTER) {
alterReq.replicas[alterReq.replica].id = pMObj->id; alterReq.replicas[alterReq.replica].id = pMObj->id;
alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port; alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port;
memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
alterReq.replica++; alterReq.replica++;
} } else {
else{
alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id; alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id;
alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port; alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port;
memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
@ -959,7 +956,10 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
void *pIter = NULL; void *pIter = NULL;
int32_t updatingMnodes = 0; int32_t updatingMnodes = 0;
int32_t readyMnodes = 0; int32_t readyMnodes = 0;
SSyncCfg cfg = {.myIndex = -1, .lastIndex = 0,}; SSyncCfg cfg = {
.myIndex = -1,
.lastIndex = 0,
};
SyncIndex maxIndex = 0; SyncIndex maxIndex = 0;
while (1) { while (1) {
@ -986,17 +986,17 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
if (pObj->pDnode->id == pMnode->selfDnodeId) { if (pObj->pDnode->id == pMnode->selfDnodeId) {
cfg.myIndex = cfg.totalReplicaNum; cfg.myIndex = cfg.totalReplicaNum;
} }
if(pNode->nodeRole == TAOS_SYNC_ROLE_VOTER){ if (pNode->nodeRole == TAOS_SYNC_ROLE_VOTER) {
cfg.replicaNum++; cfg.replicaNum++;
} }
cfg.totalReplicaNum++; cfg.totalReplicaNum++;
if(pObj->lastIndex > cfg.lastIndex){ if (pObj->lastIndex > cfg.lastIndex) {
cfg.lastIndex = pObj->lastIndex; cfg.lastIndex = pObj->lastIndex;
} }
} }
if (objStatus == SDB_STATUS_DROPPING) { if (objStatus == SDB_STATUS_DROPPING) {
if(pObj->lastIndex > cfg.lastIndex){ if (pObj->lastIndex > cfg.lastIndex) {
cfg.lastIndex = pObj->lastIndex; cfg.lastIndex = pObj->lastIndex;
} }
} }
@ -1006,10 +1006,10 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
sdbReleaseLock(pSdb, pObj, false); sdbReleaseLock(pSdb, pObj, false);
} }
//if (readyMnodes <= 0 || updatingMnodes <= 0) { // if (readyMnodes <= 0 || updatingMnodes <= 0) {
// mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes); // mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
// return; // return;
//} // }
if (cfg.myIndex == -1) { if (cfg.myIndex == -1) {
#if 1 #if 1
@ -1023,8 +1023,8 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
} }
if (pMnode->syncMgmt.sync > 0) { if (pMnode->syncMgmt.sync > 0) {
mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d", mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d", cfg.totalReplicaNum, cfg.replicaNum,
cfg.totalReplicaNum, cfg.replicaNum, cfg.myIndex); cfg.myIndex);
for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) { for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i]; SNodeInfo *pNode = &cfg.nodeInfo[i];

View File

@ -106,7 +106,7 @@ _err:
return -1; return -1;
} }
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg *cfg){ int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
@ -589,6 +589,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
} }
if (pEpSet->numOfEps > 0) { if (pEpSet->numOfEps > 0) {
pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps; pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
pEpSet->inUse = 0;
} }
sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse); sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
@ -614,7 +615,7 @@ int32_t syncCheckMember(int64_t rid) {
return -1; return -1;
} }
if(pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER){ if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
return -1; return -1;
} }
@ -689,17 +690,17 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
pMsg->info.conn.applyIndex = retIndex; pMsg->info.conn.applyIndex = retIndex;
pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode); pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
//after raft member change, need to handle 1->2 switching point // after raft member change, need to handle 1->2 switching point
//at this point, need to switch entry handling thread // at this point, need to switch entry handling thread
if(pSyncNode->replicaNum == 1){ if (pSyncNode->replicaNum == 1) {
sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex, sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
return 1; return 1;
} } else {
else{ sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64
sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64 " type:%s, " " type:%s, "
"handle:%p", pSyncNode->vgId, retIndex, "handle:%p",
TMSG_INFO(pMsg->msgType), pMsg->info.handle); pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
return 0; return 0;
} }
} else { } else {
@ -844,7 +845,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
goto _error; goto _error;
} }
if(vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion){ if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) { if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId); sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg; pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
@ -856,14 +857,12 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId); sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg; pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
} }
} } else {
else{ sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncInfo->syncCfg.changeVersion);
pSyncNode->vgId, vnodeVersion, pSyncInfo->syncCfg.changeVersion);
} }
} }
// init by SSyncInfo // init by SSyncInfo
pSyncNode->vgId = pSyncInfo->vgId; pSyncNode->vgId = pSyncInfo->vgId;
SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg; SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
@ -879,7 +878,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
pNode->nodeId, pNode->clusterId); pNode->nodeId, pNode->clusterId);
} }
if(vnodeVersion > pSyncInfo->syncCfg.changeVersion){ if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
if (updated) { if (updated) {
sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId); sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
if (syncWriteCfgFile(pSyncNode) != 0) { if (syncWriteCfgFile(pSyncNode) != 0) {
@ -2335,47 +2334,49 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
return code; return code;
} }
void syncBuildConfigFromReq(SAlterVnodeReplicaReq *pReq, SSyncCfg *cfg){//TODO SAlterVnodeReplicaReq name is proper? void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TODO SAlterVnodeReplicaReq name is proper?
cfg->replicaNum = 0; cfg->replicaNum = 0;
cfg->totalReplicaNum = 0; cfg->totalReplicaNum = 0;
for (int i = 0; i < pReq->replica; ++i) { for (int i = 0; i < pReq->replica; ++i) {
SNodeInfo *pNode = &cfg->nodeInfo[i]; SNodeInfo* pNode = &cfg->nodeInfo[i];
pNode->nodeId = pReq->replicas[i].id; pNode->nodeId = pReq->replicas[i].id;
pNode->nodePort = pReq->replicas[i].port; pNode->nodePort = pReq->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER; pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->nodeRole); sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort,
pNode->nodeId, pNode->nodeRole);
cfg->replicaNum++; cfg->replicaNum++;
} }
if(pReq->selfIndex != -1){ if (pReq->selfIndex != -1) {
cfg->myIndex = pReq->selfIndex; cfg->myIndex = pReq->selfIndex;
} }
for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) { for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
SNodeInfo *pNode = &cfg->nodeInfo[i]; SNodeInfo* pNode = &cfg->nodeInfo[i];
pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id; pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port; pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER; pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn)); tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->nodeRole); sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort,
pNode->nodeId, pNode->nodeRole);
cfg->totalReplicaNum++; cfg->totalReplicaNum++;
} }
cfg->totalReplicaNum += pReq->replica; cfg->totalReplicaNum += pReq->replica;
if(pReq->learnerSelfIndex != -1){ if (pReq->learnerSelfIndex != -1) {
cfg->myIndex = pReq->replica + pReq->learnerSelfIndex; cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
} }
cfg->changeVersion = pReq->changeVersion; cfg->changeVersion = pReq->changeVersion;
} }
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry){ int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
if(pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE){ if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
return -1; return -1;
} }
SMsgHead *head = (SMsgHead *)pEntry->data; SMsgHead* head = (SMsgHead*)pEntry->data;
void *pReq = POINTER_SHIFT(head, sizeof(SMsgHead)); void* pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
SAlterVnodeTypeReq req = {0}; SAlterVnodeTypeReq req = {0};
if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) { if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
@ -2386,17 +2387,17 @@ int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry){
SSyncCfg cfg = {0}; SSyncCfg cfg = {0};
syncBuildConfigFromReq(&req, &cfg); syncBuildConfigFromReq(&req, &cfg);
if(cfg.totalReplicaNum >= 1 && ths->state == TAOS_SYNC_STATE_LEADER){ if (cfg.totalReplicaNum >= 1 && ths->state == TAOS_SYNC_STATE_LEADER) {
bool incfg = false; bool incfg = false;
for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){ for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
&& ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){ ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
incfg = true; incfg = true;
break; break;
} }
} }
if(!incfg){ if (!incfg) {
SyncTerm currentTerm = raftStoreGetTerm(ths); SyncTerm currentTerm = raftStoreGetTerm(ths);
syncNodeStepDown(ths, currentTerm); syncNodeStepDown(ths, currentTerm);
return 1; return 1;
@ -2405,25 +2406,24 @@ int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry){
return 0; return 0;
} }
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg *cfg, char *str){ void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d, " sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
", changeVersion:%d, "
"restoreFinish:%d", "restoreFinish:%d",
ths->vgId, str, ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
ths->restoreFinish); ths->restoreFinish);
sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
ths->vgId, str, ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
ths->myNodeInfo.nodePort, ths->myNodeInfo.nodeRole); ths->myNodeInfo.nodeRole);
for (int32_t i = 0; i < ths->peersNum; ++i){ for (int32_t i = 0; i < ths->peersNum; ++i) {
sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
ths->vgId, str, i, ths->peersNodeInfo[i].clusterId, i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole); ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
} }
for (int32_t i = 0; i < ths->peersNum; ++i){ for (int32_t i = 0; i < ths->peersNum; ++i) {
char buf[256]; char buf[256];
int32_t len = 256; int32_t len = 256;
int32_t n = 0; int32_t n = 0;
@ -2434,37 +2434,33 @@ void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg *cfg, char *str){
} }
n += snprintf(buf + n, len - n, "%s", "}"); n += snprintf(buf + n, len - n, "%s", "}");
sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset->inUse);
ths->vgId, str, i, buf, ths->peersEpset->inUse);
} }
for (int32_t i = 0; i < ths->peersNum; ++i){ for (int32_t i = 0; i < ths->peersNum; ++i) {
sInfo("vgId:%d, %s, peersId%d, addr:%"PRId64, sInfo("vgId:%d, %s, peersId%d, addr:%" PRId64, ths->vgId, str, i, ths->peersId[i].addr);
ths->vgId, str, i, ths->peersId[i].addr);
} }
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){ for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
ths->vgId, str, i, ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
ths->raftCfg.cfg.nodeInfo[i].nodeId, ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
ths->raftCfg.cfg.nodeInfo[i].nodePort, ths->raftCfg.cfg.nodeInfo[i].nodeRole); ths->raftCfg.cfg.nodeInfo[i].nodeRole);
} }
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){ for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, ths->vgId, str, i, ths->replicasId[i].addr);
ths->vgId, str, i, ths->replicasId[i].addr);
} }
} }
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){ int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
int32_t i = 0; int32_t i = 0;
//change peersNodeInfo // change peersNodeInfo
i = 0; i = 0;
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){ for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
if(!(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 if (!(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
&& ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)){ ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)) {
ths->peersNodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole; ths->peersNodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
ths->peersNodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId; ths->peersNodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId;
tstrncpy(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN); tstrncpy(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
@ -2483,11 +2479,11 @@ int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){
} }
ths->peersNum = i; ths->peersNum = i;
//change cfg nodeInfo // change cfg nodeInfo
ths->raftCfg.cfg.replicaNum = 0; ths->raftCfg.cfg.replicaNum = 0;
i = 0; i = 0;
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j) { for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){ if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
ths->raftCfg.cfg.replicaNum++; ths->raftCfg.cfg.replicaNum++;
} }
ths->raftCfg.cfg.nodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole; ths->raftCfg.cfg.nodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
@ -2495,8 +2491,8 @@ int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){
tstrncpy(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN); tstrncpy(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
ths->raftCfg.cfg.nodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId; ths->raftCfg.cfg.nodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId;
ths->raftCfg.cfg.nodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort; ths->raftCfg.cfg.nodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort;
if((strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 if ((strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
&& ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)){ ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)) {
ths->raftCfg.cfg.myIndex = i; ths->raftCfg.cfg.myIndex = i;
} }
i++; i++;
@ -2506,26 +2502,26 @@ int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){
return 0; return 0;
} }
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg *cfg){ void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
//change peersNodeInfo // change peersNodeInfo
for (int32_t i = 0; i < ths->peersNum; ++i) { for (int32_t i = 0; i < ths->peersNum; ++i) {
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){ for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
if(strcmp(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 if (strcmp(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
&& ths->peersNodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort){ ths->peersNodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort) {
if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){ if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
ths->peersNodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER; ths->peersNodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
} }
} }
} }
} }
//change cfg nodeInfo // change cfg nodeInfo
ths->raftCfg.cfg.replicaNum = 0; ths->raftCfg.cfg.replicaNum = 0;
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) { for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){ for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
if(strcmp(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 if (strcmp(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
&& ths->raftCfg.cfg.nodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort){ ths->raftCfg.cfg.nodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort) {
if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){ if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
ths->raftCfg.cfg.nodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER; ths->raftCfg.cfg.nodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
ths->raftCfg.cfg.replicaNum++; ths->raftCfg.cfg.replicaNum++;
} }
@ -2534,8 +2530,8 @@ void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg *cfg){
} }
} }
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum){ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
//1.rebuild replicasId, remove deleted one // 1.rebuild replicasId, remove deleted one
SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId)); memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));
@ -2545,9 +2541,8 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]); syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]);
} }
// 2.rebuild MatchIndex, remove deleted one
//2.rebuild MatchIndex, remove deleted one SSyncIndexMgr* oldIndex = ths->pMatchIndex;
SSyncIndexMgr *oldIndex = ths->pMatchIndex;
ths->pMatchIndex = syncIndexMgrCreate(ths); ths->pMatchIndex = syncIndexMgrCreate(ths);
@ -2555,9 +2550,8 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
syncIndexMgrDestroy(oldIndex); syncIndexMgrDestroy(oldIndex);
// 3.rebuild NextIndex, remove deleted one
//3.rebuild NextIndex, remove deleted one SSyncIndexMgr* oldNextIndex = ths->pNextIndex;
SSyncIndexMgr *oldNextIndex = ths->pNextIndex;
ths->pNextIndex = syncIndexMgrCreate(ths); ths->pNextIndex = syncIndexMgrCreate(ths);
@ -2565,16 +2559,14 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
syncIndexMgrDestroy(oldNextIndex); syncIndexMgrDestroy(oldNextIndex);
// 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
//4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
voteGrantedUpdate(ths->pVotesGranted, ths); voteGrantedUpdate(ths->pVotesGranted, ths);
votesRespondUpdate(ths->pVotesRespond, ths); votesRespondUpdate(ths->pVotesRespond, ths);
// 5.rebuild logReplMgr
//5.rebuild logReplMgr for (int i = 0; i < oldtotalReplicaNum; ++i) {
for(int i = 0; i < oldtotalReplicaNum; ++i){ sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId, i, i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex); ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
} }
@ -2584,15 +2576,15 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
if (NULL == oldLogReplMgrs) return -1; if (NULL == oldLogReplMgrs) return -1;
memset(oldLogReplMgrs, 0, length); memset(oldLogReplMgrs, 0, length);
for(int i = 0; i < oldtotalReplicaNum; i++){ for (int i = 0; i < oldtotalReplicaNum; i++) {
oldLogReplMgrs[i] = *(ths->logReplMgrs[i]); oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
} }
syncNodeLogReplDestroy(ths); syncNodeLogReplDestroy(ths);
syncNodeLogReplInit(ths); syncNodeLogReplInit(ths);
for(int i = 0; i < ths->totalReplicaNum; ++i){ for (int i = 0; i < ths->totalReplicaNum; ++i) {
for(int j = 0; j < oldtotalReplicaNum; j++){ for (int j = 0; j < oldtotalReplicaNum; j++) {
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) { if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
*(ths->logReplMgrs[i]) = oldLogReplMgrs[j]; *(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
ths->logReplMgrs[i]->peerId = i; ths->logReplMgrs[i]->peerId = i;
@ -2600,16 +2592,16 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
} }
} }
for(int i = 0; i < ths->totalReplicaNum; ++i){ for (int i = 0; i < ths->totalReplicaNum; ++i) {
sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")" , ths->vgId, i, sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex, i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex); ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
} }
//6.rebuild sender // 6.rebuild sender
for(int i = 0; i < oldtotalReplicaNum; ++i){ for (int i = 0; i < oldtotalReplicaNum; ++i) {
sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime) ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
} }
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
@ -2633,13 +2625,12 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender); sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
} }
for(int i = 0; i < ths->totalReplicaNum; i++){ for (int i = 0; i < ths->totalReplicaNum; i++) {
sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime) ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
} }
// 7.rebuild synctimer
//7.rebuild synctimer
syncNodeStopHeartbeatTimer(ths); syncNodeStopHeartbeatTimer(ths);
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
@ -2648,16 +2639,15 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
syncNodeStartHeartbeatTimer(ths); syncNodeStartHeartbeatTimer(ths);
// 8.rebuild peerStates
//8.rebuild peerStates
SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0}; SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
for(int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++){ for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
oldState[i] = ths->peerStates[i]; oldState[i] = ths->peerStates[i];
} }
for(int i = 0; i < ths->totalReplicaNum; i++){ for (int i = 0; i < ths->totalReplicaNum; i++) {
for(int j = 0; j < oldtotalReplicaNum; j++){ for (int j = 0; j < oldtotalReplicaNum; j++) {
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])){ if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
ths->peerStates[i] = oldState[j]; ths->peerStates[i] = oldState[j];
} }
} }
@ -2668,32 +2658,32 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
return 0; return 0;
} }
void syncNodeChangeToVoter(SSyncNode* ths){ void syncNodeChangeToVoter(SSyncNode* ths) {
//replicasId, only need to change replicaNum when 1->3 // replicasId, only need to change replicaNum when 1->3
ths->replicaNum = ths->raftCfg.cfg.replicaNum; ths->replicaNum = ths->raftCfg.cfg.replicaNum;
sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum); sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
for (int32_t i = 0; i < ths->totalReplicaNum; ++i){ for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, i, ths->replicasId[i].addr); sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, i, ths->replicasId[i].addr);
} }
//pMatchIndex, pNextIndex, only need to change replicaNum when 1->3 // pMatchIndex, pNextIndex, only need to change replicaNum when 1->3
ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum; ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum; ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum); sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
for (int32_t i = 0; i < ths->pMatchIndex->totalReplicaNum; ++i){ for (int32_t i = 0; i < ths->pMatchIndex->totalReplicaNum; ++i) {
sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, i, ths->pMatchIndex->index[i]); sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, i, ths->pMatchIndex->index[i]);
} }
//pVotesGranted, pVotesRespond // pVotesGranted, pVotesRespond
voteGrantedUpdate(ths->pVotesGranted, ths); voteGrantedUpdate(ths->pVotesGranted, ths);
votesRespondUpdate(ths->pVotesRespond, ths); votesRespondUpdate(ths->pVotesRespond, ths);
//logRepMgrs // logRepMgrs
//no need to change logRepMgrs when 1->3 // no need to change logRepMgrs when 1->3
} }
void syncNodeResetPeerAndCfg(SSyncNode* ths){ void syncNodeResetPeerAndCfg(SSyncNode* ths) {
SNodeInfo node = {0}; SNodeInfo node = {0};
for (int32_t i = 0; i < ths->peersNum; ++i) { for (int32_t i = 0; i < ths->peersNum; ++i) {
memcpy(&ths->peersNodeInfo[i], &node, sizeof(SNodeInfo)); memcpy(&ths->peersNodeInfo[i], &node, sizeof(SNodeInfo));
@ -2704,13 +2694,13 @@ void syncNodeResetPeerAndCfg(SSyncNode* ths){
} }
} }
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
if(pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE){ if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
return -1; return -1;
} }
SMsgHead *head = (SMsgHead *)pEntry->data; SMsgHead* head = (SMsgHead*)pEntry->data;
void *pReq = POINTER_SHIFT(head, sizeof(SMsgHead)); void* pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
SAlterVnodeTypeReq req = {0}; SAlterVnodeTypeReq req = {0};
if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) { if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
@ -2721,139 +2711,141 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
SSyncCfg cfg = {0}; SSyncCfg cfg = {0};
syncBuildConfigFromReq(&req, &cfg); syncBuildConfigFromReq(&req, &cfg);
if(cfg.changeVersion <= ths->raftCfg.cfg.changeVersion){ if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
sInfo("vgId:%d, skip conf change entry since lower version. " sInfo(
"this entry, index:%" PRId64 ", term:%" PRId64 ", totalReplicaNum:%d, changeVersion:%d; " "vgId:%d, skip conf change entry since lower version. "
"current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64", changeVersion:%d", "this entry, index:%" PRId64 ", term:%" PRId64
ths->vgId, ", totalReplicaNum:%d, changeVersion:%d; "
pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion); ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
return 0; return 0;
} }
if(strcmp(str, "Commit") == 0){ if (strcmp(str, "Commit") == 0) {
sInfo("vgId:%d, change config from %s. " sInfo(
"this, i:%" PRId64 ", trNum:%d, vers:%d; " "vgId:%d, change config from %s. "
"this, i:%" PRId64
", trNum:%d, vers:%d; "
"node, rNum:%d, pNum:%d, trNum:%d, " "node, rNum:%d, pNum:%d, trNum:%d, "
"buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), " "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
"), "
"cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)", "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
ths->replicaNum, ths->peersNum, ths->totalReplicaNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType)); } else {
} sInfo(
else{ "vgId:%d, change config from %s. "
sInfo("vgId:%d, change config from %s. " "this, i:%" PRId64 ", t:%" PRId64
"this, i:%" PRId64 ", t:%" PRId64 ", trNum:%d, vers:%d; " ", trNum:%d, vers:%d; "
"node, rNum:%d, pNum:%d, trNum:%d, " "node, rNum:%d, pNum:%d, trNum:%d, "
"buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), " "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
"), "
"cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")", "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
ths->replicaNum, ths->peersNum, ths->totalReplicaNum, ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
pEntry->index -1, ths->commitIndex, ths->pLogBuf->commitIndex); ths->pLogBuf->commitIndex);
} }
syncNodeLogConfigInfo(ths, &cfg, "before config change"); syncNodeLogConfigInfo(ths, &cfg, "before config change");
int32_t oldTotalReplicaNum = ths->totalReplicaNum; int32_t oldTotalReplicaNum = ths->totalReplicaNum;
if(cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2){//remove replica if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) { // remove replica
bool incfg = false; bool incfg = false;
for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){ for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
&& ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){ ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
incfg = true; incfg = true;
break; break;
} }
} }
if(incfg){//remove other if (incfg) { // remove other
syncNodeResetPeerAndCfg(ths); syncNodeResetPeerAndCfg(ths);
//no need to change myNodeInfo // no need to change myNodeInfo
if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){ if (syncNodeRebuildPeerAndCfg(ths, &cfg) != 0) {
return -1; return -1;
}; };
if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){ if (syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0) {
return -1; return -1;
}; };
} } else { // remove myself
else{//remove myself // no need to do anything actually, to change the following to reduce distruptive server chance
//no need to do anything actually, to change the following to reduce distruptive server chance
syncNodeResetPeerAndCfg(ths); syncNodeResetPeerAndCfg(ths);
//change myNodeInfo // change myNodeInfo
ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER; ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;
//change peer and cfg // change peer and cfg
ths->peersNum = 0; ths->peersNum = 0;
memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo)); memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
ths->raftCfg.cfg.replicaNum = 0; ths->raftCfg.cfg.replicaNum = 0;
ths->raftCfg.cfg.totalReplicaNum = 1; ths->raftCfg.cfg.totalReplicaNum = 1;
//change other // change other
if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){ if (syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0) {
return -1; return -1;
} }
//change state // change state
ths->state = TAOS_SYNC_STATE_LEARNER; ths->state = TAOS_SYNC_STATE_LEARNER;
} }
ths->restoreFinish = false; ths->restoreFinish = false;
} } else { // add replica, or change replica type
else{//add replica, or change replica type if (ths->totalReplicaNum == 3) { // change replica type
if(ths->totalReplicaNum == 3){ //change replica type
sInfo("vgId:%d, begin change replica type", ths->vgId); sInfo("vgId:%d, begin change replica type", ths->vgId);
//change myNodeInfo // change myNodeInfo
for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){ for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
&& ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){ ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
if(cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){ if (cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER; ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
} }
} }
} }
//change peer and cfg // change peer and cfg
syncNodeChangePeerAndCfgToVoter(ths, &cfg); syncNodeChangePeerAndCfgToVoter(ths, &cfg);
//change other // change other
syncNodeChangeToVoter(ths); syncNodeChangeToVoter(ths);
//change state // change state
if(ths->state ==TAOS_SYNC_STATE_LEARNER){ if (ths->state == TAOS_SYNC_STATE_LEARNER) {
if(ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER ){ if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
ths->state = TAOS_SYNC_STATE_FOLLOWER; ths->state = TAOS_SYNC_STATE_FOLLOWER;
} }
} }
ths->restoreFinish = false; ths->restoreFinish = false;
} } else { // add replica
else{//add replica
sInfo("vgId:%d, begin add replica", ths->vgId); sInfo("vgId:%d, begin add replica", ths->vgId);
//no need to change myNodeInfo // no need to change myNodeInfo
//change peer and cfg // change peer and cfg
if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){ if (syncNodeRebuildPeerAndCfg(ths, &cfg) != 0) {
return -1; return -1;
}; };
//change other // change other
if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){ if (syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0) {
return -1; return -1;
}; };
//no need to change state // no need to change state
if(ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER){ if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
ths->restoreFinish = false; ths->restoreFinish = false;
} }
} }
@ -2867,7 +2859,7 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
syncNodeLogConfigInfo(ths, &cfg, "after config change"); syncNodeLogConfigInfo(ths, &cfg, "after config change");
if(syncWriteCfgFile(ths) != 0){ if (syncWriteCfgFile(ths) != 0) {
sError("vgId:%d, failed to create sync cfg file", ths->vgId); sError("vgId:%d, failed to create sync cfg file", ths->vgId);
return -1; return -1;
}; };
@ -2927,7 +2919,7 @@ bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
int32_t toCount = 0; int32_t toCount = 0;
int64_t tsNow = taosGetTimestampMs(); int64_t tsNow = taosGetTimestampMs();
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
if(pSyncNode->peersNodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER){ if (pSyncNode->peersNodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
continue; continue;
} }
int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i])); int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
@ -3191,9 +3183,9 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index); pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
} }
//1->2, config change is add in write thread, and will continue in sync thread // 1->2, config change is add in write thread, and will continue in sync thread
//need save message for it // need save message for it
if(pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE){ if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg}; SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
uint64_t seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub); uint64_t seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
pEntry->seqNum = seqNum; pEntry->seqNum = seqNum;
@ -3209,16 +3201,16 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
(*pRetIndex) = index; (*pRetIndex) = index;
} }
if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){ if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
int32_t code = syncNodeCheckChangeConfig(ths, pEntry); int32_t code = syncNodeCheckChangeConfig(ths, pEntry);
if(code < 0){ if (code < 0) {
sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr()); sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr());
syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
pEntry = NULL; pEntry = NULL;
return -1; return -1;
} }
if(code > 0){ if (code > 0) {
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
(void)syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info); (void)syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
if (rsp.info.handle != NULL) { if (rsp.info.handle != NULL) {