compatible old sync config

This commit is contained in:
dmchen 2023-07-21 10:31:53 +08:00
parent 154bc177a7
commit 7d480cf3ac
14 changed files with 162 additions and 43 deletions

View File

@ -1410,7 +1410,8 @@ typedef struct {
int32_t dstVgId;
uint32_t hashBegin;
uint32_t hashEnd;
int64_t reserved;
int32_t changeVersion;
int32_t reserved;
} SAlterVnodeHashRangeReq;
int32_t tSerializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq);

View File

@ -245,7 +245,7 @@ typedef struct SSyncState {
int32_t syncInit();
void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo, bool isFirst);
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion);
int32_t syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);

View File

@ -4690,7 +4690,8 @@ int32_t tSerializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnode
if (tEncodeI32(&encoder, pReq->dstVgId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->hashBegin) < 0) return -1;
if (tEncodeI32(&encoder, pReq->hashEnd) < 0) return -1;
if (tEncodeI64(&encoder, pReq->reserved) < 0) return -1;
if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1;
if (tEncodeI32(&encoder, pReq->reserved) < 0) return -1;
tEndEncode(&encoder);
@ -4708,7 +4709,8 @@ int32_t tDeserializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVno
if (tDecodeI32(&decoder, &pReq->dstVgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->hashBegin) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->hashEnd) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->reserved) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->reserved) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -277,7 +277,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
goto _OVER;
}
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
code = terrno;
@ -367,8 +367,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
int32_t vgId = req.vgId;
dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d", vgId, req.replica, req.selfIndex,
req.strict);
dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d",
vgId, req.replica, req.selfIndex, req.strict, req.changeVersion);
for (int32_t i = 0; i < req.replica; ++i) {
SReplica *pReplica = &req.replicas[i];
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
@ -425,7 +425,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;
@ -572,7 +572,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo("vgId:%d, open vnode", dstVgId);
SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true);
SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
@ -612,9 +612,9 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vgId = alterReq.vgId;
dInfo("vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d",
"learnerSelfIndex:%d strict:%d changeVersion:%d",
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
for (int32_t i = 0; i < alterReq.replica; ++i) {
SReplica *pReplica = &alterReq.replicas[i];
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
@ -676,7 +676,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, true);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;

View File

@ -266,7 +266,7 @@ static void *vmOpenVnodeInThread(void *param) {
int32_t diskPrimary = pCfg->diskPrimary;
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb, false);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());

View File

@ -634,10 +634,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
if (mndSetPrepareNewVgActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER;
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;

View File

@ -274,6 +274,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.hashPrefix = pDb->cfg.hashPrefix;
createReq.hashSuffix = pDb->cfg.hashSuffix;
createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;
createReq.changeVersion= ++(pVgroup->syncConfChangeVer);
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = NULL;
@ -322,9 +323,10 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.changeVersion = pVgroup->syncConfChangeVer;
mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
"changeVersion:%d",
createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica,
createReq.learnerReplica, createReq.strict);
createReq.learnerReplica, createReq.strict, createReq.changeVersion);
for (int32_t i = 0; i < createReq.replica; ++i) {
mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
}
@ -402,7 +404,7 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
.learnerReplica = 0,
.selfIndex = -1,
.learnerSelfIndex = -1,
.changeVersion = pVgroup->syncConfChangeVer,
.changeVersion = ++(pVgroup->syncConfChangeVer),
};
for (int32_t v = 0; v < pVgroup->replica; ++v) {
@ -438,9 +440,10 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
}
}
mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
"changeVersion:%d",
alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
for (int32_t i = 0; i < alterReq.replica; ++i) {
mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
}
@ -471,6 +474,83 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
return pReq;
}
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
int32_t *pContLen) {
SCheckLearnCatchupReq req = {
.vgId = pVgroup->vgId,
.strict = pDb->cfg.strict,
.replica = 0,
.learnerReplica = 0,
.selfIndex = -1,
.learnerSelfIndex = -1,
};
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = NULL;
if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){
pReplica = &req.replicas[req.replica];
req.replica++;
}
else{
pReplica = &req.learnerReplicas[req.learnerReplica];
req.learnerReplica++;
}
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pVgidDnode == NULL) return NULL;
pReplica->id = pVgidDnode->id;
pReplica->port = pVgidDnode->port;
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
mndReleaseDnode(pMnode, pVgidDnode);
if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){
if (dnodeId == pVgid->dnodeId) {
req.selfIndex = v;
}
}
else{
if (dnodeId == pVgid->dnodeId) {
req.learnerSelfIndex = v;
}
}
}
mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
req.vgId, req.replica, req.selfIndex, req.learnerReplica,
req.learnerSelfIndex, req.strict);
for (int32_t i = 0; i < req.replica; ++i) {
mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
}
for (int32_t i = 0; i < req.learnerReplica; ++i) {
mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i,
req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
}
if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
terrno = TSDB_CODE_APP_ERROR;
return NULL;
}
int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
void *pReq = taosMemoryMalloc(contLen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req);
*pContLen = contLen;
return pReq;
}
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
SDisableVnodeWriteReq disableReq = {
.vgId = vgId,
@ -501,6 +581,7 @@ static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVg
.dstVgId = pVgroup->vgId,
.hashBegin = pVgroup->hashBegin,
.hashEnd = pVgroup->hashEnd,
.changeVersion = ++(pVgroup->syncConfChangeVer),
};
mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
@ -1411,7 +1492,7 @@ int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
void *pReq = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
if (pReq == NULL) return -1;
action.pCont = pReq;
@ -2358,7 +2439,7 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO
mndTransSetSerial(pTrans);
mInfo("trans:%d, vgid:%d alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d",
mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d",
pTrans->id, pVgroup->vgId, pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
@ -2374,11 +2455,15 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO
newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
newVgroup.syncConfChangeVer++;
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1;
//if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1;
mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d",
pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1;
mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d",
pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1;
mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d",
pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
//check learner
@ -2392,7 +2477,6 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO
newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
newVgroup.syncConfChangeVer++;
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
return -1;
@ -2401,7 +2485,6 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO
newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.syncConfChangeVer++;
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
return -1;
@ -2421,7 +2504,6 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO
SVnodeGid del1 = {0};
if (mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1) != 0) return -1;
newVgroup.syncConfChangeVer++;
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
@ -2439,7 +2521,6 @@ int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pO
SVnodeGid del2 = {0};
if (mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2) != 0) return -1;
newVgroup.syncConfChangeVer++;
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;

View File

@ -58,7 +58,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb, bool isFirst);
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode);
void vnodePostClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode);

View File

@ -109,7 +109,7 @@ int32_t vnodeAsyncCommit(SVnode* pVnode);
bool vnodeShouldRollback(SVnode* pVnode);
// vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path, bool isFirst);
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion);
int32_t vnodeSyncStart(SVnode* pVnode);
void vnodeSyncPreClose(SVnode* pVnode);
void vnodeSyncPostClose(SVnode* pVnode);

View File

@ -117,9 +117,10 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
if(pReq->learnerSelfIndex != -1){
pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
}
pCfg->changeVersion = pReq->changeVersion;
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d",
pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex);
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d",
pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion);
info.config.syncCfg = *pCfg;
ret = vnodeSaveInfo(dir, &info);
@ -216,6 +217,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
info.config.hashEnd = pReq->hashEnd;
info.config.hashChange = true;
info.config.walCfg.vgId = pReq->dstVgId;
info.config.syncCfg.changeVersion = pReq->changeVersion;
SSyncCfg *pCfg = &info.config.syncCfg;
pCfg->myIndex = 0;
@ -311,7 +313,7 @@ static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) {
return 0;
}
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb, bool isFirst) {
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb) {
SVnode *pVnode = NULL;
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
@ -439,7 +441,8 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
}
// open sync
if (vnodeSyncOpen(pVnode, dir, isFirst)) {
vInfo("vgId:%d, start to open sync, changeVersion:%d", TD_VID(pVnode), info.config.syncCfg.changeVersion);
if (vnodeSyncOpen(pVnode, dir, info.config.syncCfg.changeVersion)) {
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}

View File

@ -637,7 +637,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
return pFsm;
}
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, bool isFirst) {
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
SSyncInfo syncInfo = {
.snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
.batchSize = 1,
@ -664,7 +664,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path, bool isFirst) {
pNode->nodeId, pNode->clusterId);
}
pVnode->sync = syncOpen(&syncInfo, isFirst);
pVnode->sync = syncOpen(&syncInfo, vnodeVersion);
if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
return -1;

View File

@ -228,7 +228,7 @@ typedef struct SSyncNode {
} SSyncNode;
// open/close --------------
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst);
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion);
int32_t syncNodeStart(SSyncNode* pSyncNode);
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode);

View File

@ -59,8 +59,8 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
int64_t syncOpen(SSyncInfo* pSyncInfo, bool isFirst) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, isFirst);
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, vnodeVersion);
if (pSyncNode == NULL) {
sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
return -1;
@ -778,7 +778,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
}
// open/close --------------
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst) {
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
if (pSyncNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -798,7 +798,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst) {
TD_DIRSEP);
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
if (!taosCheckExistFile(pSyncNode->configPath) && isFirst) {
if (!taosCheckExistFile(pSyncNode->configPath)) {
// create a new raft config file
sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
@ -820,7 +820,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst) {
goto _error;
}
if(isFirst){
if(vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion){
if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
@ -833,6 +833,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst) {
pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
}
}
else{
sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d",
pSyncNode->vgId, vnodeVersion, pSyncInfo->syncCfg.changeVersion);
}
}
@ -851,7 +855,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst) {
pNode->nodeId, pNode->clusterId);
}
if(isFirst){
if(vnodeVersion > pSyncInfo->syncCfg.changeVersion){
if (updated) {
sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
if (syncWriteCfgFile(pSyncNode) != 0) {
@ -2382,12 +2386,38 @@ void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg *cfg, char *str){
ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
}
for (int32_t i = 0; i < ths->peersNum; ++i){
char buf[256];
int32_t len = 256;
int32_t n = 0;
n += snprintf(buf + n, len - n, "%s", "{");
for (int i = 0; i < ths->peersEpset->numOfEps; i++) {
n += snprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset->eps[i].fqdn, ths->peersEpset->eps[i].port,
(i + 1 < ths->peersEpset->numOfEps ? ", " : ""));
}
n += snprintf(buf + n, len - n, "%s", "}");
sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d",
ths->vgId, str, i, buf, ths->peersEpset->inUse);
}
for (int32_t i = 0; i < ths->peersNum; ++i){
sInfo("vgId:%d, %s, peersId%d, addr:%"PRId64,
ths->vgId, str, i, ths->peersId[i].addr);
}
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){
sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d",
ths->vgId, str, i, ths->raftCfg.cfg.nodeInfo[i].clusterId,
ths->raftCfg.cfg.nodeInfo[i].nodeId, ths->raftCfg.cfg.nodeInfo[i].nodeFqdn,
ths->raftCfg.cfg.nodeInfo[i].nodePort, ths->raftCfg.cfg.nodeInfo[i].nodeRole);
}
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){
sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64,
ths->vgId, str, i, ths->replicasId[i].addr);
}
}
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){
@ -2520,6 +2550,7 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
for(int j = 0; j < oldtotalReplicaNum; j++){
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
*(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
ths->logReplMgrs[i]->peerId = i;
}
}
}

View File

@ -246,7 +246,8 @@ int32_t syncReadCfgFile(SSyncNode *pNode) {
}
code = 0;
sInfo("vgId:%d, succceed to read sync cfg file %s", pNode->vgId, file);
sInfo("vgId:%d, succceed to read sync cfg file %s, changeVersion:%d",
pNode->vgId, file, pCfg->cfg.changeVersion);
_OVER:
if (pData != NULL) taosMemoryFree(pData);