From 7d480cf3accea10b2784b04d8e945c0cb5f7e33e Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 21 Jul 2023 10:31:53 +0800 Subject: [PATCH] compatible old sync config --- include/common/tmsg.h | 3 +- include/libs/sync/sync.h | 2 +- source/common/src/tmsg.c | 6 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 16 +-- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 2 +- source/dnode/mnode/impl/src/mndDb.c | 2 +- source/dnode/mnode/impl/src/mndVgroup.c | 107 +++++++++++++++++--- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/vnd.h | 2 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 11 +- source/dnode/vnode/src/vnd/vnodeSync.c | 4 +- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncMain.c | 43 ++++++-- source/libs/sync/src/syncRaftCfg.c | 3 +- 14 files changed, 162 insertions(+), 43 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4e0a5edad9..5dc5b7f514 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1410,7 +1410,8 @@ typedef struct { int32_t dstVgId; uint32_t hashBegin; uint32_t hashEnd; - int64_t reserved; + int32_t changeVersion; + int32_t reserved; } SAlterVnodeHashRangeReq; int32_t tSerializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq); diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index f28e75c68a..76b504ea99 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -245,7 +245,7 @@ typedef struct SSyncState { int32_t syncInit(); void syncCleanUp(); -int64_t syncOpen(SSyncInfo* pSyncInfo, bool isFirst); +int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion); int32_t syncStart(int64_t rid); void syncStop(int64_t rid); void syncPreStop(int64_t rid); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1e66eee069..835bb5553b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4690,7 +4690,8 @@ int32_t tSerializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnode if (tEncodeI32(&encoder, pReq->dstVgId) < 0) return -1; if (tEncodeI32(&encoder, pReq->hashBegin) < 0) return -1; if (tEncodeI32(&encoder, pReq->hashEnd) < 0) return -1; - if (tEncodeI64(&encoder, pReq->reserved) < 0) return -1; + if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1; + if (tEncodeI32(&encoder, pReq->reserved) < 0) return -1; tEndEncode(&encoder); @@ -4708,7 +4709,8 @@ int32_t tDeserializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVno if (tDecodeI32(&decoder, &pReq->dstVgId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->hashBegin) < 0) return -1; if (tDecodeI32(&decoder, &pReq->hashEnd) < 0) return -1; - if (tDecodeI64(&decoder, &pReq->reserved) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->reserved) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 6531cab68b..adceeb9ec3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -277,7 +277,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { goto _OVER; } - SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true); + SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr()); code = terrno; @@ -367,8 +367,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name); int32_t vgId = req.vgId; - dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d", vgId, req.replica, req.selfIndex, - req.strict); + dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", + vgId, req.replica, req.selfIndex, req.strict, req.changeVersion); for (int32_t i = 0; i < req.replica; ++i) { SReplica *pReplica = &req.replicas[i]; dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id); @@ -425,7 +425,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } dInfo("vgId:%d, begin to open vnode", vgId); - SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true); + SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr()); return -1; @@ -572,7 +572,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } dInfo("vgId:%d, open vnode", dstVgId); - SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true); + SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr()); @@ -612,9 +612,9 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vgId = alterReq.vgId; dInfo("vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d " - "learnerSelfIndex:%d strict:%d", + "learnerSelfIndex:%d strict:%d changeVersion:%d", vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, - alterReq.learnerSelfIndex, alterReq.strict); + alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion); for (int32_t i = 0; i < alterReq.replica; ++i) { SReplica *pReplica = &alterReq.replicas[i]; dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port); @@ -676,7 +676,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } dInfo("vgId:%d, begin to open vnode", vgId); - SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true); + SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 02f27cec97..894b19abc7 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -266,7 +266,7 @@ static void *vmOpenVnodeInThread(void *param) { int32_t diskPrimary = pCfg->diskPrimary; snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); - SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false); + SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr()); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a5bd521a3b..67892ec57c 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -634,10 +634,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, mndTransSetOper(pTrans, MND_OPER_CREATE_DB); if (mndSetPrepareNewVgActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; + if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER; - if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 0d34e1e0aa..186927fdc6 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -274,6 +274,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg createReq.hashPrefix = pDb->cfg.hashPrefix; createReq.hashSuffix = pDb->cfg.hashSuffix; createReq.tsdbPageSize = pDb->cfg.tsdbPageSize; + createReq.changeVersion= ++(pVgroup->syncConfChangeVer); for (int32_t v = 0; v < pVgroup->replica; ++v) { SReplica *pReplica = NULL; @@ -322,9 +323,10 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg createReq.changeVersion = pVgroup->syncConfChangeVer; - mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d", + mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d " + "changeVersion:%d", createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, - createReq.learnerReplica, createReq.strict); + createReq.learnerReplica, createReq.strict, createReq.changeVersion); for (int32_t i = 0; i < createReq.replica; ++i) { mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port); } @@ -402,7 +404,7 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p .learnerReplica = 0, .selfIndex = -1, .learnerSelfIndex = -1, - .changeVersion = pVgroup->syncConfChangeVer, + .changeVersion = ++(pVgroup->syncConfChangeVer), }; for (int32_t v = 0; v < pVgroup->replica; ++v) { @@ -438,9 +440,10 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p } } - mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d", + mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d " + "changeVersion:%d", alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, - alterReq.learnerSelfIndex, alterReq.strict); + alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion); for (int32_t i = 0; i < alterReq.replica; ++i) { mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port); } @@ -471,6 +474,83 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p return pReq; } +static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId, + int32_t *pContLen) { + SCheckLearnCatchupReq req = { + .vgId = pVgroup->vgId, + .strict = pDb->cfg.strict, + .replica = 0, + .learnerReplica = 0, + .selfIndex = -1, + .learnerSelfIndex = -1, + }; + + for (int32_t v = 0; v < pVgroup->replica; ++v) { + SReplica *pReplica = NULL; + + if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){ + pReplica = &req.replicas[req.replica]; + req.replica++; + } + else{ + pReplica = &req.learnerReplicas[req.learnerReplica]; + req.learnerReplica++; + } + + SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; + SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pVgidDnode == NULL) return NULL; + + pReplica->id = pVgidDnode->id; + pReplica->port = pVgidDnode->port; + memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN); + mndReleaseDnode(pMnode, pVgidDnode); + + if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){ + if (dnodeId == pVgid->dnodeId) { + req.selfIndex = v; + } + } + else{ + if (dnodeId == pVgid->dnodeId) { + req.learnerSelfIndex = v; + } + } + } + + mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d", + req.vgId, req.replica, req.selfIndex, req.learnerReplica, + req.learnerSelfIndex, req.strict); + for (int32_t i = 0; i < req.replica; ++i) { + mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port); + } + for (int32_t i = 0; i < req.learnerReplica; ++i) { + mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, + req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port); + } + + if (req.selfIndex == -1 && req.learnerSelfIndex == -1) { + terrno = TSDB_CODE_APP_ERROR; + return NULL; + } + + int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req); + *pContLen = contLen; + return pReq; +} + static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) { SDisableVnodeWriteReq disableReq = { .vgId = vgId, @@ -501,6 +581,7 @@ static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVg .dstVgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd, + .changeVersion = ++(pVgroup->syncConfChangeVer), }; mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId, @@ -1411,7 +1492,7 @@ int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj * mndReleaseDnode(pMnode, pDnode); int32_t contLen = 0; - void *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &contLen); + void *pReq = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &contLen); if (pReq == NULL) return -1; action.pCont = pReq; @@ -2358,7 +2439,7 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO mndTransSetSerial(pTrans); - mInfo("trans:%d, vgid:%d alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", + mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", pTrans->id, pVgroup->vgId, pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica); if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) { @@ -2374,11 +2455,15 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER; newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER; newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER; - newVgroup.syncConfChangeVer++; if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1; - //if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1; + mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", + pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica); if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1; + mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", + pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica); if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1; + mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", + pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica); //check learner @@ -2392,7 +2477,6 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER; newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER; newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER; - newVgroup.syncConfChangeVer++; if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1; @@ -2401,7 +2485,6 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER; newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER; newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER; - newVgroup.syncConfChangeVer++; if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1; @@ -2421,7 +2504,6 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO SVnodeGid del1 = {0}; if (mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1) != 0) return -1; - newVgroup.syncConfChangeVer++; if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1; @@ -2439,7 +2521,6 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO SVnodeGid del2 = {0}; if (mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2) != 0) return -1; - newVgroup.syncConfChangeVer++; if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 5ff24c73f5..38216e1414 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -58,7 +58,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, int32_t diskPrimary, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); -SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb, bool isFirst); +SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode); void vnodeSyncCheckTimeout(SVnode *pVnode); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index de1abf9416..a75d5d9307 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -109,7 +109,7 @@ int32_t vnodeAsyncCommit(SVnode* pVnode); bool vnodeShouldRollback(SVnode* pVnode); // vnodeSync.c -int32_t vnodeSyncOpen(SVnode* pVnode, char* path, bool isFirst); +int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion); int32_t vnodeSyncStart(SVnode* pVnode); void vnodeSyncPreClose(SVnode* pVnode); void vnodeSyncPostClose(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index af0eb2156f..db3b35867d 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -117,9 +117,10 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t if(pReq->learnerSelfIndex != -1){ pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex; } + pCfg->changeVersion = pReq->changeVersion; - vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", - pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex); + vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", + pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion); info.config.syncCfg = *pCfg; ret = vnodeSaveInfo(dir, &info); @@ -216,6 +217,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod info.config.hashEnd = pReq->hashEnd; info.config.hashChange = true; info.config.walCfg.vgId = pReq->dstVgId; + info.config.syncCfg.changeVersion = pReq->changeVersion; SSyncCfg *pCfg = &info.config.syncCfg; pCfg->myIndex = 0; @@ -311,7 +313,7 @@ static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) { return 0; } -SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb, bool isFirst) { +SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb) { SVnode *pVnode = NULL; SVnodeInfo info = {0}; char dir[TSDB_FILENAME_LEN] = {0}; @@ -439,7 +441,8 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC } // open sync - if (vnodeSyncOpen(pVnode, dir, isFirst)) { + vInfo("vgId:%d, start to open sync, changeVersion:%d", TD_VID(pVnode), info.config.syncCfg.changeVersion); + if (vnodeSyncOpen(pVnode, dir, info.config.syncCfg.changeVersion)) { vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 468db28c39..d140c4a122 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -637,7 +637,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { return pFsm; } -int32_t vnodeSyncOpen(SVnode *pVnode, char *path, bool isFirst) { +int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) { SSyncInfo syncInfo = { .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST, .batchSize = 1, @@ -664,7 +664,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path, bool isFirst) { pNode->nodeId, pNode->clusterId); } - pVnode->sync = syncOpen(&syncInfo, isFirst); + pVnode->sync = syncOpen(&syncInfo, vnodeVersion); if (pVnode->sync <= 0) { vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr()); return -1; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 7613fde169..e79e7a3f75 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -228,7 +228,7 @@ typedef struct SSyncNode { } SSyncNode; // open/close -------------- -SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst); +SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion); int32_t syncNodeStart(SSyncNode* pSyncNode); int32_t syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 039c039c72..617b141fef 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -59,8 +59,8 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); -int64_t syncOpen(SSyncInfo* pSyncInfo, bool isFirst) { - SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, isFirst); +int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { + SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, vnodeVersion); if (pSyncNode == NULL) { sError("vgId:%d, failed to open sync node", pSyncInfo->vgId); return -1; @@ -778,7 +778,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { } // open/close -------------- -SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst) { +SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode)); if (pSyncNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -798,7 +798,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst) { TD_DIRSEP); snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP); - if (!taosCheckExistFile(pSyncNode->configPath) && isFirst) { + if (!taosCheckExistFile(pSyncNode->configPath)) { // create a new raft config file sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId); pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy; @@ -820,7 +820,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst) { goto _error; } - if(isFirst){ + if(vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion){ if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) { sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId); pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg; @@ -833,6 +833,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst) { pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg; } } + else{ + sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", + pSyncNode->vgId, vnodeVersion, pSyncInfo->syncCfg.changeVersion); + } } @@ -851,7 +855,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst) { pNode->nodeId, pNode->clusterId); } - if(isFirst){ + if(vnodeVersion > pSyncInfo->syncCfg.changeVersion){ if (updated) { sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId); if (syncWriteCfgFile(pSyncNode) != 0) { @@ -2382,12 +2386,38 @@ void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg *cfg, char *str){ ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole); } + for (int32_t i = 0; i < ths->peersNum; ++i){ + char buf[256]; + int32_t len = 256; + int32_t n = 0; + n += snprintf(buf + n, len - n, "%s", "{"); + for (int i = 0; i < ths->peersEpset->numOfEps; i++) { + n += snprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset->eps[i].fqdn, ths->peersEpset->eps[i].port, + (i + 1 < ths->peersEpset->numOfEps ? ", " : "")); + } + n += snprintf(buf + n, len - n, "%s", "}"); + + sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", + ths->vgId, str, i, buf, ths->peersEpset->inUse); + } + + for (int32_t i = 0; i < ths->peersNum; ++i){ + sInfo("vgId:%d, %s, peersId%d, addr:%"PRId64, + ths->vgId, str, i, ths->peersId[i].addr); + } + for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){ sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i, ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId, ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort, ths->raftCfg.cfg.nodeInfo[i].nodeRole); } + + for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){ + sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, + ths->vgId, str, i, ths->replicasId[i].addr); + } + } int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){ @@ -2520,6 +2550,7 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum for(int j = 0; j < oldtotalReplicaNum; j++){ if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) { *(ths->logReplMgrs[i]) = oldLogReplMgrs[j]; + ths->logReplMgrs[i]->peerId = i; } } } diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 4b57fbde2c..0dcc3eee29 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -246,7 +246,8 @@ int32_t syncReadCfgFile(SSyncNode *pNode) { } code = 0; - sInfo("vgId:%d, succceed to read sync cfg file %s", pNode->vgId, file); + sInfo("vgId:%d, succceed to read sync cfg file %s, changeVersion:%d", + pNode->vgId, file, pCfg->cfg.changeVersion); _OVER: if (pData != NULL) taosMemoryFree(pData);