diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ba9db9d88f..c6bb599a7d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -77,7 +77,8 @@ static inline bool tmsgIsValid(tmsg_t type) { } static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || - (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT); + (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT) || + (type == TDMT_SYNC_CONFIG_CHANGE); } static inline bool syncUtilUserCommit(tmsg_t msgType) { @@ -1400,6 +1401,7 @@ typedef struct { int64_t numOfBatchInsertReqs; int64_t numOfBatchInsertSuccessReqs; int32_t numOfCachedTables; + int32_t learnerProgress; // use one reservered } SVnodeLoad; typedef struct { @@ -1541,6 +1543,7 @@ typedef struct { int8_t learnerReplica; int8_t learnerSelfIndex; SReplica learnerReplicas[TSDB_MAX_LEARNER_REPLICA]; + int32_t changeVersion; } SCreateVnodeReq; int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); @@ -1615,7 +1618,8 @@ typedef struct { int8_t learnerSelfIndex; int8_t learnerReplica; SReplica learnerReplicas[TSDB_MAX_LEARNER_REPLICA]; -} SAlterVnodeReplicaReq, SAlterVnodeTypeReq; + int32_t changeVersion; +} SAlterVnodeReplicaReq, SAlterVnodeTypeReq, SCheckLearnCatchupReq; int32_t tSerializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq); int32_t tDeserializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq); @@ -1633,7 +1637,8 @@ typedef struct { int32_t dstVgId; uint32_t hashBegin; uint32_t hashEnd; - int64_t reserved; + int32_t changeVersion; + int32_t reserved; } SAlterVnodeHashRangeReq; int32_t tSerializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 60172bce3d..483122b070 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -85,6 +85,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE_TYPE, "dnode-alter-mnode-type", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 8844145652..f69afbd71b 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -101,6 +101,7 @@ typedef struct SSyncCfg { int32_t myIndex; SNodeInfo nodeInfo[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; SyncIndex lastIndex; + int32_t changeVersion; } SSyncCfg; typedef struct SFsmCbMeta { @@ -239,20 +240,22 @@ typedef struct SSyncState { ESyncState state; bool restored; bool canRead; + int32_t progress; SyncTerm term; int64_t roleTimeMs; int64_t startTimeMs; } SSyncState; -int32_t syncInit(); -void syncCleanUp(); -int64_t syncOpen(SSyncInfo* pSyncInfo); -int32_t syncStart(int64_t rid); -void syncStop(int64_t rid); -void syncPreStop(int64_t rid); -void syncPostStop(int64_t rid); -int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); -int32_t syncIsCatchUp(int64_t rid); +int32_t syncInit(); +void syncCleanUp(); +int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion); +int32_t syncStart(int64_t rid); +void syncStop(int64_t rid); +void syncPreStop(int64_t rid); +void syncPostStop(int64_t rid); +int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); +int32_t syncCheckMember(int64_t rid); +int32_t syncIsCatchUp(int64_t rid); ESyncRole syncGetRole(int64_t rid); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); @@ -270,6 +273,8 @@ SSyncState syncGetState(int64_t rid); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); const char* syncStr(ESyncState state); +int32_t syncNodeGetConfig(int64_t rid, SSyncCfg *cfg); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1bf2fb78a7..de76881467 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1082,7 +1082,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1; if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1; if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1; - if (tEncodeI32(&encoder, reserved) < 0) return -1; + if (tEncodeI32(&encoder, pload->learnerProgress) < 0) return -1; if (tEncodeI64(&encoder, pload->roleTimeMs) < 0) return -1; if (tEncodeI64(&encoder, pload->startTimeMs) < 0) return -1; } @@ -1163,7 +1163,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { for (int32_t i = 0; i < vlen; ++i) { SVnodeLoad vload = {0}; vload.syncTerm = -1; - int32_t reserved32 = 0; + if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1; if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1; if (tDecodeI8(&decoder, &vload.syncRestore) < 0) return -1; @@ -1175,7 +1175,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1; if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1; if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1; - if (tDecodeI32(&decoder, (int32_t *)&reserved32) < 0) return -1; + if (tDecodeI32(&decoder, &vload.learnerProgress) < 0) return -1; if (tDecodeI64(&decoder, &vload.roleTimeMs) < 0) return -1; if (tDecodeI64(&decoder, &vload.startTimeMs) < 0) return -1; if (taosArrayPush(pReq->pVloads, &vload) == NULL) { @@ -4348,6 +4348,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR SReplica *pReplica = &pReq->learnerReplicas[i]; if (tEncodeSReplica(&encoder, pReplica) < 0) return -1; } + if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1; tEndEncode(&encoder); @@ -4434,6 +4435,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeSReplica(&decoder, pReplica) < 0) return -1; } } + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); @@ -4666,6 +4670,7 @@ int32_t tSerializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnodeRe SReplica *pReplica = &pReq->learnerReplicas[i]; if (tEncodeSReplica(&encoder, pReplica) < 0) return -1; } + if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -4697,6 +4702,9 @@ int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnode if (tDecodeSReplica(&decoder, pReplica) < 0) return -1; } } + if (!tDecodeIsEnd(&decoder)){ + if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); @@ -4740,7 +4748,8 @@ int32_t tSerializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnode if (tEncodeI32(&encoder, pReq->dstVgId) < 0) return -1; if (tEncodeI32(&encoder, pReq->hashBegin) < 0) return -1; if (tEncodeI32(&encoder, pReq->hashEnd) < 0) return -1; - if (tEncodeI64(&encoder, pReq->reserved) < 0) return -1; + if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1; + if (tEncodeI32(&encoder, pReq->reserved) < 0) return -1; tEndEncode(&encoder); @@ -4758,7 +4767,8 @@ int32_t tDeserializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVno if (tDecodeI32(&decoder, &pReq->dstVgId) < 0) return -1; if (tDecodeI32(&decoder, &pReq->hashBegin) < 0) return -1; if (tDecodeI32(&decoder, &pReq->hashEnd) < 0) return -1; - if (tDecodeI64(&decoder, &pReq->reserved) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->reserved) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index d975eb1cd1..302d4dafd1 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -106,6 +106,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 5d08320fab..cddf132bce 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -97,6 +97,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); // vmFile.c int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 444639315a..e50a75d33a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -133,6 +133,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->standby = 0; pCfg->syncCfg.replicaNum = 0; pCfg->syncCfg.totalReplicaNum = 0; + pCfg->syncCfg.changeVersion = pCreate->changeVersion; memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo)); for (int32_t i = 0; i < pCreate->replica; ++i) { @@ -211,14 +212,15 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d" ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64 ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d " - "learnerReplica:%d learnerSelfIndex:%d strict:%d", + "learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d", req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression, req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize, req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica, - req.learnerSelfIndex, req.strict); + req.learnerSelfIndex, req.strict, req.changeVersion); + for (int32_t i = 0; i < req.replica; ++i) { dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port, req.replicas[i].id); @@ -323,6 +325,7 @@ _OVER: return code; } +//alter replica doesn't use this, but restore dnode still use this int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SAlterVnodeTypeReq req = {0}; if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) { @@ -363,8 +366,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name); int32_t vgId = req.vgId; - dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d", vgId, req.replica, req.selfIndex, - req.strict); + dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d", + vgId, req.replica, req.selfIndex, req.strict, req.changeVersion); for (int32_t i = 0; i < req.replica; ++i) { SReplica *pReplica = &req.replicas[i]; dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id); @@ -424,7 +427,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr()); return -1; } - + if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) { dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr()); return -1; @@ -440,6 +443,53 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } +int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SCheckLearnCatchupReq req = {0}; + if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + if(req.learnerReplicas == 0){ + req.learnerSelfIndex = -1; + } + + dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request", + req.vgId, TMSG_INFO(pMsg->msgType)); + + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); + if (pVnode == NULL) { + dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr()); + terrno = TSDB_CODE_VND_NOT_EXIST; + return -1; + } + + ESyncRole role = vnodeGetRole(pVnode->pImpl); + dInfo("vgId:%d, checking node role:%d", req.vgId, role); + if(role == TAOS_SYNC_ROLE_VOTER){ + dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role); + terrno = TSDB_CODE_VND_ALREADY_IS_VOTER; + vmReleaseVnode(pMgmt, pVnode); + return -1; + } + + dInfo("vgId:%d, checking node catch up", req.vgId); + if(vnodeIsCatchUp(pVnode->pImpl) != 1){ + terrno = TSDB_CODE_VND_NOT_CATCH_UP; + vmReleaseVnode(pMgmt, pVnode); + return -1; + } + + dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name); + + vmReleaseVnode(pMgmt, pVnode); + + dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request", + req.vgId, TMSG_INFO(pMsg->msgType)); + + return 0; +} + int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SDisableVnodeWriteReq req = {0}; if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) { @@ -520,6 +570,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("vgId:%d, open vnode", dstVgId); SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); + if (pImpl == NULL) { dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr()); return -1; @@ -559,9 +610,10 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vgId = alterReq.vgId; dInfo( "vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d " - "learnerSelfIndex:%d strict:%d", + "learnerSelfIndex:%d strict:%d changeVersion:%d", vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, - alterReq.learnerSelfIndex, alterReq.strict); + alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion); + for (int32_t i = 0; i < alterReq.replica; ++i) { SReplica *pReplica = &alterReq.replicas[i]; dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port); @@ -755,6 +807,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index f4ce46411a..f0ab703b8a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -267,6 +267,7 @@ static void *vmOpenVnodeInThread(void *param) { snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb); + if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr()); pThread->failed++; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index cae2a7d2be..696107ca90 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -53,6 +53,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { case TDMT_DND_ALTER_VNODE_TYPE: code = vmProcessAlterVnodeTypeReq(pMgmt, pMsg); break; + case TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP: + code = vmProcessCheckLearnCatchupReq(pMgmt, pMsg); + break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; dGError("msg:%p, not processed in vnode-mgmt queue", pMsg); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 5364267567..89c58337ad 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -353,6 +353,7 @@ typedef struct { int64_t roleTimeMs; int64_t startTimeMs; ESyncRole nodeRole; + int32_t learnerProgress; } SVnodeGid; typedef struct { @@ -376,6 +377,7 @@ typedef struct { SVnodeGid vnodeGid[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; void* pTsma; int32_t numOfCachedTables; + int32_t syncConfChangeVer; } SVgObj; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 4dbd2fe7f8..b7f6769d12 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -47,6 +47,8 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb SArray *pArray); int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs, STimeWindow tw); +int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup, + SArray *pArray); void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 20b342f9e3..b0e3dc4331 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -659,10 +659,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, mndTransSetOper(pTrans, MND_OPER_CREATE_DB); if (mndSetCreateDbPrepareAction(pMnode, pTrans, &dbObj) != 0) goto _OVER; + if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER; - if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; @@ -882,7 +882,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * if (pIter == NULL) break; if (mndVgroupInDb(pVgroup, pNewDb->uid)) { - if (mndBuildAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray) != 0) { + if (mndBuildRaftAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray) != 0) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); taosArrayDestroy(pArray); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3d8fd6220f..a601d3c121 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -475,7 +475,7 @@ int32_t mndInitSync(SMnode *pMnode) { } tsem_init(&pMgmt->syncSem, 0, 0); - pMgmt->sync = syncOpen(&syncInfo); + pMgmt->sync = syncOpen(&syncInfo, true); if (pMgmt->sync <= 0) { mError("failed to open sync since %s", terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 02d9368595..fddde70834 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1124,7 +1124,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg); if (code == 0) { pAction->msgSent = 1; - pAction->msgReceived = 0; + //pAction->msgReceived = 0; pAction->errCode = TSDB_CODE_ACTION_IN_PROGRESS; mInfo("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e0156db67c..26dc2a3f87 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -67,6 +67,8 @@ int32_t mndInitVgroup(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_DND_ALTER_VNODE_TYPE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_SYNC_CONFIG_CHANGE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg); @@ -105,6 +107,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, _OVER) } + SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER) SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) @@ -129,7 +132,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != VGROUP_VER_NUMBER) { + if (sver < 1 || sver > VGROUP_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; goto _OVER; } @@ -158,6 +161,10 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { pVgid->syncState = TAOS_SYNC_STATE_LEADER; } } + if(dataPos + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen){ + SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER) + } + SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER) terrno = 0; @@ -220,6 +227,7 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) { pNew->pointsWritten = pOld->pointsWritten; pNew->compact = pOld->compact; memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid)); + pOld->syncConfChangeVer = pNew->syncConfChangeVer; return 0; } @@ -279,6 +287,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg createReq.hashPrefix = pDb->cfg.hashPrefix; createReq.hashSuffix = pDb->cfg.hashSuffix; createReq.tsdbPageSize = pDb->cfg.tsdbPageSize; + createReq.changeVersion= ++(pVgroup->syncConfChangeVer); for (int32_t v = 0; v < pVgroup->replica; ++v) { SReplica *pReplica = NULL; @@ -325,9 +334,12 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg return NULL; } - mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d", + createReq.changeVersion = pVgroup->syncConfChangeVer; + + mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d " + "changeVersion:%d", createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, - createReq.learnerReplica, createReq.strict); + createReq.learnerReplica, createReq.strict, createReq.changeVersion); for (int32_t i = 0; i < createReq.replica; ++i) { mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port); } @@ -405,6 +417,7 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p .learnerReplica = 0, .selfIndex = -1, .learnerSelfIndex = -1, + .changeVersion = ++(pVgroup->syncConfChangeVer), }; for (int32_t v = 0; v < pVgroup->replica; ++v) { @@ -440,9 +453,10 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p } } - mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d", + mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d " + "changeVersion:%d", alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, - alterReq.learnerSelfIndex, alterReq.strict); + alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion); for (int32_t i = 0; i < alterReq.replica; ++i) { mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port); } @@ -473,6 +487,83 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p return pReq; } +static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId, + int32_t *pContLen) { + SCheckLearnCatchupReq req = { + .vgId = pVgroup->vgId, + .strict = pDb->cfg.strict, + .replica = 0, + .learnerReplica = 0, + .selfIndex = -1, + .learnerSelfIndex = -1, + }; + + for (int32_t v = 0; v < pVgroup->replica; ++v) { + SReplica *pReplica = NULL; + + if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){ + pReplica = &req.replicas[req.replica]; + req.replica++; + } + else{ + pReplica = &req.learnerReplicas[req.learnerReplica]; + req.learnerReplica++; + } + + SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; + SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pVgidDnode == NULL) return NULL; + + pReplica->id = pVgidDnode->id; + pReplica->port = pVgidDnode->port; + memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN); + mndReleaseDnode(pMnode, pVgidDnode); + + if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){ + if (dnodeId == pVgid->dnodeId) { + req.selfIndex = v; + } + } + else{ + if (dnodeId == pVgid->dnodeId) { + req.learnerSelfIndex = v; + } + } + } + + mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d", + req.vgId, req.replica, req.selfIndex, req.learnerReplica, + req.learnerSelfIndex, req.strict); + for (int32_t i = 0; i < req.replica; ++i) { + mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port); + } + for (int32_t i = 0; i < req.learnerReplica; ++i) { + mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i, + req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port); + } + + if (req.selfIndex == -1 && req.learnerSelfIndex == -1) { + terrno = TSDB_CODE_APP_ERROR; + return NULL; + } + + int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req); + *pContLen = contLen; + return pReq; +} + static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) { SDisableVnodeWriteReq disableReq = { .vgId = vgId, @@ -503,6 +594,7 @@ static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVg .dstVgId = pVgroup->vgId, .hashBegin = pVgroup->hashBegin, .hashEnd = pVgroup->hashEnd, + .changeVersion = ++(pVgroup->syncConfChangeVer), }; mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId, @@ -861,6 +953,28 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p } } snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star); + /* + mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress); + + if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER) { + if(pVgroup->vnodeGid[i].learnerProgress < 0){ + snprintf(role, sizeof(role), "%s-", + syncStr(pVgroup->vnodeGid[i].syncState)); + + } + else if(pVgroup->vnodeGid[i].learnerProgress >= 100){ + snprintf(role, sizeof(role), "%s--", + syncStr(pVgroup->vnodeGid[i].syncState)); + } + else{ + snprintf(role, sizeof(role), "%s%d", + syncStr(pVgroup->vnodeGid[i].syncState), pVgroup->vnodeGid[i].learnerProgress); + } + } + else{ + snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star); + } + */ } else { } STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes); @@ -1155,6 +1269,51 @@ _OVER: return 0; } +static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray, + SVnodeGid *pDelVgid) { + taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); + for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { + SDnodeObj *pDnode = taosArrayGet(pArray, i); + mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes); + } + + int32_t code = -1; + for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) { + SDnodeObj *pDnode = taosArrayGet(pArray, d); + + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[vn]; + if (pVgid->dnodeId == pDnode->id) { + int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup); + pDnode->memUsed -= vgMem; + mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64, + pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed); + pDnode->numOfVnodes--; + pVgroup->replica--; + *pDelVgid = *pVgid; + *pVgid = pVgroup->vnodeGid[pVgroup->replica]; + memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid)); + code = 0; + goto _OVER; + } + } + } + +_OVER: + if (code != 0) { + terrno = TSDB_CODE_APP_ERROR; + mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr()); + return -1; + } + + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[vn]; + mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId); + } + + return 0; +} + int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) { STransAction action = {0}; @@ -1231,6 +1390,42 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD return 0; } +int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, + SVgObj *pOldVgroup, SVgObj *pNewVgroup, int32_t dnodeId) { + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pNewVgroup); + + int32_t contLen = 0; + void *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &contLen); + if (pReq == NULL) return -1; + + int32_t totallen = contLen + sizeof(SMsgHead); + + SMsgHead *pHead = taosMemoryMalloc(totallen); + if (pHead == NULL) { + taosMemoryFree(pReq); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pHead->contLen = htonl(totallen); + pHead->vgId = htonl(pNewVgroup->vgId); + + memcpy((void*)(pHead + 1), pReq, contLen); + taosMemoryFree(pReq); + + action.pCont = pHead; + action.contLen = totallen; + action.msgType = TDMT_SYNC_CONFIG_CHANGE; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pHead); + return -1; + } + + return 0; +} + static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) { STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); @@ -1310,6 +1505,32 @@ int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD return 0; } +int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) { + SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId); + if (pDnode == NULL) return -1; + + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + int32_t contLen = 0; + void *pReq = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP; + action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER; + action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) { SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId); if (pDnode == NULL) return -1; @@ -2227,6 +2448,133 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb return 0; } +int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup, + SArray *pArray) { + SVgObj newVgroup = {0}; + memcpy(&newVgroup, pVgroup, sizeof(SVgObj)); + + if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) { + if (mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup) != 0) return -1; + if (mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray) != 0) return -1; + return 0; + } + + mndTransSetSerial(pTrans); + + mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d", + pTrans->id, pVgroup->vgId, pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica); + + if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) { + mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, + pVgroup->vnodeGid[0].dnodeId); + + //add second + if (mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray) != 0) return -1; + //add third + if (mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray) != 0) return -1; + + //add learner stage + newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER; + newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER; + newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER; + if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1; + mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d", + pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica); + if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1; + mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", + pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica); + if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1; + mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d", + pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica); + + + //check learner + newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER; + newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER; + newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER; + if (mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId) != 0) return -1; + if (mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId) != 0) return -1; + + //change raft type + newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER; + newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER; + newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER; + if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) + return -1; + + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1; + + newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER; + newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER; + newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER; + if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) + return -1; + + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1; + + SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup); + if (pVgRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) { + sdbFreeRaw(pVgRaw); + return -1; + } + (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); + } else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) { + mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName, + pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId); + + SVnodeGid del1 = {0}; + if (mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1) != 0) return -1; + + if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1; + + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1; + + if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1; + + SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup); + if (pVgRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) { + sdbFreeRaw(pVgRaw); + return -1; + } + (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); + + SVnodeGid del2 = {0}; + if (mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2) != 0) return -1; + + if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1; + + if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1; + + if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1; + + pVgRaw = mndVgroupActionEncode(&newVgroup); + if (pVgRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) { + sdbFreeRaw(pVgRaw); + return -1; + } + (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); + } else { + return -1; + } + + mndSortVnodeGid(&newVgroup); + + { + SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup); + if (pVgRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) { + sdbFreeRaw(pVgRaw); + return -1; + } + (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); + } + + return 0; +} + int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode) { SVgObj newVgroup = {0}; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 85ef384ea9..55b62dfe48 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -110,7 +110,7 @@ int32_t vnodeAsyncCommit(SVnode* pVnode); bool vnodeShouldRollback(SVnode* pVnode); // vnodeSync.c -int32_t vnodeSyncOpen(SVnode* pVnode, char* path); +int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion); int32_t vnodeSyncStart(SVnode* pVnode); void vnodeSyncPreClose(SVnode* pVnode); void vnodeSyncPostClose(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 2e161d728f..d88d8820ee 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -141,6 +141,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables) < 0) return -1; @@ -151,8 +152,9 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { SJson *nodeInfo = tjsonCreateArray(); if (nodeInfo == NULL) return -1; if (tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", nodeInfo) < 0) return -1; - vDebug("vgId:%d, encode config, replicas:%d totalReplicas:%d selfIndex:%d", pCfg->vgId, pCfg->syncCfg.replicaNum, - pCfg->syncCfg.totalReplicaNum, pCfg->syncCfg.myIndex); + vDebug("vgId:%d, encode config, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", + pCfg->vgId, pCfg->syncCfg.replicaNum, + pCfg->syncCfg.totalReplicaNum, pCfg->syncCfg.myIndex, pCfg->syncCfg.changeVersion); for (int i = 0; i < pCfg->syncCfg.totalReplicaNum; ++i) { SJson *info = tjsonCreateObject(); SNodeInfo *pNode = (SNodeInfo *)&pCfg->syncCfg.nodeInfo[i]; @@ -263,6 +265,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { if (code < 0) return -1; tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code); if (code < 0) return -1; + tjsonGetNumberValue(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion, code); + if (code < 0) return -1; tjsonGetNumberValue(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables, code); if (code < 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 33c6f9d533..136168c5cc 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -15,6 +15,7 @@ #include "vnd.h" #include "vnodeInt.h" +#include "sync.h" extern int32_t tsdbPreCommit(STsdb *pTsdb); extern int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo); @@ -200,8 +201,10 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { // free info binary taosMemoryFree(data); - vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d", pInfo->config.vgId, fname, - pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex); + vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d", + pInfo->config.vgId, fname, + pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex, + pInfo->config.syncCfg.changeVersion); return 0; @@ -285,6 +288,8 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { tsem_wait(&pVnode->canCommit); + if(syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit; + pVnode->state.commitTerm = pVnode->state.applyTerm; pInfo->info.config = pVnode->config; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index c8b383a6a8..d6a8552655 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -118,9 +118,10 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t if (pReq->learnerSelfIndex != -1) { pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex; } + pCfg->changeVersion = pReq->changeVersion; - vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, - pCfg->totalReplicaNum, pCfg->myIndex); + vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d", + pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion); info.config.syncCfg = *pCfg; ret = vnodeSaveInfo(dir, &info); @@ -217,6 +218,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod info.config.hashEnd = pReq->hashEnd; info.config.hashChange = true; info.config.walCfg.vgId = pReq->dstVgId; + info.config.syncCfg.changeVersion = pReq->changeVersion; SSyncCfg *pCfg = &info.config.syncCfg; pCfg->myIndex = 0; @@ -447,7 +449,8 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC } // open sync - if (vnodeSyncOpen(pVnode, dir)) { + vInfo("vgId:%d, start to open sync, changeVersion:%d", TD_VID(pVnode), info.config.syncCfg.changeVersion); + if (vnodeSyncOpen(pVnode, dir, info.config.syncCfg.changeVersion)) { vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 70663a6375..95404ee9f2 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -384,6 +384,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->roleTimeMs = state.roleTimeMs; pLoad->startTimeMs = state.startTimeMs; pLoad->syncCanRead = state.canRead; + pLoad->learnerProgress = state.progress; pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 17f34b0a34..52c36fcb1b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -37,6 +37,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t btime, int64_t *pUid) { int32_t code = 0; @@ -404,10 +405,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg return -1; } - vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver); + vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64 + ", state.applyTerm:%" PRId64 ", conn.applyTerm:%" PRId64, + TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied, + pVnode->state.applyTerm, pMsg->info.conn.applyTerm); ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); - ASSERT(pVnode->state.applied + 1 == ver); + ASSERTS(pVnode->state.applied + 1 == ver, "applied:%" PRId64 ", ver:%" PRId64, pVnode->state.applied, ver); atomic_store_64(&pVnode->state.applied, ver); atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm); @@ -525,6 +529,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg case TDMT_VND_COMPACT: vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp); goto _exit; + case TDMT_SYNC_CONFIG_CHANGE: + vnodeProcessConfigChangeReq(pVnode, ver, pReq, len, pRsp); + break; default: vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); return -1; @@ -1842,6 +1849,17 @@ static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pR return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp); } +static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { + syncCheckMember(pVnode->sync); + + pRsp->msgType = TDMT_SYNC_CONFIG_CHANGE_RSP; + pRsp->code = TSDB_CODE_SUCCESS; + pRsp->pCont = NULL; + pRsp->contLen = 0; + + return 0; +} + #ifndef TD_ENTERPRISE int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 360da41482..d140c4a122 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -637,7 +637,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { return pFsm; } -int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { +int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) { SSyncInfo syncInfo = { .snapshotStrategy = SYNC_STRATEGY_WAL_FIRST, .batchSize = 1, @@ -664,7 +664,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { pNode->nodeId, pNode->clusterId); } - pVnode->sync = syncOpen(&syncInfo); + pVnode->sync = syncOpen(&syncInfo, vnodeVersion); if (pVnode->sync <= 0) { vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr()); return -1; diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index a4e83dc495..3c372a3b12 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -40,6 +40,7 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncIndex index); SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId); +void syncIndexMgrCopyIfExist(SSyncIndexMgr * pNewIndex, SSyncIndexMgr * pOldIndex, SRaftId *oldReplicasId); void syncIndexMgrSetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t startTime); int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index f74e43f47f..870cdd6a72 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -228,7 +228,7 @@ typedef struct SSyncNode { } SSyncNode; // open/close -------------- -SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo); +SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion); int32_t syncNodeStart(SSyncNode* pSyncNode); int32_t syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); @@ -238,6 +238,9 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int int32_t syncNodeRestore(SSyncNode* pSyncNode); void syncHbTimerDataFree(SSyncHbTimerData* pData); +// config +int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str); + // on message --------------------- int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex); diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index 65e2cc22a0..ddc036b606 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -105,7 +105,7 @@ bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); -int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm); +int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char *str); int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex); int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode); @@ -115,7 +115,7 @@ int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex); int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, - int32_t applyCode); + int32_t applyCode, bool force); #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index f9447e0168..b13c69e471 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -46,7 +46,7 @@ void syncEntryDestroy(SSyncRaftEntry* pEntry); void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7 static FORCE_INLINE bool syncLogReplBarrier(SSyncRaftEntry* pEntry) { - return pEntry->originalRpcType == TDMT_SYNC_NOOP; + return pEntry->originalRpcType == TDMT_SYNC_NOOP || pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE; } #ifdef __cplusplus diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 6f065f56e8..925988f43a 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -151,8 +151,9 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } sTrace("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64 - ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "", - pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex); + ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 " entryterm:%" PRId64, + pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, + pEntry->term); // accept if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) { @@ -162,7 +163,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { _SEND_RESPONSE: pEntry = NULL; - pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm); + pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm, "OnAppn"); bool matched = (pReply->matchIndex >= pReply->lastSendIndex); if (accepted && matched) { pReply->success = true; diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index e3c3f63a4f..add2f1a5dd 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -70,6 +70,27 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, Sync DID(pRaftId), CID(pRaftId)); } +void syncIndexMgrCopyIfExist(SSyncIndexMgr * pNewIndex, SSyncIndexMgr * pOldIndex, SRaftId *oldReplicasId){ + for(int j = 0; j < pOldIndex->totalReplicaNum; ++j){ + sDebug("old Index j:%d, index:%"PRId64, j, pOldIndex->index[j]); + } + + for (int i = 0; i < pNewIndex->totalReplicaNum; ++i) { + for(int j = 0; j < pOldIndex->totalReplicaNum; ++j){ + if (syncUtilSameId(/*(const SRaftId*)*/&((oldReplicasId[j])), &((*(pNewIndex->replicas))[i]))) { + pNewIndex->index[i] = pOldIndex->index[j]; + pNewIndex->privateTerm[i] = pOldIndex->privateTerm[j]; + pNewIndex->startTimeArr[i] = pOldIndex->startTimeArr[j]; + pNewIndex->recvTimeArr[i] = pOldIndex->recvTimeArr[j]; + } + } + } + + for (int i = 0; i < pNewIndex->totalReplicaNum; ++i){ + sDebug("new index i:%d, index:%"PRId64, i, pNewIndex->index[i]); + } +} + SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) { for (int i = 0; i < pNode->totalReplicaNum; i++) { if (syncUtilSameId(&pNode->replicasId[i], pRaftId)) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 9108090b87..1ff266ad8f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -59,8 +59,8 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); -int64_t syncOpen(SSyncInfo* pSyncInfo) { - SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); +int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { + SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, vnodeVersion); if (pSyncNode == NULL) { sError("vgId:%d, failed to open sync node", pSyncInfo->vgId); return -1; @@ -106,6 +106,21 @@ _err: return -1; } +int32_t syncNodeGetConfig(int64_t rid, SSyncCfg *cfg){ + SSyncNode* pSyncNode = syncNodeAcquire(rid); + + if (pSyncNode == NULL) { + sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid); + return -1; + } + + *cfg = pSyncNode->raftCfg.cfg; + + syncNodeRelease(pSyncNode); + + return 0; +} + void syncStop(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode != NULL) { @@ -516,6 +531,20 @@ SSyncState syncGetState(int64_t rid) { } else { state.canRead = state.restored; } + /* + double progress = 0; + if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){ + progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex; + state.progress = (int32_t)(progress * 100); + } + else{ + state.progress = -1; + } + sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", " + "progress:%lf, progress:%d", + pSyncNode->vgId, + pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress); + */ state.term = raftStoreGetTerm(pSyncNode); syncNodeRelease(pSyncNode); } @@ -573,6 +602,20 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { return ret; } +int32_t syncCheckMember(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode == NULL) { + sError("sync propose error"); + return -1; + } + + if(pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER){ + return -1; + } + + return 0; +} + int32_t syncIsCatchUp(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { @@ -634,15 +677,26 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ } // optimized one replica - if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) { + if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) { SyncIndex retIndex; int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex); - if (code == 0) { + if (code >= 0) { pMsg->info.conn.applyIndex = retIndex; pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode); - sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex, - TMSG_INFO(pMsg->msgType)); - return 1; + + //after raft member change, need to handle 1->2 switching point + //at this point, need to switch entry handling thread + if(pSyncNode->replicaNum == 1){ + sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex, + TMSG_INFO(pMsg->msgType)); + return 1; + } + else{ + sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64 " type:%s, " + "handle:%p", pSyncNode->vgId, retIndex, + TMSG_INFO(pMsg->msgType), pMsg->info.handle); + return 0; + } } else { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex, @@ -742,7 +796,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { } // open/close -------------- -SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { +SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode)); if (pSyncNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -784,20 +838,27 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { goto _error; } - if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) { - sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId); - pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg; - if (syncWriteCfgFile(pSyncNode) != 0) { - sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId); - goto _error; + if(vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion){ + if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) { + sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId); + pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg; + if (syncWriteCfgFile(pSyncNode) != 0) { + sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId); + goto _error; + } + } else { + sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId); + pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg; } - } else { - sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId); - pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg; + } + else{ + sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", + pSyncNode->vgId, vnodeVersion, pSyncInfo->syncCfg.changeVersion); } } - // init by SSyncInfo + + // init by SSyncInfo pSyncNode->vgId = pSyncInfo->vgId; SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg; bool updated = false; @@ -812,14 +873,16 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { pNode->nodeId, pNode->clusterId); } - if (updated) { - sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId); - if (syncWriteCfgFile(pSyncNode) != 0) { - sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId); - goto _error; + if(vnodeVersion > pSyncInfo->syncCfg.changeVersion){ + if (updated) { + sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId); + if (syncWriteCfgFile(pSyncNode) != 0) { + sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId); + goto _error; + } } } - + pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->msgcb = pSyncInfo->msgcb; pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg; @@ -2256,6 +2319,540 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand return code; } +void syncBuildConfigFromReq(SAlterVnodeReplicaReq *pReq, SSyncCfg *cfg){//TODO SAlterVnodeReplicaReq name is proper? + cfg->replicaNum = 0; + cfg->totalReplicaNum = 0; + + for (int i = 0; i < pReq->replica; ++i) { + SNodeInfo *pNode = &cfg->nodeInfo[i]; + pNode->nodeId = pReq->replicas[i].id; + pNode->nodePort = pReq->replicas[i].port; + tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); + pNode->nodeRole = TAOS_SYNC_ROLE_VOTER; + (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->nodeRole); + cfg->replicaNum++; + } + if(pReq->selfIndex != -1){ + cfg->myIndex = pReq->selfIndex; + } + for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) { + SNodeInfo *pNode = &cfg->nodeInfo[i]; + pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id; + pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port; + pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER; + tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn)); + (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->nodeRole); + cfg->totalReplicaNum++; + } + cfg->totalReplicaNum += pReq->replica; + if(pReq->learnerSelfIndex != -1){ + cfg->myIndex = pReq->replica + pReq->learnerSelfIndex; + } + cfg->changeVersion = pReq->changeVersion; +} + +int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry){ + if(pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE){ + return -1; + } + + SMsgHead *head = (SMsgHead *)pEntry->data; + void *pReq = POINTER_SHIFT(head, sizeof(SMsgHead)); + + SAlterVnodeTypeReq req = {0}; + if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + SSyncCfg cfg = {0}; + syncBuildConfigFromReq(&req, &cfg); + + if(cfg.totalReplicaNum >= 1 && ths->state == TAOS_SYNC_STATE_LEADER){ + bool incfg = false; + for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){ + if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 + && ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){ + incfg = true; + break; + } + } + + if(!incfg){ + SyncTerm currentTerm = raftStoreGetTerm(ths); + syncNodeStepDown(ths, currentTerm); + return 1; + } + } + return 0; +} + +void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg *cfg, char *str){ + sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d, " + "restoreFinish:%d", + ths->vgId, str, + ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion, + ths->restoreFinish); + + sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", + ths->vgId, str, ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, + ths->myNodeInfo.nodePort, ths->myNodeInfo.nodeRole); + + for (int32_t i = 0; i < ths->peersNum; ++i){ + sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", + ths->vgId, str, i, ths->peersNodeInfo[i].clusterId, + ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn, + ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole); + } + + for (int32_t i = 0; i < ths->peersNum; ++i){ + char buf[256]; + int32_t len = 256; + int32_t n = 0; + n += snprintf(buf + n, len - n, "%s", "{"); + for (int i = 0; i < ths->peersEpset->numOfEps; i++) { + n += snprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset->eps[i].fqdn, ths->peersEpset->eps[i].port, + (i + 1 < ths->peersEpset->numOfEps ? ", " : "")); + } + n += snprintf(buf + n, len - n, "%s", "}"); + + sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", + ths->vgId, str, i, buf, ths->peersEpset->inUse); + } + + for (int32_t i = 0; i < ths->peersNum; ++i){ + sInfo("vgId:%d, %s, peersId%d, addr:%"PRId64, + ths->vgId, str, i, ths->peersId[i].addr); + } + + for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){ + sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", + ths->vgId, str, i, ths->raftCfg.cfg.nodeInfo[i].clusterId, + ths->raftCfg.cfg.nodeInfo[i].nodeId, ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, + ths->raftCfg.cfg.nodeInfo[i].nodePort, ths->raftCfg.cfg.nodeInfo[i].nodeRole); + } + + for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){ + sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, + ths->vgId, str, i, ths->replicasId[i].addr); + } + +} + +int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){ + int32_t i = 0; + + //change peersNodeInfo + i = 0; + for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){ + if(!(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 + && ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)){ + ths->peersNodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole; + ths->peersNodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId; + tstrncpy(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN); + ths->peersNodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId; + ths->peersNodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort; + + syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[i], &ths->peersEpset[i]); + + if (!syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[i], ths->vgId, &ths->peersId[i])) { + sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, i); + return -1; + } + + i++; + } + } + ths->peersNum = i; + + //change cfg nodeInfo + ths->raftCfg.cfg.replicaNum = 0; + i = 0; + for(int32_t j = 0; j < cfg->totalReplicaNum; ++j) { + if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){ + ths->raftCfg.cfg.replicaNum++; + } + ths->raftCfg.cfg.nodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole; + ths->raftCfg.cfg.nodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId; + tstrncpy(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN); + ths->raftCfg.cfg.nodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId; + ths->raftCfg.cfg.nodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort; + if((strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 + && ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)){ + ths->raftCfg.cfg.myIndex = i; + } + i++; + } + ths->raftCfg.cfg.totalReplicaNum = i; + + return 0; +} + +void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg *cfg){ + //change peersNodeInfo + for (int32_t i = 0; i < ths->peersNum; ++i) { + for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){ + if(strcmp(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 + && ths->peersNodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort){ + if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){ + ths->peersNodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER; + } + } + } + } + + //change cfg nodeInfo + ths->raftCfg.cfg.replicaNum = 0; + for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) { + for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){ + if(strcmp(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 + && ths->raftCfg.cfg.nodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort){ + if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){ + ths->raftCfg.cfg.nodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER; + ths->raftCfg.cfg.replicaNum++; + } + } + } + } +} + +int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum){ + //1.rebuild replicasId, remove deleted one + SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; + memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId)); + + ths->replicaNum = ths->raftCfg.cfg.replicaNum; + ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum; + for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) { + syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]); + } + + + //2.rebuild MatchIndex, remove deleted one + SSyncIndexMgr *oldIndex = ths->pMatchIndex; + + ths->pMatchIndex = syncIndexMgrCreate(ths); + + syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId); + + syncIndexMgrDestroy(oldIndex); + + + //3.rebuild NextIndex, remove deleted one + SSyncIndexMgr *oldNextIndex = ths->pNextIndex; + + ths->pNextIndex = syncIndexMgrCreate(ths); + + syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId); + + syncIndexMgrDestroy(oldNextIndex); + + + //4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild + voteGrantedUpdate(ths->pVotesGranted, ths); + votesRespondUpdate(ths->pVotesRespond, ths); + + + //5.rebuild logReplMgr + for(int i = 0; i < oldtotalReplicaNum; ++i){ + sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId, i, + ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex, + ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex); + } + + SSyncLogReplMgr oldLogReplMgrs[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0}; + + for(int i = 0; i < oldtotalReplicaNum; i++){ + oldLogReplMgrs[i] = *(ths->logReplMgrs[i]); + } + + syncNodeLogReplDestroy(ths); + syncNodeLogReplInit(ths); + + for(int i = 0; i < ths->totalReplicaNum; ++i){ + for(int j = 0; j < oldtotalReplicaNum; j++){ + if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) { + *(ths->logReplMgrs[i]) = oldLogReplMgrs[j]; + ths->logReplMgrs[i]->peerId = i; + } + } + } + + for(int i = 0; i < ths->totalReplicaNum; ++i){ + sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")" , ths->vgId, i, + ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex, + ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex); + } + + //6.rebuild sender + for(int i = 0; i < oldtotalReplicaNum; ++i){ + sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, + ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime) + } + + for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { + if (ths->senders[i] != NULL) { + sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]); + + if (snapshotSenderIsStart(ths->senders[i])) { + snapshotSenderStop(ths->senders[i], false); + } + + snapshotSenderDestroy(ths->senders[i]); + ths->senders[i] = NULL; + } + } + + for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { + SSyncSnapshotSender* pSender = snapshotSenderCreate(ths, i); + if (pSender == NULL) return -1; + + ths->senders[i] = pSender; + sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender); + } + + for(int i = 0; i < ths->totalReplicaNum; i++){ + sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, + ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime) + } + + + //7.rebuild synctimer + syncNodeStopHeartbeatTimer(ths); + + for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { + syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i]); + } + + syncNodeStartHeartbeatTimer(ths); + + + //8.rebuild peerStates + SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0}; + for(int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++){ + oldState[i] = ths->peerStates[i]; + } + + for(int i = 0; i < ths->totalReplicaNum; i++){ + for(int j = 0; j < oldtotalReplicaNum; j++){ + if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])){ + ths->peerStates[i] = oldState[j]; + } + } + } + + return 0; +} + +void syncNodeChangeToVoter(SSyncNode* ths){ + //replicasId, only need to change replicaNum when 1->3 + ths->replicaNum = ths->raftCfg.cfg.replicaNum; + sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum); + for (int32_t i = 0; i < ths->totalReplicaNum; ++i){ + sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, i, ths->replicasId[i].addr); + } + + //pMatchIndex, pNextIndex, only need to change replicaNum when 1->3 + ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum; + ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum; + + sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum); + for (int32_t i = 0; i < ths->pMatchIndex->totalReplicaNum; ++i){ + sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, i, ths->pMatchIndex->index[i]); + } + + //pVotesGranted, pVotesRespond + voteGrantedUpdate(ths->pVotesGranted, ths); + votesRespondUpdate(ths->pVotesRespond, ths); + + //logRepMgrs + //no need to change logRepMgrs when 1->3 +} + +void syncNodeResetPeerAndCfg(SSyncNode* ths){ + SNodeInfo node = {0}; + for (int32_t i = 0; i < ths->peersNum; ++i) { + memcpy(&ths->peersNodeInfo[i], &node, sizeof(SNodeInfo)); + } + + for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) { + memcpy(&ths->raftCfg.cfg.nodeInfo[i], &node, sizeof(SNodeInfo)); + } +} + +int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){ + if(pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE){ + return -1; + } + + SMsgHead *head = (SMsgHead *)pEntry->data; + void *pReq = POINTER_SHIFT(head, sizeof(SMsgHead)); + + SAlterVnodeTypeReq req = {0}; + if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + SSyncCfg cfg = {0}; + syncBuildConfigFromReq(&req, &cfg); + + if(cfg.changeVersion <= ths->raftCfg.cfg.changeVersion){ + sInfo("vgId:%d, skip conf change entry since lower version. " + "this entry, index:%" PRId64 ", term:%" PRId64 ", totalReplicaNum:%d, changeVersion:%d; " + "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64", changeVersion:%d", + ths->vgId, + pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, + ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion); + return 0; + } + + if(strcmp(str, "Commit") == 0){ + sInfo("vgId:%d, change config from %s. " + "this, i:%" PRId64 ", trNum:%d, vers:%d; " + "node, rNum:%d, pNum:%d, trNum:%d, " + "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), " + "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)", + ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, + ths->replicaNum, ths->peersNum, ths->totalReplicaNum, + ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, + pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType)); + } + else{ + sInfo("vgId:%d, change config from %s. " + "this, i:%" PRId64 ", t:%" PRId64 ", trNum:%d, vers:%d; " + "node, rNum:%d, pNum:%d, trNum:%d, " + "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), " + "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")", + ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, + ths->replicaNum, ths->peersNum, ths->totalReplicaNum, + ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, + pEntry->index -1, ths->commitIndex, ths->pLogBuf->commitIndex); + } + + syncNodeLogConfigInfo(ths, &cfg, "before config change"); + + int32_t oldTotalReplicaNum = ths->totalReplicaNum; + + if(cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2){//remove replica + + bool incfg = false; + for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){ + if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 + && ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){ + incfg = true; + break; + } + } + + if(incfg){//remove other + syncNodeResetPeerAndCfg(ths); + + //no need to change myNodeInfo + + if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){ + return -1; + }; + + if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){ + return -1; + }; + } + else{//remove myself + //no need to do anything actually, to change the following to reduce distruptive server chance + + syncNodeResetPeerAndCfg(ths); + + //change myNodeInfo + ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER; + + //change peer and cfg + ths->peersNum = 0; + memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo)); + ths->raftCfg.cfg.replicaNum = 0; + ths->raftCfg.cfg.totalReplicaNum = 1; + + //change other + if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){ + return -1; + } + + //change state + ths->state = TAOS_SYNC_STATE_LEARNER; + } + + ths->restoreFinish = false; + } + else{//add replica, or change replica type + if(ths->totalReplicaNum == 3){ //change replica type + sInfo("vgId:%d, begin change replica type", ths->vgId); + + //change myNodeInfo + for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){ + if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 + && ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){ + if(cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){ + ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER; + } + } + } + + //change peer and cfg + syncNodeChangePeerAndCfgToVoter(ths, &cfg); + + //change other + syncNodeChangeToVoter(ths); + + //change state + if(ths->state ==TAOS_SYNC_STATE_LEARNER){ + if(ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER ){ + ths->state = TAOS_SYNC_STATE_FOLLOWER; + } + } + + ths->restoreFinish = false; + } + else{//add replica + sInfo("vgId:%d, begin add replica", ths->vgId); + + //no need to change myNodeInfo + + //change peer and cfg + if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){ + return -1; + }; + + //change other + if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){ + return -1; + }; + + //no need to change state + + if(ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER){ + ths->restoreFinish = false; + } + } + } + + ths->quorum = syncUtilQuorum(ths->replicaNum); + + ths->raftCfg.lastConfigIndex = pEntry->index; + ths->raftCfg.cfg.lastIndex = pEntry->index; + ths->raftCfg.cfg.changeVersion = cfg.changeVersion; + + syncNodeLogConfigInfo(ths, &cfg, "after config change"); + + if(syncWriteCfgFile(ths) != 0){ + sError("vgId:%d, failed to create sync cfg file", ths->vgId); + return -1; + }; + + return 0; +} + int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { if (pEntry->dataLen < sizeof(SMsgHead)) { sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId, @@ -2268,13 +2865,13 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); ASSERT(terrno != 0); - (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno); + (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false); syncEntryDestroy(pEntry); return -1; } - + // proceed match index, with replicating on needed - SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL); + SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append"); sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", @@ -2305,6 +2902,9 @@ bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) { int32_t toCount = 0; int64_t tsNow = taosGetTimestampMs(); for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { + if(pSyncNode->peersNodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER){ + continue; + } int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i])); if (recvTime == 0 || recvTime == -1) { continue; @@ -2566,6 +3166,14 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index); } + //1->2, config change is add in write thread, and will continue in sync thread + //need save message for it + if(pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE){ + SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg}; + uint64_t seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub); + pEntry->seqNum = seqNum; + } + if (pEntry == NULL) { sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr()); return -1; @@ -2576,7 +3184,28 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn (*pRetIndex) = index; } - int32_t code = syncNodeAppend(ths, pEntry); + if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){ + int32_t code = syncNodeCheckChangeConfig(ths, pEntry); + if(code < 0){ + sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr()); + syncEntryDestroy(pEntry); + pEntry = NULL; + return -1; + } + + if(code > 0){ + SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; + (void)syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info); + if (rsp.info.handle != NULL) { + tmsgSendRsp(&rsp); + } + syncEntryDestroy(pEntry); + pEntry = NULL; + return -1; + } + } + + code = syncNodeAppend(ths, pEntry); return code; } else { syncEntryDestroy(pEntry); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 92f34db16d..532a6955cf 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -25,6 +25,8 @@ #include "syncRespMgr.h" #include "syncSnapshot.h" #include "syncUtil.h" +#include "syncRaftCfg.h" +#include "syncVoteMgr.h" static bool syncIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || @@ -428,7 +430,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf return 0; } -int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm) { +int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char *str) { taosThreadMutexLock(&pBuf->mutex); syncLogBufferValidate(pBuf); @@ -475,9 +477,6 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p sTrace("vgId:%d, log buffer proceed. start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64, pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); - // replicate on demand - (void)syncNodeReplicateWithoutLock(pNode); - // persist if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) { sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(), @@ -485,6 +484,39 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p taosMsleep(1); goto _out; } + + if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){ + if(pNode->pLogBuf->commitIndex == pEntry->index -1){ + sInfo("vgId:%d, to change config at %s. " + "current entry, index:%" PRId64 ", term:%" PRId64", " + "node, restore:%d, commitIndex:%" PRId64 ", " + "cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")", + pNode->vgId, str, + pEntry->index, pEntry->term, + pNode->restoreFinish, pNode->commitIndex, + pEntry->index - 1, pNode->pLogBuf->commitIndex); + if(syncNodeChangeConfig(pNode, pEntry, str) != 0){ + sError("vgId:%d, failed to change config from Append since %s. index:%" PRId64, pNode->vgId, terrstr(), + pEntry->index); + goto _out; + } + } + else{ + sInfo("vgId:%d, delay change config from Node %s. " + "curent entry, index:%" PRId64 ", term:%" PRId64 ", " + "node, commitIndex:%" PRId64 ", pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), " + "cond:( pre entry index:%" PRId64" != buf commit index:%" PRId64 ")", + pNode->vgId, str, + pEntry->index, pEntry->term, + pNode->commitIndex, pNode->pLogBuf->startIndex, pNode->pLogBuf->commitIndex, + pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex, + pEntry->index - 1, pNode->pLogBuf->commitIndex); + } + } + + // replicate on demand + (void)syncNodeReplicateWithoutLock(pNode); + ASSERT(pEntry->index == pBuf->matchIndex); // update my match index @@ -503,8 +535,16 @@ _out: } int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry, - int32_t applyCode) { - if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1) { + int32_t applyCode, bool force) { + //learner need to execute fsm when it catch up entry log + //if force is true, keep all contition check to execute fsm + if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1 + && pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER + && force == false) { + sDebug("vgId:%d, not to execute fsm, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x, replicaNum:%d," + "role:%d, restoreFinish:%d", + pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode, + pNode->replicaNum, pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish); return 0; } @@ -559,6 +599,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex); SSyncRaftEntry* pEntry = NULL; bool inBuf = false; + SSyncRaftEntry* pNextEntry = NULL; + bool nextInBuf = false; if (commitIndex <= pBuf->commitIndex) { sDebug("vgId:%d, stale commit index. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex, @@ -584,7 +626,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm pEntry->term, TMSG_INFO(pEntry->originalRpcType)); } - if (syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0) != 0) { + if (syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false) != 0) { sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64, vgId, pEntry->index, pEntry->term, role, currentTerm); @@ -595,10 +637,51 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId, pEntry->index, pEntry->term, role, currentTerm); + pNextEntry = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf); + if (pNextEntry != NULL) { + if(pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){ + sInfo("vgId:%d, to change config at Commit. " + "current entry, index:%" PRId64 ", term:%" PRId64", " + "node, role:%d, current term:%" PRId64 ", restore:%d, " + "cond, next entry index:%" PRId64 ", msgType:%s", + vgId, + pEntry->index, pEntry->term, + role, currentTerm, pNode->restoreFinish, + pNextEntry->index, TMSG_INFO(pNextEntry->originalRpcType)); + + if(syncNodeChangeConfig(pNode, pNextEntry, "Commit") != 0){ + sError("vgId:%d, failed to change config from Commit. index:%" PRId64 ", term:%" PRId64 + ", role:%d, current term:%" PRId64, + vgId, pNextEntry->index, pNextEntry->term, role, currentTerm); + goto _out; + } + + //for 2->1, need to apply config change entry in sync thread, + if(pNode->replicaNum == 1){ + if (syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true) != 0) { + sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 + ", role:%d, current term:%" PRId64, + vgId, pNextEntry->index, pNextEntry->term, role, currentTerm); + goto _out; + } + + index++; + pBuf->commitIndex = index; + + sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId, + pNextEntry->index, pNextEntry->term, role, currentTerm); + } + } + if (!nextInBuf) { + syncEntryDestroy(pNextEntry); + pNextEntry = NULL; + } + } + if (!inBuf) { syncEntryDestroy(pEntry); pEntry = NULL; - } + } } // recycle @@ -626,6 +709,10 @@ _out: syncEntryDestroy(pEntry); pEntry = NULL; } + if (!nextInBuf) { + syncEntryDestroy(pNextEntry); + pNextEntry = NULL; + } syncLogBufferValidate(pBuf); taosThreadMutexUnlock(&pBuf->mutex); return ret; diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 480ed4b2af..0dcc3eee29 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -44,6 +44,7 @@ static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) { SSyncCfg *pCfg = (SSyncCfg *)pObj; if (tjsonAddDoubleToObject(pJson, "replicaNum", pCfg->replicaNum) < 0) return -1; if (tjsonAddDoubleToObject(pJson, "myIndex", pCfg->myIndex) < 0) return -1; + if (tjsonAddDoubleToObject(pJson, "changeVersion", pCfg->changeVersion) < 0) return -1; SJson *nodeInfo = tjsonCreateArray(); if (nodeInfo == NULL) return -1; @@ -113,8 +114,9 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) { if (taosRenameFile(file, realfile) != 0) goto _OVER; code = 0; - sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64, pNode->vgId, - realfile, len, pNode->raftCfg.lastConfigIndex); + sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64 ", " + "changeVersion:%d", pNode->vgId, + realfile, len, pNode->raftCfg.lastConfigIndex, pNode->raftCfg.cfg.changeVersion); _OVER: if (pJson != NULL) tjsonDelete(pJson); @@ -136,6 +138,8 @@ static int32_t syncDecodeSyncCfg(const SJson *pJson, void *pObj) { if (code < 0) return -1; tjsonGetInt32ValueFromDouble(pJson, "myIndex", pCfg->myIndex, code); if (code < 0) return -1; + tjsonGetInt32ValueFromDouble(pJson, "changeVersion", pCfg->changeVersion, code); + if (code < 0) return -1; SJson *nodeInfo = tjsonGetObjectItem(pJson, "nodeInfo"); if (nodeInfo == NULL) return -1; @@ -242,7 +246,8 @@ int32_t syncReadCfgFile(SSyncNode *pNode) { } code = 0; - sInfo("vgId:%d, succceed to read sync cfg file %s", pNode->vgId, file); + sInfo("vgId:%d, succceed to read sync cfg file %s, changeVersion:%d", + pNode->vgId, file, pCfg->cfg.changeVersion); _OVER: if (pData != NULL) taosMemoryFree(pData);