feat/TD-22970

This commit is contained in:
dmchen 2023-07-18 16:09:38 +08:00
parent 7e89622149
commit 3711fdd865
31 changed files with 1060 additions and 103 deletions

View File

@ -77,7 +77,8 @@ static inline bool tmsgIsValid(tmsg_t type) {
}
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT);
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT) ||
(type == TDMT_SYNC_CONFIG_CHANGE);
}
static inline bool syncUtilUserCommit(tmsg_t msgType) {
@ -1175,6 +1176,7 @@ typedef struct {
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
int32_t numOfCachedTables;
int32_t learnerProgress; // uses one reserved field
} SVnodeLoad;
typedef struct {
@ -1314,6 +1316,7 @@ typedef struct {
int8_t learnerReplica;
int8_t learnerSelfIndex;
SReplica learnerReplicas[TSDB_MAX_LEARNER_REPLICA];
int32_t changeVersion;
} SCreateVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
@ -1388,7 +1391,8 @@ typedef struct {
int8_t learnerSelfIndex;
int8_t learnerReplica;
SReplica learnerReplicas[TSDB_MAX_LEARNER_REPLICA];
} SAlterVnodeReplicaReq, SAlterVnodeTypeReq;
int32_t changeVersion;
} SAlterVnodeReplicaReq, SAlterVnodeTypeReq, SCheckLearnCatchupReq;
int32_t tSerializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
int32_t tDeserializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);

View File

@ -85,6 +85,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE_TYPE, "dnode-alter-mnode-type", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MND_MSG)
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)

View File

@ -101,6 +101,7 @@ typedef struct SSyncCfg {
int32_t myIndex;
SNodeInfo nodeInfo[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
SyncIndex lastIndex;
int32_t changeVersion;
} SSyncCfg;
typedef struct SFsmCbMeta {
@ -239,16 +240,18 @@ typedef struct SSyncState {
ESyncState state;
bool restored;
bool canRead;
int32_t progress;
} SSyncState;
int32_t syncInit();
void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo);
int64_t syncOpen(SSyncInfo* pSyncInfo, bool isFirst);
int32_t syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncCheckMember(int64_t rid);
int32_t syncIsCatchUp(int64_t rid);
ESyncRole syncGetRole(int64_t rid);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);

View File

@ -1078,7 +1078,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1;
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1;
if (tEncodeI32(&encoder, reserved) < 0) return -1;
if (tEncodeI32(&encoder, pload->learnerProgress) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
if (tEncodeI64(&encoder, reserved) < 0) return -1;
}
@ -1157,7 +1157,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
if (tDecodeI32(&decoder, (int32_t *)&reserved) < 0) return -1;
if (tDecodeI32(&decoder, &vload.learnerProgress) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
@ -4290,6 +4290,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
SReplica *pReplica = &pReq->learnerReplicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
}
if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1;
tEndEncode(&encoder);
@ -4376,6 +4377,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
}
}
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -4608,6 +4612,7 @@ int32_t tSerializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnodeRe
SReplica *pReplica = &pReq->learnerReplicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
}
if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -4639,6 +4644,9 @@ int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnode
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
}
}
if (!tDecodeIsEnd(&decoder)){
if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -106,6 +106,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -94,6 +94,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
// vmFile.c
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);

View File

@ -133,6 +133,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->standby = 0;
pCfg->syncCfg.replicaNum = 0;
pCfg->syncCfg.totalReplicaNum = 0;
pCfg->syncCfg.changeVersion = pCreate->changeVersion;
memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
for (int32_t i = 0; i < pCreate->replica; ++i) {
@ -211,14 +212,15 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d"
", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
"learnerReplica:%d learnerSelfIndex:%d strict:%d",
"learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d",
req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
(uint64_t)req.buffer * 1024 * 1024,
req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid,
req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression,
req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix,
req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict);
req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict,
req.changeVersion);
for (int32_t i = 0; i < req.replica; ++i) {
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
req.replicas[i].id);
@ -272,7 +274,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
goto _OVER;
}
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb, true);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
code = terrno;
@ -321,6 +323,7 @@ _OVER:
return code;
}
// alter replica doesn't use this, but restore dnode still uses this
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SAlterVnodeTypeReq req = {0};
if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
@ -417,12 +420,12 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb, true);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;
}
if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
return -1;
@ -438,6 +441,53 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}
/*
 * Handle a check-learner-catchup request for the vnode named in the message.
 *
 * The payload is decoded with tDeserializeSAlterVnodeReplicaReq (the request
 * type shares its layout with SAlterVnodeReplicaReq).
 *
 * Returns 0 when vnodeIsCatchUp() reports 1 for the vnode.
 * Returns -1 with terrno set to:
 *   TSDB_CODE_INVALID_MSG          - the payload could not be deserialized
 *   TSDB_CODE_VND_NOT_EXIST        - the vnode could not be acquired
 *   TSDB_CODE_VND_ALREADY_IS_VOTER - the vnode's role is already voter
 *   TSDB_CODE_VND_NOT_CATCH_UP     - vnodeIsCatchUp() did not return 1
 *
 * The acquired vnode is released on every path after a successful acquire.
 */
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  SCheckLearnCatchupReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  // Test the learner count (learnerReplica); the learnerReplicas array itself
  // never compares equal to 0, so checking it would make this branch dead code.
  if (req.learnerReplica == 0) {
    req.learnerSelfIndex = -1;
  }

  dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request",
        req.vgId, TMSG_INFO(pMsg->msgType));

  SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
  if (pVnode == NULL) {
    dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
    terrno = TSDB_CODE_VND_NOT_EXIST;
    return -1;
  }

  ESyncRole role = vnodeGetRole(pVnode->pImpl);
  dInfo("vgId:%d, checking node role:%d", req.vgId, role);
  if (role == TAOS_SYNC_ROLE_VOTER) {
    dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
    terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("vgId:%d, checking node catch up", req.vgId);
  if (vnodeIsCatchUp(pVnode->pImpl) != 1) {
    terrno = TSDB_CODE_VND_NOT_CATCH_UP;
    vmReleaseVnode(pMgmt, pVnode);
    return -1;
  }

  dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);

  vmReleaseVnode(pMgmt, pVnode);

  dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request",
        req.vgId, TMSG_INFO(pMsg->msgType));

  return 0;
}
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SDisableVnodeWriteReq req = {0};
if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
@ -515,7 +565,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo("vgId:%d, open vnode", dstVgId);
SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb, true);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
return -1;
@ -616,7 +666,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb, true);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;
@ -748,6 +798,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;

View File

@ -207,7 +207,7 @@ static void *vmOpenVnodeInThread(void *param) {
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb, false);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++;

View File

@ -52,6 +52,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_DND_ALTER_VNODE_TYPE:
code = vmProcessAlterVnodeTypeReq(pMgmt, pMsg);
break;
case TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP:
code = vmProcessCheckLearnCatchupReq(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);

View File

@ -349,6 +349,7 @@ typedef struct {
bool syncRestore;
bool syncCanRead;
ESyncRole nodeRole;
int32_t learnerProgress;
} SVnodeGid;
typedef struct {
@ -372,6 +373,7 @@ typedef struct {
SVnodeGid vnodeGid[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
void* pTsma;
int32_t numOfCachedTables;
int32_t syncConfChangeVer;
} SVgObj;
typedef struct {

View File

@ -501,6 +501,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pGid->syncState = pVload->syncState;
pGid->syncRestore = pVload->syncRestore;
pGid->syncCanRead = pVload->syncCanRead;
pGid->learnerProgress = pVload->learnerProgress;
roleChanged = true;
}
break;

View File

@ -476,7 +476,7 @@ int32_t mndInitSync(SMnode *pMnode) {
}
tsem_init(&pMgmt->syncSem, 0, 0);
pMgmt->sync = syncOpen(&syncInfo);
pMgmt->sync = syncOpen(&syncInfo, true);
if (pMgmt->sync <= 0) {
mError("failed to open sync since %s", terrstr());
return -1;

View File

@ -1125,7 +1125,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg);
if (code == 0) {
pAction->msgSent = 1;
pAction->msgReceived = 0;
//pAction->msgReceived = 0;
pAction->errCode = TSDB_CODE_ACTION_IN_PROGRESS;
mInfo("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail);

View File

@ -27,7 +27,7 @@
#include "mndUser.h"
#include "tmisce.h"
#define VGROUP_VER_NUMBER 1
#define VGROUP_VER_NUMBER 2
#define VGROUP_RESERVE_SIZE 64
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
@ -65,6 +65,8 @@ int32_t mndInitVgroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_ALTER_VNODE_TYPE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_SYNC_CONFIG_CHANGE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
@ -103,6 +105,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, _OVER)
}
SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
@ -127,7 +130,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != VGROUP_VER_NUMBER) {
if (sver < 1 || sver > VGROUP_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
@ -156,6 +159,10 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
pVgid->syncState = TAOS_SYNC_STATE_LEADER;
}
}
if(sver > 1){
SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
}
SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
terrno = 0;
@ -207,6 +214,7 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
pNew->pointsWritten = pOld->pointsWritten;
pNew->compact = pOld->compact;
memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
pOld->syncConfChangeVer = pNew->syncConfChangeVer;
return 0;
}
@ -312,6 +320,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
return NULL;
}
createReq.changeVersion = pVgroup->syncConfChangeVer;
mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica,
createReq.learnerReplica, createReq.strict);
@ -392,6 +402,7 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
.learnerReplica = 0,
.selfIndex = -1,
.learnerSelfIndex = -1,
.changeVersion = pVgroup->syncConfChangeVer,
};
for (int32_t v = 0; v < pVgroup->replica; ++v) {
@ -848,6 +859,28 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
}
}
snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
/*
mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress);
if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER) {
if(pVgroup->vnodeGid[i].learnerProgress < 0){
snprintf(role, sizeof(role), "%s-",
syncStr(pVgroup->vnodeGid[i].syncState));
}
else if(pVgroup->vnodeGid[i].learnerProgress >= 100){
snprintf(role, sizeof(role), "%s--",
syncStr(pVgroup->vnodeGid[i].syncState));
}
else{
snprintf(role, sizeof(role), "%s%d",
syncStr(pVgroup->vnodeGid[i].syncState), pVgroup->vnodeGid[i].learnerProgress);
}
}
else{
snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
}
*/
} else {
}
STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);
@ -1132,6 +1165,51 @@ _OVER:
return 0;
}
/*
 * Remove one vnode from pVgroup's in-memory replica list.
 *
 * pArray is sorted with mndCompareDnodeVnodes and then scanned from its last
 * element backwards. The first dnode found hosting one of the vgroup's
 * replicas loses that replica: the dnode's memUsed and numOfVnodes are
 * reduced, pVgroup->replica is decremented, the removed entry is copied to
 * *pDelVgid, and the last replica slot is moved into the vacated position.
 *
 * pTrans is not used by this function.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_APP_ERROR when no
 * dnode in pArray hosts a replica of the vgroup.
 */
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
                                                   SVnodeGid *pDelVgid) {
  taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

  int32_t numOfDnodes = taosArrayGetSize(pArray);
  for (int32_t idx = 0; idx < numOfDnodes; ++idx) {
    SDnodeObj *pEach = taosArrayGet(pArray, idx);
    mInfo("dnode:%d, equivalent vnodes:%d others:%d", pEach->id, pEach->numOfVnodes, pEach->numOfOtherNodes);
  }

  // code stays -1 until a replica has been removed; that also ends the outer scan.
  int32_t code = -1;
  for (int32_t d = numOfDnodes - 1; code != 0 && d >= 0; --d) {
    SDnodeObj *pDnode = taosArrayGet(pArray, d);

    for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
      SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
      if (pVgid->dnodeId != pDnode->id) continue;

      int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
      pDnode->memUsed -= vgMem;
      mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
            pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);

      pDnode->numOfVnodes -= 1;
      pVgroup->replica -= 1;

      // Hand the removed entry to the caller, then fill the hole with the tail slot.
      SVnodeGid *pTail = &pVgroup->vnodeGid[pVgroup->replica];
      *pDelVgid = *pVgid;
      *pVgid = *pTail;
      memset(pTail, 0, sizeof(SVnodeGid));

      code = 0;
      break;
    }
  }

  if (code != 0) {
    terrno = TSDB_CODE_APP_ERROR;
    mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr());
    return -1;
  }

  for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
    SVnodeGid *pKept = &pVgroup->vnodeGid[vn];
    mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pKept->dnodeId);
  }

  return 0;
}
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
STransAction action = {0};
@ -1208,6 +1286,40 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
return 0;
}
/*
 * Append a TDMT_SYNC_CONFIG_CHANGE redo action to pTrans.
 *
 * The action payload is an SMsgHead (contLen and vgId stored with htonl)
 * immediately followed by the request produced by
 * mndBuildAlterVnodeReplicaReq() for pNewVgroup and dnodeId. The action is
 * addressed to the epset of pNewVgroup. pOldVgroup is not used.
 *
 * Returns 0 on success, -1 on failure (terrno is set to
 * TSDB_CODE_OUT_OF_MEMORY when the payload buffer cannot be allocated).
 *
 * Memory: the intermediate request buffer is always released here. The
 * combined buffer is released here if the append fails; after a successful
 * append it is left attached to the action (presumably owned by the
 * transaction from then on -- confirm against mndTransAppendRedoAction).
 */
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
                                 SVgObj *pOldVgroup, SVgObj *pNewVgroup, int32_t dnodeId) {
  STransAction action = {0};
  action.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);

  int32_t contLen = 0;
  void   *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &contLen);
  if (pReq == NULL) return -1;

  int32_t totallen = contLen + sizeof(SMsgHead);

  SMsgHead *pHead = taosMemoryMalloc(totallen);
  if (pHead == NULL) {
    taosMemoryFree(pReq);  // do not leak the serialized request on OOM
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pHead->contLen = htonl(totallen);
  pHead->vgId = htonl(pNewVgroup->vgId);

  memcpy((void *)(pHead + 1), pReq, contLen);
  taosMemoryFree(pReq);  // contents now live in pHead; the temporary is no longer needed

  action.pCont = pHead;
  action.contLen = totallen;
  action.msgType = TDMT_SYNC_CONFIG_CHANGE;

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    taosMemoryFree(pHead);  // the action was not accepted, so its payload is still ours
    return -1;
  }

  return 0;
}
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
@ -1288,7 +1400,7 @@ int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
return 0;
}
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
if (pDnode == NULL) return -1;
@ -1302,7 +1414,7 @@ int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
action.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
@ -2130,62 +2242,101 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
mndTransSetSerial(pTrans);
mInfo("trans:%d, vgid:%d alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d",
pTrans->id, pVgroup->vgId, pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
pVgroup->vnodeGid[0].dnodeId);
//add second
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray) != 0) return -1;
//add third
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray) != 0) return -1;
//learner stage
//add learner stage
newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
return -1;
newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
newVgroup.syncConfChangeVer++;
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1;
//if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1;
//follower stage
//check learner
newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
if (mndAddAlterVnodeTypeAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId) != 0)
return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
if (mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId) != 0) return -1;
if (mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId) != 0) return -1;
//change raft type
newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
newVgroup.syncConfChangeVer++;
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
//add third
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray) != 0) return -1;
newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
newVgroup.syncConfChangeVer++;
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId) != 0)
return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
} else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
SVnodeGid del1 = {0};
SVnodeGid del2 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVgroup, pArray, &del1) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId) != 0)
return -1;
if (mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1) != 0) return -1;
newVgroup.syncConfChangeVer++;
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
if (mndRemoveVnodeFromVgroup(pMnode, pTrans, &newVgroup, pArray, &del2) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
SVnodeGid del2 = {0};
if (mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2) != 0) return -1;
newVgroup.syncConfChangeVer++;
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
pVgRaw = mndVgroupActionEncode(&newVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
} else {
return -1;
}

View File

@ -56,7 +56,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs);
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb, bool isFirst);
void vnodePreClose(SVnode *pVnode);
void vnodePostClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode);

View File

@ -109,7 +109,7 @@ int32_t vnodeAsyncCommit(SVnode* pVnode);
bool vnodeShouldRollback(SVnode* pVnode);
// vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
int32_t vnodeSyncOpen(SVnode* pVnode, char* path, bool isFirst);
int32_t vnodeSyncStart(SVnode* pVnode);
void vnodeSyncPreClose(SVnode* pVnode);
void vnodeSyncPostClose(SVnode* pVnode);

View File

@ -141,6 +141,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables) < 0) return -1;
@ -151,8 +152,9 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
SJson *nodeInfo = tjsonCreateArray();
if (nodeInfo == NULL) return -1;
if (tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", nodeInfo) < 0) return -1;
vDebug("vgId:%d, encode config, replicas:%d totalReplicas:%d selfIndex:%d", pCfg->vgId, pCfg->syncCfg.replicaNum,
pCfg->syncCfg.totalReplicaNum, pCfg->syncCfg.myIndex);
vDebug("vgId:%d, encode config, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d",
pCfg->vgId, pCfg->syncCfg.replicaNum,
pCfg->syncCfg.totalReplicaNum, pCfg->syncCfg.myIndex, pCfg->syncCfg.changeVersion);
for (int i = 0; i < pCfg->syncCfg.totalReplicaNum; ++i) {
SJson *info = tjsonCreateObject();
SNodeInfo *pNode = (SNodeInfo *)&pCfg->syncCfg.nodeInfo[i];
@ -263,6 +265,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables, code);
if (code < 0) return -1;

View File

@ -195,8 +195,10 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
// free info binary
taosMemoryFree(data);
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d", pInfo->config.vgId, fname,
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex);
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d",
pInfo->config.vgId, fname,
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex,
pInfo->config.syncCfg.changeVersion);
return 0;

View File

@ -284,7 +284,7 @@ void vnodeDestroy(const char *path, STfs *pTfs) {
tfsRmdir(pTfs, path);
}
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb, bool isFirst) {
SVnode *pVnode = NULL;
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
@ -407,7 +407,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
}
// open sync
if (vnodeSyncOpen(pVnode, dir)) {
if (vnodeSyncOpen(pVnode, dir, isFirst)) {
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}

View File

@ -381,6 +381,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->syncState = state.state;
pLoad->syncRestore = state.restored;
pLoad->syncCanRead = state.canRead;
pLoad->learnerProgress = state.progress;
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);

View File

@ -36,6 +36,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t btime, int64_t *pUid) {
int32_t code = 0;
@ -392,10 +393,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
return -1;
}
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver);
vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64
", state.applyTerm:%" PRId64 ", conn.applyTerm:%" PRId64,
TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied,
pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
ASSERT(pVnode->state.applied + 1 == ver);
ASSERTS(pVnode->state.applied + 1 == ver, "applied:%" PRId64 ", ver:%" PRId64, pVnode->state.applied, ver);
atomic_store_64(&pVnode->state.applied, ver);
atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
@ -518,6 +522,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
case TDMT_VND_COMPACT:
vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp);
goto _exit;
case TDMT_SYNC_CONFIG_CHANGE:
vnodeProcessConfigChangeReq(pVnode, ver, pReq, len, pRsp);
break;
default:
vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
return -1;
@ -1834,6 +1841,17 @@ static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pR
return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
}
// Apply-side handler for TDMT_SYNC_CONFIG_CHANGE.
// Invokes syncCheckMember() on the vnode's sync handle and then fills pRsp with an
// empty, successful TDMT_SYNC_CONFIG_CHANGE_RSP. ver, pReq and len are unused.
// Always returns 0.
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
  // NOTE(review): the result of the membership check is not consulted here - confirm that is intended.
  syncCheckMember(pVnode->sync);

  // The response carries no payload, only the message type and a success code.
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->msgType = TDMT_SYNC_CONFIG_CHANGE_RSP;

  return 0;
}
#ifndef TD_ENTERPRISE
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
return 0;

View File

@ -637,7 +637,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
return pFsm;
}
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, bool isFirst) {
SSyncInfo syncInfo = {
.snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
.batchSize = 1,
@ -664,7 +664,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
pNode->nodeId, pNode->clusterId);
}
pVnode->sync = syncOpen(&syncInfo);
pVnode->sync = syncOpen(&syncInfo, isFirst);
if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
return -1;

View File

@ -40,6 +40,7 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pIndexMgr);
void syncIndexMgrClear(SSyncIndexMgr *pIndexMgr);
void syncIndexMgrSetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncIndex index);
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
void syncIndexMgrCopyIfExist(SSyncIndexMgr * pNewIndex, SSyncIndexMgr * pOldIndex, SRaftId *oldReplicasId);
void syncIndexMgrSetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t startTime);
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);

View File

@ -228,7 +228,7 @@ typedef struct SSyncNode {
} SSyncNode;
// open/close --------------
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo);
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst);
int32_t syncNodeStart(SSyncNode* pSyncNode);
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode);
@ -238,6 +238,9 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int
int32_t syncNodeRestore(SSyncNode* pSyncNode);
void syncHbTimerDataFree(SSyncHbTimerData* pData);
// config
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str);
// on message ---------------------
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);

View File

@ -105,7 +105,7 @@ bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf);
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char *str);
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex);
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode);
@ -115,7 +115,7 @@ int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode);
int32_t applyCode, bool force);
#ifdef __cplusplus
}
#endif

View File

@ -46,7 +46,7 @@ void syncEntryDestroy(SSyncRaftEntry* pEntry);
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7
static FORCE_INLINE bool syncLogReplBarrier(SSyncRaftEntry* pEntry) {
return pEntry->originalRpcType == TDMT_SYNC_NOOP;
return pEntry->originalRpcType == TDMT_SYNC_NOOP || pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE;
}
#ifdef __cplusplus

View File

@ -151,8 +151,9 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
}
sTrace("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64
", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "",
pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex);
", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 " entryterm:%" PRId64,
pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex,
pEntry->term);
// accept
if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
@ -162,7 +163,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
_SEND_RESPONSE:
pEntry = NULL;
pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm);
pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm, "OnAppn");
bool matched = (pReply->matchIndex >= pReply->lastSendIndex);
if (accepted && matched) {
pReply->success = true;

View File

@ -70,6 +70,27 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, Sync
DID(pRaftId), CID(pRaftId));
}
// Carry per-replica bookkeeping over from an old index manager to a new one.
// For every slot of pNewIndex whose raft id equals oldReplicasId[j], the index,
// privateTerm, startTimeArr and recvTimeArr values of old slot j are copied in.
// Slots without a match are left as they were. Old and new indexes are dumped
// at debug level before and after the copy.
void syncIndexMgrCopyIfExist(SSyncIndexMgr * pNewIndex, SSyncIndexMgr * pOldIndex, SRaftId *oldReplicasId){
  for (int oldSlot = 0; oldSlot < pOldIndex->totalReplicaNum; ++oldSlot) {
    sDebug("old Index j:%d, index:%"PRId64, oldSlot, pOldIndex->index[oldSlot]);
  }

  for (int newSlot = 0; newSlot < pNewIndex->totalReplicaNum; ++newSlot) {
    for (int oldSlot = 0; oldSlot < pOldIndex->totalReplicaNum; ++oldSlot) {
      if (!syncUtilSameId(&oldReplicasId[oldSlot], &((*(pNewIndex->replicas))[newSlot]))) {
        continue;
      }

      // Same member in both configurations: keep its progress and timing data.
      pNewIndex->index[newSlot] = pOldIndex->index[oldSlot];
      pNewIndex->privateTerm[newSlot] = pOldIndex->privateTerm[oldSlot];
      pNewIndex->startTimeArr[newSlot] = pOldIndex->startTimeArr[oldSlot];
      pNewIndex->recvTimeArr[newSlot] = pOldIndex->recvTimeArr[oldSlot];
    }
  }

  for (int newSlot = 0; newSlot < pNewIndex->totalReplicaNum; ++newSlot) {
    sDebug("new index i:%d, index:%"PRId64, newSlot, pNewIndex->index[newSlot]);
  }
}
SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) {
for (int i = 0; i < pNode->totalReplicaNum; i++) {
if (syncUtilSameId(&pNode->replicasId[i], pRaftId)) {

View File

@ -59,8 +59,8 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
int64_t syncOpen(SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
int64_t syncOpen(SSyncInfo* pSyncInfo, bool isFirst) {
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, isFirst);
if (pSyncNode == NULL) {
sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
return -1;
@ -514,6 +514,20 @@ SSyncState syncGetState(int64_t rid) {
} else {
state.canRead = state.restored;
}
/*
double progress = 0;
if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
state.progress = (int32_t)(progress * 100);
}
else{
state.progress = -1;
}
sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
"progress:%lf, progress:%d",
pSyncNode->vgId,
pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
*/
syncNodeRelease(pSyncNode);
}
@ -570,6 +584,20 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
return ret;
}
// Check whether the local sync node behind `rid` is a regular (non-learner) member.
//
// Returns 0 when the node's role is not TAOS_SYNC_ROLE_LEARNER, and -1 when the
// node is a learner or the sync node cannot be acquired.
int32_t syncCheckMember(int64_t rid) {
  SSyncNode* pSyncNode = syncNodeAcquire(rid);
  if (pSyncNode == NULL) {
    sError("sync propose error");
    return -1;
  }

  int32_t code = (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) ? -1 : 0;

  // Every successful acquire must be paired with a release (as syncGetState does),
  // otherwise the reference taken above is never dropped.
  syncNodeRelease(pSyncNode);
  return code;
}
int32_t syncIsCatchUp(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
@ -631,15 +659,26 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
}
// optimized one replica
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
SyncIndex retIndex;
int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
if (code == 0) {
if (code >= 0) {
pMsg->info.conn.applyIndex = retIndex;
pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType));
return 1;
//after raft member change, need to handle 1->2 switching point
//at this point, need to switch entry handling thread
if(pSyncNode->replicaNum == 1){
sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType));
return 1;
}
else{
sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64 " type:%s, "
"handle:%p", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType), pMsg->info.handle);
return 0;
}
} else {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
@ -739,7 +778,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
}
// open/close --------------
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, bool isFirst) {
SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
if (pSyncNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -759,7 +798,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
TD_DIRSEP);
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
if (!taosCheckExistFile(pSyncNode->configPath)) {
if (!taosCheckExistFile(pSyncNode->configPath) && isFirst) {
// create a new raft config file
sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
@ -781,20 +820,23 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
goto _error;
}
if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
if (syncWriteCfgFile(pSyncNode) != 0) {
sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
goto _error;
if(isFirst){
if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
if (syncWriteCfgFile(pSyncNode) != 0) {
sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
goto _error;
}
} else {
sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
}
} else {
sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
}
}
// init by SSyncInfo
// init by SSyncInfo
pSyncNode->vgId = pSyncInfo->vgId;
SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
bool updated = false;
@ -809,14 +851,16 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pNode->nodeId, pNode->clusterId);
}
if (updated) {
sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
if (syncWriteCfgFile(pSyncNode) != 0) {
sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
goto _error;
if(isFirst){
if (updated) {
sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
if (syncWriteCfgFile(pSyncNode) != 0) {
sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
goto _error;
}
}
}
pSyncNode->pWal = pSyncInfo->pWal;
pSyncNode->msgcb = pSyncInfo->msgcb;
pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
@ -2250,6 +2294,526 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
return code;
}
// Translate an alter-replica request into a sync configuration.
//
// Voters from pReq->replicas fill cfg->nodeInfo[0 .. replica), learners from
// pReq->learnerReplicas follow directly after. Each node is passed through
// tmsgUpdateDnodeInfo() and logged. On return:
//   cfg->replicaNum      = number of voters written
//   cfg->totalReplicaNum = learners written + pReq->replica
//   cfg->myIndex         = selfIndex, or replica + learnerSelfIndex when that is set
//                          (left untouched when both are -1)
//   cfg->changeVersion   = pReq->changeVersion
void syncBuildConfigFromReq(SAlterVnodeReplicaReq *pReq, SSyncCfg *cfg){//TODO SAlterVnodeReplicaReq name is proper?
  cfg->totalReplicaNum = 0;
  cfg->replicaNum = 0;

  // Voters.
  for (int v = 0; v < pReq->replica; ++v) {
    SNodeInfo *pVoter = &cfg->nodeInfo[v];

    pVoter->nodeId = pReq->replicas[v].id;
    pVoter->nodePort = pReq->replicas[v].port;
    tstrncpy(pVoter->nodeFqdn, pReq->replicas[v].fqdn, sizeof(pVoter->nodeFqdn));
    pVoter->nodeRole = TAOS_SYNC_ROLE_VOTER;
    (void)tmsgUpdateDnodeInfo(&pVoter->nodeId, &pVoter->clusterId, pVoter->nodeFqdn, &pVoter->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, v, pVoter->nodeFqdn, pVoter->nodePort, pVoter->nodeId, pVoter->nodeRole);

    cfg->replicaNum += 1;
  }

  // Learners: `slot` walks the destination array, `learnerCnt` walks the request's learner list.
  int32_t   learnerCnt = 0;
  const int endSlot = pReq->replica + pReq->learnerReplica;
  for (int slot = cfg->replicaNum; slot < endSlot; ++slot) {
    SNodeInfo *pLearner = &cfg->nodeInfo[slot];

    pLearner->nodeId = pReq->learnerReplicas[learnerCnt].id;
    pLearner->nodePort = pReq->learnerReplicas[learnerCnt].port;
    pLearner->nodeRole = TAOS_SYNC_ROLE_LEARNER;
    tstrncpy(pLearner->nodeFqdn, pReq->learnerReplicas[learnerCnt].fqdn, sizeof(pLearner->nodeFqdn));
    (void)tmsgUpdateDnodeInfo(&pLearner->nodeId, &pLearner->clusterId, pLearner->nodeFqdn, &pLearner->nodePort);
    sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, slot, pLearner->nodeFqdn, pLearner->nodePort, pLearner->nodeId, pLearner->nodeRole);

    learnerCnt++;
  }
  cfg->totalReplicaNum = learnerCnt + pReq->replica;

  // A learner self index, when present, takes precedence over the voter self index.
  if (pReq->selfIndex != -1) {
    cfg->myIndex = pReq->selfIndex;
  }
  if (pReq->learnerSelfIndex != -1) {
    cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
  }

  cfg->changeVersion = pReq->changeVersion;
}
// Pre-check a config-change entry before it is appended.
//
// Returns:
//   -1  the entry is not TDMT_SYNC_CONFIG_CHANGE, or its payload cannot be
//       deserialized (terrno = TSDB_CODE_INVALID_MSG)
//    1  this node is currently leader and is not listed (by fqdn + port) in the
//       requested configuration; it has stepped down at the current term
//    0  otherwise
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry){
  if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
    return -1;
  }

  // Payload layout: SMsgHead followed by the serialized alter request.
  SMsgHead *pHead = (SMsgHead *)pEntry->data;
  void     *pBody = POINTER_SHIFT(pHead, sizeof(SMsgHead));

  SAlterVnodeTypeReq alterReq = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pBody, pHead->contLen, &alterReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SSyncCfg newCfg = {0};
  syncBuildConfigFromReq(&alterReq, &newCfg);

  // Only a leader facing a non-empty configuration needs the membership test.
  if (newCfg.totalReplicaNum < 1 || ths->state != TAOS_SYNC_STATE_LEADER) {
    return 0;
  }

  for (int32_t n = 0; n < newCfg.totalReplicaNum; ++n) {
    if (ths->myNodeInfo.nodePort == newCfg.nodeInfo[n].nodePort &&
        strcmp(ths->myNodeInfo.nodeFqdn, newCfg.nodeInfo[n].nodeFqdn) == 0) {
      return 0;  // still a member of the new configuration
    }
  }

  // The leader is being removed: give up leadership.
  SyncTerm termNow = raftStoreGetTerm(ths);
  syncNodeStepDown(ths, termNow);
  return 1;
}
// Dump the node's current membership view at info level, tagged with `str`:
// a summary line, the local node, every peer, and every entry of the stored
// raft configuration. The `cfg` argument is not read.
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg *cfg, char *str){
  SSyncCfg *pStored = &ths->raftCfg.cfg;

  sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d, "
        "restoreFinish:%d",
        ths->vgId, str,
        ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, pStored->changeVersion,
        ths->restoreFinish);

  sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d",
        ths->vgId, str, ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn,
        ths->myNodeInfo.nodePort, ths->myNodeInfo.nodeRole);

  for (int32_t peer = 0; peer < ths->peersNum; ++peer) {
    sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d",
          ths->vgId, str, peer, ths->peersNodeInfo[peer].clusterId,
          ths->peersNodeInfo[peer].nodeId, ths->peersNodeInfo[peer].nodeFqdn,
          ths->peersNodeInfo[peer].nodePort, ths->peersNodeInfo[peer].nodeRole);
  }

  for (int32_t member = 0; member < pStored->totalReplicaNum; ++member) {
    sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d",
          ths->vgId, str, member, pStored->nodeInfo[member].clusterId,
          pStored->nodeInfo[member].nodeId, pStored->nodeInfo[member].nodeFqdn,
          pStored->nodeInfo[member].nodePort, pStored->nodeInfo[member].nodeRole);
  }
}
// Rebuild the peer tables and the stored node list from a new configuration.
//
// Peers are all entries of `cfg` other than this node (matched by fqdn + port);
// for each one the node info, ep set and raft id are refreshed. The stored
// configuration's nodeInfo[] is then overwritten entry by entry, and its
// replicaNum is recomputed as the number of voters.
//
// Returns 0 on success, -1 if a peer's raft id cannot be determined (peersNum
// and the stored configuration are not updated in that case).
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){
  //change peersNodeInfo
  int32_t peerCnt = 0;
  for (int32_t src = 0; src < cfg->totalReplicaNum; ++src) {
    bool isSelf = (strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[src].nodeFqdn) == 0) &&
                  (ths->myNodeInfo.nodePort == cfg->nodeInfo[src].nodePort);
    if (isSelf) {
      continue;
    }

    ths->peersNodeInfo[peerCnt].nodeRole = cfg->nodeInfo[src].nodeRole;
    ths->peersNodeInfo[peerCnt].clusterId = cfg->nodeInfo[src].clusterId;
    tstrncpy(ths->peersNodeInfo[peerCnt].nodeFqdn, cfg->nodeInfo[src].nodeFqdn, TSDB_FQDN_LEN);
    ths->peersNodeInfo[peerCnt].nodeId = cfg->nodeInfo[src].nodeId;
    ths->peersNodeInfo[peerCnt].nodePort = cfg->nodeInfo[src].nodePort;

    syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[peerCnt], &ths->peersEpset[peerCnt]);
    if (!syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[peerCnt], ths->vgId, &ths->peersId[peerCnt])) {
      sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, peerCnt);
      return -1;
    }

    peerCnt++;
  }
  ths->peersNum = peerCnt;

  //change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  int32_t copied = 0;
  for (; copied < cfg->totalReplicaNum; ++copied) {
    if (cfg->nodeInfo[copied].nodeRole == TAOS_SYNC_ROLE_VOTER) {
      ths->raftCfg.cfg.replicaNum++;
    }

    ths->raftCfg.cfg.nodeInfo[copied].nodeRole = cfg->nodeInfo[copied].nodeRole;
    ths->raftCfg.cfg.nodeInfo[copied].clusterId = cfg->nodeInfo[copied].clusterId;
    tstrncpy(ths->raftCfg.cfg.nodeInfo[copied].nodeFqdn, cfg->nodeInfo[copied].nodeFqdn, TSDB_FQDN_LEN);
    ths->raftCfg.cfg.nodeInfo[copied].nodeId = cfg->nodeInfo[copied].nodeId;
    ths->raftCfg.cfg.nodeInfo[copied].nodePort = cfg->nodeInfo[copied].nodePort;
  }
  ths->raftCfg.cfg.totalReplicaNum = copied;

  return 0;
}
// Promote nodes to voter according to `cfg`.
//
// Every peer, and every entry of the stored configuration, that matches
// (fqdn + port) a voter entry of `cfg` gets its role set to TAOS_SYNC_ROLE_VOTER.
// The stored configuration's replicaNum is reset and incremented once per
// such match. Roles are never downgraded here.
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg *cfg){
  //change peersNodeInfo
  for (int32_t peer = 0; peer < ths->peersNum; ++peer) {
    for (int32_t src = 0; src < cfg->totalReplicaNum; ++src) {
      if (cfg->nodeInfo[src].nodeRole != TAOS_SYNC_ROLE_VOTER) continue;
      if (ths->peersNodeInfo[peer].nodePort != cfg->nodeInfo[src].nodePort) continue;
      if (strcmp(ths->peersNodeInfo[peer].nodeFqdn, cfg->nodeInfo[src].nodeFqdn) != 0) continue;

      ths->peersNodeInfo[peer].nodeRole = TAOS_SYNC_ROLE_VOTER;
    }
  }

  //change cfg nodeInfo
  ths->raftCfg.cfg.replicaNum = 0;
  for (int32_t member = 0; member < ths->raftCfg.cfg.totalReplicaNum; ++member) {
    for (int32_t src = 0; src < cfg->totalReplicaNum; ++src) {
      if (cfg->nodeInfo[src].nodeRole != TAOS_SYNC_ROLE_VOTER) continue;
      if (ths->raftCfg.cfg.nodeInfo[member].nodePort != cfg->nodeInfo[src].nodePort) continue;
      if (strcmp(ths->raftCfg.cfg.nodeInfo[member].nodeFqdn, cfg->nodeInfo[src].nodeFqdn) != 0) continue;

      ths->raftCfg.cfg.nodeInfo[member].nodeRole = TAOS_SYNC_ROLE_VOTER;
      ths->raftCfg.cfg.replicaNum++;
    }
  }
}
// Rebuild all per-replica runtime state of the sync node after ths->raftCfg.cfg
// has been changed, carrying over the state of replicas that exist in both the
// old and the new membership (matched by raft id via syncUtilSameId).
//
// oldtotalReplicaNum: number of replica slots that were in use before the change;
//                     used as the bound when walking the old arrays.
//
// Returns 0 on success, -1 if a new log replication manager cannot be created.
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum){
//1.rebuild replicasId, remove deleted one
// Keep a by-value snapshot of the old ids; every later step matches against it.
SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));
ths->replicaNum = ths->raftCfg.cfg.replicaNum;
ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
// The result of the conversion is not checked here.
syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]);
}
//2.rebuild MatchIndex, remove deleted one
// A fresh manager is created, surviving entries are copied in, then the old one is destroyed.
SSyncIndexMgr *oldIndex = ths->pMatchIndex;
ths->pMatchIndex = syncIndexMgrCreate(ths);
syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);
syncIndexMgrDestroy(oldIndex);
//3.rebuild NextIndex, remove deleted one
SSyncIndexMgr *oldNextIndex = ths->pNextIndex;
ths->pNextIndex = syncIndexMgrCreate(ths);
syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);
syncIndexMgrDestroy(oldNextIndex);
//4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
voteGrantedUpdate(ths->pVotesGranted, ths);
votesRespondUpdate(ths->pVotesRespond, ths);
//5.rebuild logReplMgr
for(int i = 0; i < oldtotalReplicaNum; ++i){
sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId, i,
ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
}
// Save both a by-value copy (source for carrying state over) and the original
// pointer (so the old object can be destroyed afterwards) of each old manager.
SSyncLogReplMgr oldLogReplMgrs[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
SSyncLogReplMgr *pOldLogReplMgrs[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
for(int i = 0; i < oldtotalReplicaNum; i++){
oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
pOldLogReplMgrs[i] = ths->logReplMgrs[i];
}
for(int i = 0; i < ths->totalReplicaNum; ++i){
ths->logReplMgrs[i] = syncLogReplCreate();
if(ths->logReplMgrs[i] == NULL){
// Returns before the destroy loop below runs, so the managers saved in
// pOldLogReplMgrs are not destroyed on this path.
return -1;
}
for(int j = 0; j < oldtotalReplicaNum; j++){
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
*(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
}
}
// peerId is reset after the by-value copy so it always reflects the new slot.
ths->logReplMgrs[i]->peerId = i;
}
for(int i = 0; i < oldtotalReplicaNum; i++){
syncLogReplDestroy(pOldLogReplMgrs[i]);
}
for(int i = 0; i < ths->totalReplicaNum; ++i){
sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")" , ths->vgId, i,
ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
}
//6.rebuild sender
for(int i = 0; i < oldtotalReplicaNum; ++i){
sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64,
ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
}
// Same save-by-value / save-pointer scheme as for the log replication managers.
SSyncSnapshotSender oldSender[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
SSyncSnapshotSender *pOldSender[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
for(int i = 0; i < oldtotalReplicaNum; i++){
oldSender[i] = *(ths->senders[i]);
pOldSender[i] = ths->senders[i];
}
for(int i = 0; i < ths->totalReplicaNum; i++){
// NOTE(review): the result of snapshotSenderCreate is dereferenced below without
// a NULL check, unlike syncLogReplCreate above - confirm it cannot fail.
ths->senders[i] = snapshotSenderCreate(ths, i);
for(int j = 0; j < oldtotalReplicaNum; j++){
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])){
// NOTE(review): this by-value copy overwrites every field of the new sender,
// including replicaIndex, and nothing resets it to i afterwards (the log
// replication managers do reset peerId) - verify when a replica changes slot.
*(ths->senders[i]) = oldSender[j];
}
}
}
for(int i = 0; i < oldtotalReplicaNum; i++){
// NOTE(review): new senders that received a by-value copy may still reference
// whatever the old sender owned - confirm snapshotSenderDestroy does not free it.
snapshotSenderDestroy(pOldSender[i]);
}
for(int i = 0; i < ths->totalReplicaNum; i++){
sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64,
ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
}
//7.rebuild synctimer
// Heartbeat timers are stopped, re-initialised for every slot of the array
// (not only the slots in use), then started again.
syncNodeStopHeartbeatTimer(ths);
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i]);
}
syncNodeStartHeartbeatTimer(ths);
//8.rebuild peerStates
// Snapshot the full old array first so that slots can be permuted safely.
SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
for(int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++){
oldState[i] = ths->peerStates[i];
}
for(int i = 0; i < ths->totalReplicaNum; i++){
for(int j = 0; j < oldtotalReplicaNum; j++){
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])){
ths->peerStates[i] = oldState[j];
}
}
}
return 0;
}
// Refresh voter-dependent state after replica roles changed but the member
// set did not: the voter count is propagated from the stored configuration to
// the node and to both index managers, and the vote bookkeeping is rebuilt.
// Replica ids and match indexes are only logged; log replication managers are
// left untouched.
void syncNodeChangeToVoter(SSyncNode* ths){
  // Only the voter count changes in this case (e.g. 1 -> 3 voters).
  ths->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
  ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;

  sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
  for (int32_t r = 0; r < ths->totalReplicaNum; ++r) {
    sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, r, ths->replicasId[r].addr);
  }

  sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
  for (int32_t m = 0; m < ths->pMatchIndex->totalReplicaNum; ++m) {
    sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, m, ths->pMatchIndex->index[m]);
  }

  // Vote tracking depends on the voter count and is rebuilt from the node.
  voteGrantedUpdate(ths->pVotesGranted, ths);
  votesRespondUpdate(ths->pVotesRespond, ths);
}
// Zero out the node info of every current peer and of every entry in the
// stored raft configuration. The counters (peersNum, totalReplicaNum) are not
// modified.
void syncNodeResetPeerAndCfg(SSyncNode* ths){
  for (int32_t peer = 0; peer < ths->peersNum; ++peer) {
    memset(&ths->peersNodeInfo[peer], 0, sizeof(SNodeInfo));
  }

  for (int32_t member = 0; member < ths->raftCfg.cfg.totalReplicaNum; ++member) {
    memset(&ths->raftCfg.cfg.nodeInfo[member], 0, sizeof(SNodeInfo));
  }
}
// Apply a membership-change log entry to this sync node.
//
// The entry payload (SMsgHead + serialized alter request) is decoded into a new
// configuration. Entries whose changeVersion is not newer than the stored one
// are skipped. Otherwise, depending on the new total replica count and on
// whether this node is still listed (by fqdn + port), the node either
//   - removes other replicas,
//   - removes itself (becomes a lone learner),
//   - changes replica types (learner -> voter), or
//   - adds replicas,
// then updates quorum / lastConfigIndex / changeVersion and persists the raft
// config file.
//
// ths:    the sync node to reconfigure
// pEntry: raft entry; must have originalRpcType == TDMT_SYNC_CONFIG_CHANGE
// str:    caller tag used for logging; "Commit" selects the commit-path log line
//
// Returns 0 on success or when the entry is skipped as stale; -1 if the entry is
// not a config change, cannot be deserialized (terrno = TSDB_CODE_INVALID_MSG),
// memory is exhausted (terrno = TSDB_CODE_OUT_OF_MEMORY), rebuilding the
// per-replica state fails, or the config file cannot be written.
//
// The temporary configuration is released on every exit path.
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
  if(pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE){
    return -1;
  }

  SMsgHead *head = (SMsgHead *)pEntry->data;
  void *pReq = POINTER_SHIFT(head, sizeof(SMsgHead));

  SAlterVnodeTypeReq req = {0};
  if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  SSyncCfg *cfg = taosMemoryMalloc(sizeof(SSyncCfg));
  if(cfg == NULL){
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  // From here on every exit goes through _exit so that cfg is always freed.
  int32_t code = -1;
  // Captured before any rebuild: the rebuild helpers need the old slot count.
  int32_t oldTotalReplicaNum = ths->totalReplicaNum;

  syncBuildConfigFromReq(&req, cfg);

  if(cfg->changeVersion <= ths->raftCfg.cfg.changeVersion){
    sInfo("vgId:%d, skip conf change entry since lower version. "
          "this entry, index:%" PRId64 ", term:%" PRId64 ", totalReplicaNum:%d, changeVersion:%d; "
          "current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64", changeVersion:%d",
          ths->vgId,
          pEntry->index, pEntry->term, cfg->totalReplicaNum, cfg->changeVersion,
          ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
    code = 0;
    goto _exit;
  }

  if(strcmp(str, "Commit") == 0){
    sInfo("vgId:%d, change config from %s. "
          "this, i:%" PRId64 ", trNum:%d, vers:%d; "
          "node, rNum:%d, pNum:%d, trNum:%d, "
          "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), "
          "cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
          ths->vgId, str, pEntry->index - 1, cfg->totalReplicaNum, cfg->changeVersion,
          ths->replicaNum, ths->peersNum, ths->totalReplicaNum,
          ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex,
          pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
  }
  else{
    sInfo("vgId:%d, change config from %s. "
          "this, i:%" PRId64 ", t:%" PRId64 ", trNum:%d, vers:%d; "
          "node, rNum:%d, pNum:%d, trNum:%d, "
          "buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), "
          "cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
          ths->vgId, str, pEntry->index, pEntry->term, cfg->totalReplicaNum, cfg->changeVersion,
          ths->replicaNum, ths->peersNum, ths->totalReplicaNum,
          ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex,
          pEntry->index -1, ths->commitIndex, ths->pLogBuf->commitIndex);
  }

  syncNodeLogConfigInfo(ths, cfg, "before config change");

  if(cfg->totalReplicaNum == 1 || cfg->totalReplicaNum == 2){//remove replica
    bool incfg = false;
    for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){
      if(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
        && ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort){
        incfg = true;
        break;
      }
    }

    if(incfg){//remove other
      syncNodeResetPeerAndCfg(ths);

      //no need to change myNodeInfo

      if(syncNodeRebuildPeerAndCfg(ths, cfg) != 0){
        goto _exit;
      }

      if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){
        goto _exit;
      }
    }
    else{//remove myself
      //no need to do anything actually, to change the following to reduce distruptive server chance

      syncNodeResetPeerAndCfg(ths);

      //change myNodeInfo
      ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;

      //change peer and cfg
      ths->peersNum = 0;
      memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
      ths->raftCfg.cfg.replicaNum = 0;
      ths->raftCfg.cfg.totalReplicaNum = 1;

      //change other
      if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){
        goto _exit;
      }

      //change state
      ths->state = TAOS_SYNC_STATE_LEARNER;
    }

    ths->restoreFinish = false;
  }
  else{//add replica, or change replica type
    if(ths->totalReplicaNum == 3){ //change replica type
      sInfo("vgId:%d, begin change replica type", ths->vgId);

      //change myNodeInfo
      for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){
        if(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
          && ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort){
          if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){
            ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
          }
        }
      }

      //change peer and cfg
      syncNodeChangePeerAndCfgToVoter(ths, cfg);

      //change other
      syncNodeChangeToVoter(ths);

      //change state: a learner that was just promoted becomes a follower
      if(ths->state ==TAOS_SYNC_STATE_LEARNER){
        if(ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER ){
          ths->state = TAOS_SYNC_STATE_FOLLOWER;
        }
      }

      ths->restoreFinish = false;
    }
    else{//add replica
      sInfo("vgId:%d, begin add replica", ths->vgId);

      //no need to change myNodeInfo

      //change peer and cfg
      if(syncNodeRebuildPeerAndCfg(ths, cfg) != 0){
        goto _exit;
      }

      //change other
      if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){
        goto _exit;
      }

      //no need to change state

      if(ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER){
        ths->restoreFinish = false;
      }
    }
  }

  ths->quorum = syncUtilQuorum(ths->replicaNum);

  ths->raftCfg.lastConfigIndex = pEntry->index;
  ths->raftCfg.cfg.lastIndex = pEntry->index;
  ths->raftCfg.cfg.changeVersion = cfg->changeVersion;

  syncNodeLogConfigInfo(ths, cfg, "after config change");

  if(syncWriteCfgFile(ths) != 0){
    sError("vgId:%d, failed to create sync cfg file", ths->vgId);
    goto _exit;
  }

  code = 0;

_exit:
  taosMemoryFree(cfg);
  return code;
}
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
if (pEntry->dataLen < sizeof(SMsgHead)) {
sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
@ -2262,13 +2826,13 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
ASSERT(terrno != 0);
(void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
(void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
syncEntryDestroy(pEntry);
return -1;
}
// proceed match index, with replicating on needed
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");
sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")",
@ -2299,6 +2863,9 @@ bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
int32_t toCount = 0;
int64_t tsNow = taosGetTimestampMs();
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
if(pSyncNode->peersNodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER){
continue;
}
int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
if (recvTime == 0 || recvTime == -1) {
continue;
@ -2560,6 +3127,14 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
}
// 1->2: the config change is added in the write thread and continues in the sync thread,
// so the message needs to be saved for it
if(pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE){
SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
uint64_t seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
pEntry->seqNum = seqNum;
}
if (pEntry == NULL) {
sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
return -1;
@ -2570,7 +3145,24 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
(*pRetIndex) = index;
}
int32_t code = syncNodeAppend(ths, pEntry);
if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){
int32_t code = syncNodeCheckChangeConfig(ths, pEntry);
if(code < 0){
sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr());
return -1;
}
if(code > 0){
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
(void)syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
return -1;
}
}
code = syncNodeAppend(ths, pEntry);
return code;
} else {
syncEntryDestroy(pEntry);

View File

@ -25,6 +25,8 @@
#include "syncRespMgr.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncRaftCfg.h"
#include "syncVoteMgr.h"
static bool syncIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
@ -428,7 +430,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf
return 0;
}
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm) {
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char *str) {
taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf);
@ -475,9 +477,6 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
sTrace("vgId:%d, log buffer proceed. start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64,
pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);
// replicate on demand
(void)syncNodeReplicateWithoutLock(pNode);
// persist
if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) {
sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
@ -485,6 +484,39 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
taosMsleep(1);
goto _out;
}
if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){
if(pNode->pLogBuf->commitIndex == pEntry->index -1){
sInfo("vgId:%d, to change config at %s. "
"current entry, index:%" PRId64 ", term:%" PRId64", "
"node, restore:%d, commitIndex:%" PRId64 ", "
"cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")",
pNode->vgId, str,
pEntry->index, pEntry->term,
pNode->restoreFinish, pNode->commitIndex,
pEntry->index - 1, pNode->pLogBuf->commitIndex);
if(syncNodeChangeConfig(pNode, pEntry, str) != 0){
sError("vgId:%d, failed to change config from Append since %s. index:%" PRId64, pNode->vgId, terrstr(),
pEntry->index);
goto _out;
}
}
else{
sInfo("vgId:%d, delay change config from Node %s. "
"curent entry, index:%" PRId64 ", term:%" PRId64 ", "
"node, commitIndex:%" PRId64 ", pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), "
"cond:( pre entry index:%" PRId64" != buf commit index:%" PRId64 ")",
pNode->vgId, str,
pEntry->index, pEntry->term,
pNode->commitIndex, pNode->pLogBuf->startIndex, pNode->pLogBuf->commitIndex,
pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex,
pEntry->index - 1, pNode->pLogBuf->commitIndex);
}
}
// replicate on demand
(void)syncNodeReplicateWithoutLock(pNode);
ASSERT(pEntry->index == pBuf->matchIndex);
// update my match index
@ -503,8 +535,16 @@ _out:
}
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode) {
if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1) {
int32_t applyCode, bool force) {
// a learner needs to execute fsm when it catches up on log entries
// if force is true, the skip condition below does not apply and fsm execution proceeds
if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1
&& pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER
&& force == false) {
sDebug("vgId:%d, not to execute fsm, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x, replicaNum:%d,"
"role:%d, restoreFinish:%d",
pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode,
pNode->replicaNum, pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
return 0;
}
@ -559,6 +599,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex);
SSyncRaftEntry* pEntry = NULL;
bool inBuf = false;
SSyncRaftEntry* pNextEntry = NULL;
bool nextInBuf = false;
if (commitIndex <= pBuf->commitIndex) {
sDebug("vgId:%d, stale commit index. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex,
@ -584,7 +626,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
pEntry->term, TMSG_INFO(pEntry->originalRpcType));
}
if (syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0) != 0) {
if (syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false) != 0) {
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
", role:%d, current term:%" PRId64,
vgId, pEntry->index, pEntry->term, role, currentTerm);
@ -595,10 +637,50 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId,
pEntry->index, pEntry->term, role, currentTerm);
pNextEntry = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf);
if (pNextEntry != NULL && pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
sInfo("vgId:%d, to change config at Commit. "
"current entry, index:%" PRId64 ", term:%" PRId64", "
"node, role:%d, current term:%" PRId64 ", restore:%d, "
"cond, next entry index:%" PRId64 ", msgType:%s",
vgId,
pEntry->index, pEntry->term,
role, currentTerm, pNode->restoreFinish,
pNextEntry->index, TMSG_INFO(pNextEntry->originalRpcType));
if(syncNodeChangeConfig(pNode, pNextEntry, "Commit") != 0){
sError("vgId:%d, failed to change config from Commit. index:%" PRId64 ", term:%" PRId64
", role:%d, current term:%" PRId64,
vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
goto _out;
}
// for 2->1, the config change entry needs to be applied in the sync thread
if(pNode->replicaNum == 1){
if (syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true) != 0) {
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
", role:%d, current term:%" PRId64,
vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
goto _out;
}
index++;
pBuf->commitIndex = index;
sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId,
pNextEntry->index, pNextEntry->term, role, currentTerm);
}
if (!nextInBuf) {
syncEntryDestroy(pNextEntry);
pNextEntry = NULL;
}
}
if (!inBuf) {
syncEntryDestroy(pEntry);
pEntry = NULL;
}
}
}
// recycle

View File

@ -44,6 +44,7 @@ static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) {
SSyncCfg *pCfg = (SSyncCfg *)pObj;
if (tjsonAddDoubleToObject(pJson, "replicaNum", pCfg->replicaNum) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "myIndex", pCfg->myIndex) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "changeVersion", pCfg->changeVersion) < 0) return -1;
SJson *nodeInfo = tjsonCreateArray();
if (nodeInfo == NULL) return -1;
@ -113,8 +114,9 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) {
if (taosRenameFile(file, realfile) != 0) goto _OVER;
code = 0;
sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64, pNode->vgId,
realfile, len, pNode->raftCfg.lastConfigIndex);
sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64 ", "
"changeVersion:%d", pNode->vgId,
realfile, len, pNode->raftCfg.lastConfigIndex, pNode->raftCfg.cfg.changeVersion);
_OVER:
if (pJson != NULL) tjsonDelete(pJson);
@ -136,6 +138,8 @@ static int32_t syncDecodeSyncCfg(const SJson *pJson, void *pObj) {
if (code < 0) return -1;
tjsonGetInt32ValueFromDouble(pJson, "myIndex", pCfg->myIndex, code);
if (code < 0) return -1;
tjsonGetInt32ValueFromDouble(pJson, "changeVersion", pCfg->changeVersion, code);
if (code < 0) return -1;
SJson *nodeInfo = tjsonGetObjectItem(pJson, "nodeInfo");
if (nodeInfo == NULL) return -1;