Merge pull request #17592 from taosdata/enh/TD-19461
fix: alter replica one by one
This commit is contained in:
commit
b115863693
|
@ -204,13 +204,21 @@ int32_t mndInitSync(SMnode *pMnode) {
|
|||
taosInitRWLatch(&pMgmt->lock);
|
||||
pMgmt->transId = 0;
|
||||
|
||||
SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
|
||||
syncInfo.pWal = pMnode->pWal;
|
||||
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
|
||||
syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT;
|
||||
SSyncInfo syncInfo = {
|
||||
.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
|
||||
.batchSize = 1,
|
||||
.vgId = 1,
|
||||
.pWal = pMnode->pWal,
|
||||
.msgcb = NULL,
|
||||
.FpSendMsg = mndSyncSendMsg,
|
||||
.FpEqMsg = mndSyncEqMsg,
|
||||
.FpEqCtrlMsg = NULL,
|
||||
};
|
||||
|
||||
mInfo("vgId:1, start to open sync, selfIndex:%d replica:%d", pMgmt->selfIndex, pMgmt->numOfReplicas);
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
|
||||
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
|
||||
|
||||
mInfo("vgId:1, start to open sync, replica:%d selfIndex:%d", pMgmt->numOfReplicas, pMgmt->selfIndex);
|
||||
SSyncCfg *pCfg = &syncInfo.syncCfg;
|
||||
pCfg->replicaNum = pMgmt->numOfReplicas;
|
||||
pCfg->myIndex = pMgmt->selfIndex;
|
||||
|
|
|
@ -1600,6 +1600,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
|
|||
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
|
||||
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
|
||||
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
|
||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
|
||||
return -1;
|
||||
|
|
|
@ -107,7 +107,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
|
|||
// free info binary
|
||||
taosMemoryFree(data);
|
||||
|
||||
vInfo("vgId:%d, vnode info is saved, fname:%s", pInfo->config.vgId, fname);
|
||||
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d", pInfo->config.vgId, fname, pInfo->config.syncCfg.replicaNum);
|
||||
|
||||
return 0;
|
||||
|
||||
|
|
|
@ -87,12 +87,19 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
|
|||
vInfo("vgId:%d, save config, replica:%d ep:%s:%u", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort);
|
||||
}
|
||||
|
||||
info.config.syncCfg = *pCfg;
|
||||
ret = vnodeSaveInfo(dir, &info);
|
||||
if (ret < 0) {
|
||||
vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = vnodeCommitInfo(dir, &info);
|
||||
if (ret < 0) {
|
||||
vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
vInfo("vgId:%d, vnode config is saved", info.config.vgId);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -563,9 +563,7 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *
|
|||
#endif
|
||||
}
|
||||
|
||||
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
SVnode *pVnode = pFsm->data;
|
||||
}
|
||||
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {}
|
||||
|
||||
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
|
||||
SVnode *pVnode = pFsm->data;
|
||||
|
@ -605,12 +603,14 @@ static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
|
|||
SVnode *pVnode = pFsm->data;
|
||||
vDebug("vgId:%d, become leader", pVnode->config.vgId);
|
||||
|
||||
// taosThreadMutexLock(&pVnode->lock);
|
||||
// if (pVnode->blocked) {
|
||||
// pVnode->blocked = false;
|
||||
// tsem_post(&pVnode->syncSem);
|
||||
// }
|
||||
// taosThreadMutexUnlock(&pVnode->lock);
|
||||
#if 0
|
||||
taosThreadMutexLock(&pVnode->lock);
|
||||
if (pVnode->blocked) {
|
||||
pVnode->blocked = false;
|
||||
tsem_post(&pVnode->syncSem);
|
||||
}
|
||||
taosThreadMutexUnlock(&pVnode->lock);
|
||||
#endif
|
||||
}
|
||||
|
||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||
|
@ -638,10 +638,8 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
|||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||
SSyncInfo syncInfo = {
|
||||
.snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
|
||||
//.snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT,
|
||||
.batchSize = 1,
|
||||
.vgId = pVnode->config.vgId,
|
||||
.isStandBy = pVnode->config.standby,
|
||||
.syncCfg = pVnode->config.syncCfg,
|
||||
.pWal = pVnode->pWal,
|
||||
.msgcb = NULL,
|
||||
|
@ -653,6 +651,13 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
|||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
|
||||
syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
|
||||
|
||||
SSyncCfg *pCfg = &syncInfo.syncCfg;
|
||||
vInfo("vgId:%d, start to open sync, replica:%d selfIndex:%d", pVnode->config.vgId, pCfg->replicaNum, pCfg->myIndex);
|
||||
for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
|
||||
SNodeInfo *pNode = &pCfg->nodeInfo[i];
|
||||
vInfo("vgId:%d, index:%d ep:%s:%u", pVnode->config.vgId, i, pNode->nodeFqdn, pNode->nodePort);
|
||||
}
|
||||
|
||||
pVnode->sync = syncOpen(&syncInfo);
|
||||
if (pVnode->sync <= 0) {
|
||||
vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
|
||||
|
@ -666,11 +671,15 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
|||
}
|
||||
|
||||
void vnodeSyncStart(SVnode *pVnode) {
|
||||
vDebug("vgId:%d, start sync", pVnode->config.vgId);
|
||||
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
|
||||
syncStart(pVnode->sync);
|
||||
}
|
||||
|
||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||
void vnodeSyncClose(SVnode *pVnode) {
|
||||
vDebug("vgId:%d, close sync", pVnode->config.vgId);
|
||||
syncStop(pVnode->sync);
|
||||
}
|
||||
|
||||
bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; }
|
||||
|
||||
|
|
|
@ -90,7 +90,7 @@ void syncCleanUp() {
|
|||
int64_t syncOpen(SSyncInfo* pSyncInfo) {
|
||||
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
|
||||
if (pSyncNode == NULL) {
|
||||
sError("failed to open sync node. vgId:%d", pSyncInfo->vgId);
|
||||
sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -106,7 +106,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) {
|
|||
}
|
||||
|
||||
void syncStart(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -121,7 +121,7 @@ void syncStart(int64_t rid) {
|
|||
}
|
||||
|
||||
void syncStartNormal(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ void syncStartNormal(int64_t rid) {
|
|||
}
|
||||
|
||||
void syncStartStandBy(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ void syncStartStandBy(int64_t rid) {
|
|||
}
|
||||
|
||||
void syncStop(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) return;
|
||||
int32_t vgId = pSyncNode->vgId;
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
|
@ -151,7 +151,7 @@ void syncStop(int64_t rid) {
|
|||
}
|
||||
|
||||
int32_t syncSetStandby(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("failed to set standby since accquire ref error, rid:%" PRId64, rid);
|
||||
|
@ -1089,17 +1089,13 @@ int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
// open/close --------------
|
||||
SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
|
||||
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
|
||||
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryCalloc(1, sizeof(SSyncNode));
|
||||
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
||||
SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
|
||||
if (pSyncNode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
int32_t ret = 0;
|
||||
if (!taosDirExist((char*)(pSyncInfo->path))) {
|
||||
if (taosMkDir(pSyncInfo->path) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -1111,32 +1107,44 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
|
|||
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
|
||||
if (!taosCheckExistFile(pSyncNode->configPath)) {
|
||||
// create a new raft config file
|
||||
SRaftCfgMeta meta;
|
||||
SRaftCfgMeta meta = {0};
|
||||
meta.isStandBy = pSyncInfo->isStandBy;
|
||||
meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
|
||||
meta.lastConfigIndex = SYNC_INDEX_INVALID;
|
||||
meta.batchSize = pSyncInfo->batchSize;
|
||||
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
|
||||
if (ret != 0) {
|
||||
sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath);
|
||||
if (raftCfgCreateFile(&pSyncInfo->syncCfg, meta, pSyncNode->configPath) != 0) {
|
||||
sError("vgId:%d, failed to create raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
|
||||
goto _error;
|
||||
}
|
||||
if (pSyncInfo->syncCfg.replicaNum == 0) {
|
||||
sInfo("vgId:%d, sync config not input", pSyncNode->vgId);
|
||||
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
|
||||
}
|
||||
} else {
|
||||
// update syncCfg by raft_config.json
|
||||
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
|
||||
if (pSyncNode->pRaftCfg == NULL) {
|
||||
sError("failed to open raft cfg file. path:%s", pSyncNode->configPath);
|
||||
sError("vgId:%d, failed to open raft cfg file at %s", pSyncNode->vgId, pSyncNode->configPath);
|
||||
goto _error;
|
||||
}
|
||||
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
|
||||
if (pSyncInfo->syncCfg.replicaNum > 0 && pSyncInfo->syncCfg.replicaNum != pSyncNode->pRaftCfg->cfg.replicaNum) {
|
||||
sInfo("vgId:%d, use sync config from input options", pSyncNode->vgId);
|
||||
} else {
|
||||
sInfo("vgId:%d, use sync config from raft cfg file", pSyncNode->vgId);
|
||||
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
|
||||
}
|
||||
|
||||
raftCfgClose(pSyncNode->pRaftCfg);
|
||||
pSyncNode->pRaftCfg = NULL;
|
||||
}
|
||||
|
||||
SSyncCfg* pCfg = &pSyncInfo->syncCfg;
|
||||
sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
|
||||
for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
|
||||
SNodeInfo* pNode = &pCfg->nodeInfo[i];
|
||||
sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
|
||||
}
|
||||
|
||||
// init by SSyncInfo
|
||||
pSyncNode->vgId = pSyncInfo->vgId;
|
||||
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
|
||||
|
|
Loading…
Reference in New Issue