commit
e3eb0f97cc
|
@ -77,7 +77,8 @@ static inline bool tmsgIsValid(tmsg_t type) {
|
|||
}
|
||||
static inline bool vnodeIsMsgBlock(tmsg_t type) {
|
||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT);
|
||||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT) ||
|
||||
(type == TDMT_SYNC_CONFIG_CHANGE);
|
||||
}
|
||||
|
||||
static inline bool syncUtilUserCommit(tmsg_t msgType) {
|
||||
|
@ -1400,6 +1401,7 @@ typedef struct {
|
|||
int64_t numOfBatchInsertReqs;
|
||||
int64_t numOfBatchInsertSuccessReqs;
|
||||
int32_t numOfCachedTables;
|
||||
int32_t learnerProgress; // use one reservered
|
||||
} SVnodeLoad;
|
||||
|
||||
typedef struct {
|
||||
|
@ -1541,6 +1543,7 @@ typedef struct {
|
|||
int8_t learnerReplica;
|
||||
int8_t learnerSelfIndex;
|
||||
SReplica learnerReplicas[TSDB_MAX_LEARNER_REPLICA];
|
||||
int32_t changeVersion;
|
||||
} SCreateVnodeReq;
|
||||
|
||||
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||
|
@ -1615,7 +1618,8 @@ typedef struct {
|
|||
int8_t learnerSelfIndex;
|
||||
int8_t learnerReplica;
|
||||
SReplica learnerReplicas[TSDB_MAX_LEARNER_REPLICA];
|
||||
} SAlterVnodeReplicaReq, SAlterVnodeTypeReq;
|
||||
int32_t changeVersion;
|
||||
} SAlterVnodeReplicaReq, SAlterVnodeTypeReq, SCheckLearnCatchupReq;
|
||||
|
||||
int32_t tSerializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
|
||||
int32_t tDeserializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
|
||||
|
@ -1633,7 +1637,8 @@ typedef struct {
|
|||
int32_t dstVgId;
|
||||
uint32_t hashBegin;
|
||||
uint32_t hashEnd;
|
||||
int64_t reserved;
|
||||
int32_t changeVersion;
|
||||
int32_t reserved;
|
||||
} SAlterVnodeHashRangeReq;
|
||||
|
||||
int32_t tSerializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq);
|
||||
|
|
|
@ -85,6 +85,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE_TYPE, "dnode-alter-mnode-type", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE_TYPE, "dnode-alter-vnode-type", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, "dnode-check-vnode-learner-catchup", NULL, NULL)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_MND_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL)
|
||||
|
|
|
@ -101,6 +101,7 @@ typedef struct SSyncCfg {
|
|||
int32_t myIndex;
|
||||
SNodeInfo nodeInfo[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
|
||||
SyncIndex lastIndex;
|
||||
int32_t changeVersion;
|
||||
} SSyncCfg;
|
||||
|
||||
typedef struct SFsmCbMeta {
|
||||
|
@ -239,20 +240,22 @@ typedef struct SSyncState {
|
|||
ESyncState state;
|
||||
bool restored;
|
||||
bool canRead;
|
||||
int32_t progress;
|
||||
SyncTerm term;
|
||||
int64_t roleTimeMs;
|
||||
int64_t startTimeMs;
|
||||
} SSyncState;
|
||||
|
||||
int32_t syncInit();
|
||||
void syncCleanUp();
|
||||
int64_t syncOpen(SSyncInfo* pSyncInfo);
|
||||
int32_t syncStart(int64_t rid);
|
||||
void syncStop(int64_t rid);
|
||||
void syncPreStop(int64_t rid);
|
||||
void syncPostStop(int64_t rid);
|
||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
|
||||
int32_t syncIsCatchUp(int64_t rid);
|
||||
int32_t syncInit();
|
||||
void syncCleanUp();
|
||||
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion);
|
||||
int32_t syncStart(int64_t rid);
|
||||
void syncStop(int64_t rid);
|
||||
void syncPreStop(int64_t rid);
|
||||
void syncPostStop(int64_t rid);
|
||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
|
||||
int32_t syncCheckMember(int64_t rid);
|
||||
int32_t syncIsCatchUp(int64_t rid);
|
||||
ESyncRole syncGetRole(int64_t rid);
|
||||
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
|
||||
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
|
||||
|
@ -270,6 +273,8 @@ SSyncState syncGetState(int64_t rid);
|
|||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
const char* syncStr(ESyncState state);
|
||||
|
||||
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg *cfg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1082,7 +1082,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, reserved) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pload->learnerProgress) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pload->roleTimeMs) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pload->startTimeMs) < 0) return -1;
|
||||
}
|
||||
|
@ -1163,7 +1163,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
for (int32_t i = 0; i < vlen; ++i) {
|
||||
SVnodeLoad vload = {0};
|
||||
vload.syncTerm = -1;
|
||||
int32_t reserved32 = 0;
|
||||
|
||||
if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &vload.syncRestore) < 0) return -1;
|
||||
|
@ -1175,7 +1175,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, (int32_t *)&reserved32) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &vload.learnerProgress) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &vload.roleTimeMs) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &vload.startTimeMs) < 0) return -1;
|
||||
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
|
||||
|
@ -4348,6 +4348,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
|||
SReplica *pReplica = &pReq->learnerReplicas[i];
|
||||
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -4434,6 +4435,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
|||
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
|
||||
}
|
||||
}
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -4666,6 +4670,7 @@ int32_t tSerializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnodeRe
|
|||
SReplica *pReplica = &pReq->learnerReplicas[i];
|
||||
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -4697,6 +4702,9 @@ int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnode
|
|||
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
|
||||
}
|
||||
}
|
||||
if (!tDecodeIsEnd(&decoder)){
|
||||
if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -4740,7 +4748,8 @@ int32_t tSerializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnode
|
|||
if (tEncodeI32(&encoder, pReq->dstVgId) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->hashBegin) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->hashEnd) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->reserved) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->changeVersion) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->reserved) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -4758,7 +4767,8 @@ int32_t tDeserializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVno
|
|||
if (tDecodeI32(&decoder, &pReq->dstVgId) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->hashBegin) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->hashEnd) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->reserved) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->changeVersion) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->reserved) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
|
|
|
@ -106,6 +106,8 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -97,6 +97,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
|||
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
||||
// vmFile.c
|
||||
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
|
||||
|
|
|
@ -133,6 +133,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
|||
pCfg->standby = 0;
|
||||
pCfg->syncCfg.replicaNum = 0;
|
||||
pCfg->syncCfg.totalReplicaNum = 0;
|
||||
pCfg->syncCfg.changeVersion = pCreate->changeVersion;
|
||||
|
||||
memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
|
||||
for (int32_t i = 0; i < pCreate->replica; ++i) {
|
||||
|
@ -211,14 +212,15 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d"
|
||||
", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
|
||||
", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
|
||||
"learnerReplica:%d learnerSelfIndex:%d strict:%d",
|
||||
"learnerReplica:%d learnerSelfIndex:%d strict:%d changeVersion:%d",
|
||||
req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
|
||||
(uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
|
||||
req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
|
||||
req.isTsma, req.precision, req.compression, req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel,
|
||||
req.walRetentionPeriod, req.walRetentionSize, req.walRollPeriod, req.walSegmentSize, req.hashMethod,
|
||||
req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica,
|
||||
req.learnerSelfIndex, req.strict);
|
||||
req.learnerSelfIndex, req.strict, req.changeVersion);
|
||||
|
||||
for (int32_t i = 0; i < req.replica; ++i) {
|
||||
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
|
||||
req.replicas[i].id);
|
||||
|
@ -323,6 +325,7 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
//alter replica doesn't use this, but restore dnode still use this
|
||||
int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
SAlterVnodeTypeReq req = {0};
|
||||
if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
|
||||
|
@ -363,8 +366,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
|
||||
|
||||
int32_t vgId = req.vgId;
|
||||
dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d", vgId, req.replica, req.selfIndex,
|
||||
req.strict);
|
||||
dInfo("vgId:%d, start to alter vnode type replica:%d selfIndex:%d strict:%d changeVersion:%d",
|
||||
vgId, req.replica, req.selfIndex, req.strict, req.changeVersion);
|
||||
for (int32_t i = 0; i < req.replica; ++i) {
|
||||
SReplica *pReplica = &req.replicas[i];
|
||||
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
|
||||
|
@ -424,7 +427,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
if (vmOpenVnode(pMgmt, &wrapperCfg, pImpl) != 0) {
|
||||
dError("vgId:%d, failed to open vnode mgmt since %s", vgId, terrstr());
|
||||
return -1;
|
||||
|
@ -440,6 +443,53 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
SCheckLearnCatchupReq req = {0};
|
||||
if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if(req.learnerReplicas == 0){
|
||||
req.learnerSelfIndex = -1;
|
||||
}
|
||||
|
||||
dInfo("vgId:%d, vnode management handle msgType:%s, start to process check-learner-catchup-request",
|
||||
req.vgId, TMSG_INFO(pMsg->msgType));
|
||||
|
||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
||||
if (pVnode == NULL) {
|
||||
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
|
||||
terrno = TSDB_CODE_VND_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
ESyncRole role = vnodeGetRole(pVnode->pImpl);
|
||||
dInfo("vgId:%d, checking node role:%d", req.vgId, role);
|
||||
if(role == TAOS_SYNC_ROLE_VOTER){
|
||||
dError("vgId:%d, failed to alter vnode type since node already is role:%d", req.vgId, role);
|
||||
terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
|
||||
vmReleaseVnode(pMgmt, pVnode);
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("vgId:%d, checking node catch up", req.vgId);
|
||||
if(vnodeIsCatchUp(pVnode->pImpl) != 1){
|
||||
terrno = TSDB_CODE_VND_NOT_CATCH_UP;
|
||||
vmReleaseVnode(pMgmt, pVnode);
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("node:%s, catched up leader, continue to process alter-node-type-request", pMgmt->name);
|
||||
|
||||
vmReleaseVnode(pMgmt, pVnode);
|
||||
|
||||
dInfo("vgId:%d, vnode management handle msgType:%s, end to process check-learner-catchup-request",
|
||||
req.vgId, TMSG_INFO(pMsg->msgType));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
SDisableVnodeWriteReq req = {0};
|
||||
if (tDeserializeSDisableVnodeWriteReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
|
||||
|
@ -520,6 +570,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
|
||||
dInfo("vgId:%d, open vnode", dstVgId);
|
||||
SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
|
||||
|
||||
if (pImpl == NULL) {
|
||||
dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
|
||||
return -1;
|
||||
|
@ -559,9 +610,10 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
int32_t vgId = alterReq.vgId;
|
||||
dInfo(
|
||||
"vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
|
||||
"learnerSelfIndex:%d strict:%d",
|
||||
"learnerSelfIndex:%d strict:%d changeVersion:%d",
|
||||
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
|
||||
alterReq.learnerSelfIndex, alterReq.strict);
|
||||
alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
|
||||
|
||||
for (int32_t i = 0; i < alterReq.replica; ++i) {
|
||||
SReplica *pReplica = &alterReq.replicas[i];
|
||||
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
|
||||
|
@ -755,6 +807,8 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT_ELECTION, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -267,6 +267,7 @@ static void *vmOpenVnodeInThread(void *param) {
|
|||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
|
||||
|
||||
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
|
||||
|
||||
if (pImpl == NULL) {
|
||||
dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
|
||||
pThread->failed++;
|
||||
|
|
|
@ -53,6 +53,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
case TDMT_DND_ALTER_VNODE_TYPE:
|
||||
code = vmProcessAlterVnodeTypeReq(pMgmt, pMsg);
|
||||
break;
|
||||
case TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP:
|
||||
code = vmProcessCheckLearnCatchupReq(pMgmt, pMsg);
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);
|
||||
|
|
|
@ -353,6 +353,7 @@ typedef struct {
|
|||
int64_t roleTimeMs;
|
||||
int64_t startTimeMs;
|
||||
ESyncRole nodeRole;
|
||||
int32_t learnerProgress;
|
||||
} SVnodeGid;
|
||||
|
||||
typedef struct {
|
||||
|
@ -376,6 +377,7 @@ typedef struct {
|
|||
SVnodeGid vnodeGid[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
|
||||
void* pTsma;
|
||||
int32_t numOfCachedTables;
|
||||
int32_t syncConfChangeVer;
|
||||
} SVgObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -47,6 +47,8 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
|
|||
SArray *pArray);
|
||||
int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int64_t compactTs,
|
||||
STimeWindow tw);
|
||||
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
|
||||
SArray *pArray);
|
||||
|
||||
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||
|
|
|
@ -659,10 +659,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
|||
|
||||
mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
|
||||
if (mndSetCreateDbPrepareAction(pMnode, pTrans, &dbObj) != 0) goto _OVER;
|
||||
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||
if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER;
|
||||
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||
if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
|
@ -882,7 +882,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
if (pIter == NULL) break;
|
||||
|
||||
if (mndVgroupInDb(pVgroup, pNewDb->uid)) {
|
||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray) != 0) {
|
||||
if (mndBuildRaftAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray) != 0) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
taosArrayDestroy(pArray);
|
||||
|
|
|
@ -475,7 +475,7 @@ int32_t mndInitSync(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
tsem_init(&pMgmt->syncSem, 0, 0);
|
||||
pMgmt->sync = syncOpen(&syncInfo);
|
||||
pMgmt->sync = syncOpen(&syncInfo, true);
|
||||
if (pMgmt->sync <= 0) {
|
||||
mError("failed to open sync since %s", terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -1124,7 +1124,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
|
|||
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg);
|
||||
if (code == 0) {
|
||||
pAction->msgSent = 1;
|
||||
pAction->msgReceived = 0;
|
||||
//pAction->msgReceived = 0;
|
||||
pAction->errCode = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
mInfo("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail);
|
||||
|
||||
|
|
|
@ -67,6 +67,8 @@ int32_t mndInitVgroup(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_VND_DISABLE_WRITE_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_SYNC_FORCE_FOLLOWER_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_DND_ALTER_VNODE_TYPE_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_SYNC_CONFIG_CHANGE_RSP, mndTransProcessRsp);
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_REDISTRIBUTE_VGROUP, mndProcessRedistributeVgroupMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_SPLIT_VGROUP, mndProcessSplitVgroupMsg);
|
||||
|
@ -105,6 +107,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
|
|||
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||
SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, _OVER)
|
||||
}
|
||||
SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER)
|
||||
SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
|
||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
||||
|
||||
|
@ -129,7 +132,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
|
|||
int8_t sver = 0;
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
|
||||
|
||||
if (sver != VGROUP_VER_NUMBER) {
|
||||
if (sver < 1 || sver > VGROUP_VER_NUMBER) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -158,6 +161,10 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
|
|||
pVgid->syncState = TAOS_SYNC_STATE_LEADER;
|
||||
}
|
||||
}
|
||||
if(dataPos + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen){
|
||||
SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER)
|
||||
}
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER)
|
||||
|
||||
terrno = 0;
|
||||
|
@ -220,6 +227,7 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) {
|
|||
pNew->pointsWritten = pOld->pointsWritten;
|
||||
pNew->compact = pOld->compact;
|
||||
memcpy(pOld->vnodeGid, pNew->vnodeGid, (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA) * sizeof(SVnodeGid));
|
||||
pOld->syncConfChangeVer = pNew->syncConfChangeVer;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -279,6 +287,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
|||
createReq.hashPrefix = pDb->cfg.hashPrefix;
|
||||
createReq.hashSuffix = pDb->cfg.hashSuffix;
|
||||
createReq.tsdbPageSize = pDb->cfg.tsdbPageSize;
|
||||
createReq.changeVersion= ++(pVgroup->syncConfChangeVer);
|
||||
|
||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
SReplica *pReplica = NULL;
|
||||
|
@ -325,9 +334,12 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
|||
return NULL;
|
||||
}
|
||||
|
||||
mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
|
||||
createReq.changeVersion = pVgroup->syncConfChangeVer;
|
||||
|
||||
mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
|
||||
"changeVersion:%d",
|
||||
createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica,
|
||||
createReq.learnerReplica, createReq.strict);
|
||||
createReq.learnerReplica, createReq.strict, createReq.changeVersion);
|
||||
for (int32_t i = 0; i < createReq.replica; ++i) {
|
||||
mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
|
||||
}
|
||||
|
@ -405,6 +417,7 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
|
|||
.learnerReplica = 0,
|
||||
.selfIndex = -1,
|
||||
.learnerSelfIndex = -1,
|
||||
.changeVersion = ++(pVgroup->syncConfChangeVer),
|
||||
};
|
||||
|
||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
|
@ -440,9 +453,10 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
|
|||
}
|
||||
}
|
||||
|
||||
mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
|
||||
mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d "
|
||||
"changeVersion:%d",
|
||||
alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
|
||||
alterReq.learnerSelfIndex, alterReq.strict);
|
||||
alterReq.learnerSelfIndex, alterReq.strict, alterReq.changeVersion);
|
||||
for (int32_t i = 0; i < alterReq.replica; ++i) {
|
||||
mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
|
||||
}
|
||||
|
@ -473,6 +487,83 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
|
|||
return pReq;
|
||||
}
|
||||
|
||||
static void *mndBuildCheckLearnCatchupReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId,
|
||||
int32_t *pContLen) {
|
||||
SCheckLearnCatchupReq req = {
|
||||
.vgId = pVgroup->vgId,
|
||||
.strict = pDb->cfg.strict,
|
||||
.replica = 0,
|
||||
.learnerReplica = 0,
|
||||
.selfIndex = -1,
|
||||
.learnerSelfIndex = -1,
|
||||
};
|
||||
|
||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
SReplica *pReplica = NULL;
|
||||
|
||||
if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){
|
||||
pReplica = &req.replicas[req.replica];
|
||||
req.replica++;
|
||||
}
|
||||
else{
|
||||
pReplica = &req.learnerReplicas[req.learnerReplica];
|
||||
req.learnerReplica++;
|
||||
}
|
||||
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
|
||||
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||
if (pVgidDnode == NULL) return NULL;
|
||||
|
||||
pReplica->id = pVgidDnode->id;
|
||||
pReplica->port = pVgidDnode->port;
|
||||
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
|
||||
mndReleaseDnode(pMnode, pVgidDnode);
|
||||
|
||||
if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){
|
||||
if (dnodeId == pVgid->dnodeId) {
|
||||
req.selfIndex = v;
|
||||
}
|
||||
}
|
||||
else{
|
||||
if (dnodeId == pVgid->dnodeId) {
|
||||
req.learnerSelfIndex = v;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
|
||||
req.vgId, req.replica, req.selfIndex, req.learnerReplica,
|
||||
req.learnerSelfIndex, req.strict);
|
||||
for (int32_t i = 0; i < req.replica; ++i) {
|
||||
mInfo("vgId:%d, replica:%d ep:%s:%u", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port);
|
||||
}
|
||||
for (int32_t i = 0; i < req.learnerReplica; ++i) {
|
||||
mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", req.vgId, i,
|
||||
req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port);
|
||||
}
|
||||
|
||||
if (req.selfIndex == -1 && req.learnerSelfIndex == -1) {
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &req);
|
||||
if (contLen < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *pReq = taosMemoryMalloc(contLen);
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tSerializeSAlterVnodeReplicaReq(pReq, contLen, &req);
|
||||
*pContLen = contLen;
|
||||
return pReq;
|
||||
}
|
||||
|
||||
static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t vgId, int32_t *pContLen) {
|
||||
SDisableVnodeWriteReq disableReq = {
|
||||
.vgId = vgId,
|
||||
|
@ -503,6 +594,7 @@ static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, int32_t srcVgId, SVg
|
|||
.dstVgId = pVgroup->vgId,
|
||||
.hashBegin = pVgroup->hashBegin,
|
||||
.hashEnd = pVgroup->hashEnd,
|
||||
.changeVersion = ++(pVgroup->syncConfChangeVer),
|
||||
};
|
||||
|
||||
mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, hashrange:[%u, %u]", srcVgId, pVgroup->vgId,
|
||||
|
@ -861,6 +953,28 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
|||
}
|
||||
}
|
||||
snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
|
||||
/*
|
||||
mInfo("db:%s, learner progress:%d", pDb->name, pVgroup->vnodeGid[i].learnerProgress);
|
||||
|
||||
if (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEARNER) {
|
||||
if(pVgroup->vnodeGid[i].learnerProgress < 0){
|
||||
snprintf(role, sizeof(role), "%s-",
|
||||
syncStr(pVgroup->vnodeGid[i].syncState));
|
||||
|
||||
}
|
||||
else if(pVgroup->vnodeGid[i].learnerProgress >= 100){
|
||||
snprintf(role, sizeof(role), "%s--",
|
||||
syncStr(pVgroup->vnodeGid[i].syncState));
|
||||
}
|
||||
else{
|
||||
snprintf(role, sizeof(role), "%s%d",
|
||||
syncStr(pVgroup->vnodeGid[i].syncState), pVgroup->vnodeGid[i].learnerProgress);
|
||||
}
|
||||
}
|
||||
else{
|
||||
snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), star);
|
||||
}
|
||||
*/
|
||||
} else {
|
||||
}
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);
|
||||
|
@ -1155,6 +1269,51 @@ _OVER:
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndRemoveVnodeFromVgroupWithoutSave(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SArray *pArray,
|
||||
SVnodeGid *pDelVgid) {
|
||||
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||
mInfo("dnode:%d, equivalent vnodes:%d others:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfOtherNodes);
|
||||
}
|
||||
|
||||
int32_t code = -1;
|
||||
for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
|
||||
SDnodeObj *pDnode = taosArrayGet(pArray, d);
|
||||
|
||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
|
||||
if (pVgid->dnodeId == pDnode->id) {
|
||||
int64_t vgMem = mndGetVgroupMemory(pMnode, NULL, pVgroup);
|
||||
pDnode->memUsed -= vgMem;
|
||||
mInfo("db:%s, vgId:%d, vn:%d is removed, memory:%" PRId64 ", dnode:%d avail:%" PRId64 " used:%" PRId64,
|
||||
pVgroup->dbName, pVgroup->vgId, vn, vgMem, pVgid->dnodeId, pDnode->memAvail, pDnode->memUsed);
|
||||
pDnode->numOfVnodes--;
|
||||
pVgroup->replica--;
|
||||
*pDelVgid = *pVgid;
|
||||
*pVgid = pVgroup->vnodeGid[pVgroup->replica];
|
||||
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_OVER:
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
|
||||
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
|
||||
STransAction action = {0};
|
||||
|
||||
|
@ -1231,6 +1390,42 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndAddChangeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
|
||||
SVgObj *pOldVgroup, SVgObj *pNewVgroup, int32_t dnodeId) {
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pNewVgroup);
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pNewVgroup, dnodeId, &contLen);
|
||||
if (pReq == NULL) return -1;
|
||||
|
||||
int32_t totallen = contLen + sizeof(SMsgHead);
|
||||
|
||||
SMsgHead *pHead = taosMemoryMalloc(totallen);
|
||||
if (pHead == NULL) {
|
||||
taosMemoryFree(pReq);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pHead->contLen = htonl(totallen);
|
||||
pHead->vgId = htonl(pNewVgroup->vgId);
|
||||
|
||||
memcpy((void*)(pHead + 1), pReq, contLen);
|
||||
taosMemoryFree(pReq);
|
||||
|
||||
action.pCont = pHead;
|
||||
action.contLen = totallen;
|
||||
action.msgType = TDMT_SYNC_CONFIG_CHANGE;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pHead);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, int32_t srcVgId, SVgObj *pVgroup) {
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
|
@ -1310,6 +1505,32 @@ int32_t mndAddAlterVnodeReplicaAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndAddCheckLearnerCatchupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
|
||||
if (pDnode == NULL) return -1;
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetDnodeEpset(pDnode);
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pReq = mndBuildCheckLearnCatchupReq(pMnode, pDb, pVgroup, dnodeId, &contLen);
|
||||
if (pReq == NULL) return -1;
|
||||
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP;
|
||||
action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
|
||||
action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) {
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
|
||||
if (pDnode == NULL) return -1;
|
||||
|
@ -2227,6 +2448,133 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndBuildRaftAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
|
||||
SArray *pArray) {
|
||||
SVgObj newVgroup = {0};
|
||||
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
|
||||
|
||||
if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
|
||||
if (mndAddAlterVnodeConfigAction(pMnode, pTrans, pNewDb, pVgroup) != 0) return -1;
|
||||
if (mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
mndTransSetSerial(pTrans);
|
||||
|
||||
mInfo("trans:%d, vgId:%d, alter vgroup, syncConfChangeVer:%d, version:%d, replica:%d",
|
||||
pTrans->id, pVgroup->vgId, pVgroup->syncConfChangeVer, pVgroup->version, pVgroup->replica);
|
||||
|
||||
if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
|
||||
mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
|
||||
pVgroup->vnodeGid[0].dnodeId);
|
||||
|
||||
//add second
|
||||
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray) != 0) return -1;
|
||||
//add third
|
||||
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVgroup, pArray) != 0) return -1;
|
||||
|
||||
//add learner stage
|
||||
newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
||||
newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
||||
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1;
|
||||
mInfo("trans:%d, vgId:%d, add change config, syncConfChangeVer:%d, version:%d, replica:%d",
|
||||
pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
|
||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1;
|
||||
mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d",
|
||||
pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
|
||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1;
|
||||
mInfo("trans:%d, vgId:%d, create vnode, syncConfChangeVer:%d, version:%d, replica:%d",
|
||||
pTrans->id, pVgroup->vgId, newVgroup.syncConfChangeVer, pVgroup->version, pVgroup->replica);
|
||||
|
||||
|
||||
//check learner
|
||||
newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
if (mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[1].dnodeId) != 0) return -1;
|
||||
if (mndAddCheckLearnerCatchupAction(pMnode, pTrans, pNewDb, &newVgroup, newVgroup.vnodeGid[2].dnodeId) != 0) return -1;
|
||||
|
||||
//change raft type
|
||||
newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
||||
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
|
||||
return -1;
|
||||
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||
|
||||
newVgroup.vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
newVgroup.vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
newVgroup.vnodeGid[2].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0)
|
||||
return -1;
|
||||
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||
|
||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
|
||||
if (pVgRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
|
||||
sdbFreeRaw(pVgRaw);
|
||||
return -1;
|
||||
}
|
||||
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
|
||||
} else if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
|
||||
mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
|
||||
pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
|
||||
|
||||
SVnodeGid del1 = {0};
|
||||
if (mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del1) != 0) return -1;
|
||||
|
||||
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1;
|
||||
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||
|
||||
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
|
||||
|
||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
|
||||
if (pVgRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
|
||||
sdbFreeRaw(pVgRaw);
|
||||
return -1;
|
||||
}
|
||||
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
|
||||
|
||||
SVnodeGid del2 = {0};
|
||||
if (mndRemoveVnodeFromVgroupWithoutSave(pMnode, pTrans, &newVgroup, pArray, &del2) != 0) return -1;
|
||||
|
||||
if (mndAddChangeConfigAction(pMnode, pTrans, pNewDb, pVgroup, &newVgroup, newVgroup.vnodeGid[0].dnodeId) != 0) return -1;
|
||||
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||
|
||||
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
|
||||
|
||||
pVgRaw = mndVgroupActionEncode(&newVgroup);
|
||||
if (pVgRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
|
||||
sdbFreeRaw(pVgRaw);
|
||||
return -1;
|
||||
}
|
||||
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
|
||||
mndSortVnodeGid(&newVgroup);
|
||||
|
||||
{
|
||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
|
||||
if (pVgRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) {
|
||||
sdbFreeRaw(pVgRaw);
|
||||
return -1;
|
||||
}
|
||||
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup,
|
||||
SDnodeObj *pDnode) {
|
||||
SVgObj newVgroup = {0};
|
||||
|
|
|
@ -110,7 +110,7 @@ int32_t vnodeAsyncCommit(SVnode* pVnode);
|
|||
bool vnodeShouldRollback(SVnode* pVnode);
|
||||
|
||||
// vnodeSync.c
|
||||
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion);
|
||||
int32_t vnodeSyncStart(SVnode* pVnode);
|
||||
void vnodeSyncPreClose(SVnode* pVnode);
|
||||
void vnodeSyncPostClose(SVnode* pVnode);
|
||||
|
|
|
@ -141,6 +141,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|||
|
||||
if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion) < 0) return -1;
|
||||
|
||||
if (tjsonAddIntegerToObject(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "vndStats.ctables", pCfg->vndStats.numOfCTables) < 0) return -1;
|
||||
|
@ -151,8 +152,9 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|||
SJson *nodeInfo = tjsonCreateArray();
|
||||
if (nodeInfo == NULL) return -1;
|
||||
if (tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", nodeInfo) < 0) return -1;
|
||||
vDebug("vgId:%d, encode config, replicas:%d totalReplicas:%d selfIndex:%d", pCfg->vgId, pCfg->syncCfg.replicaNum,
|
||||
pCfg->syncCfg.totalReplicaNum, pCfg->syncCfg.myIndex);
|
||||
vDebug("vgId:%d, encode config, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d",
|
||||
pCfg->vgId, pCfg->syncCfg.replicaNum,
|
||||
pCfg->syncCfg.totalReplicaNum, pCfg->syncCfg.myIndex, pCfg->syncCfg.changeVersion);
|
||||
for (int i = 0; i < pCfg->syncCfg.totalReplicaNum; ++i) {
|
||||
SJson *info = tjsonCreateObject();
|
||||
SNodeInfo *pNode = (SNodeInfo *)&pCfg->syncCfg.nodeInfo[i];
|
||||
|
@ -263,6 +265,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "syncCfg.changeVersion", pCfg->syncCfg.changeVersion, code);
|
||||
if (code < 0) return -1;
|
||||
|
||||
tjsonGetNumberValue(pJson, "vndStats.stables", pCfg->vndStats.numOfSTables, code);
|
||||
if (code < 0) return -1;
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "vnd.h"
|
||||
#include "vnodeInt.h"
|
||||
#include "sync.h"
|
||||
|
||||
extern int32_t tsdbPreCommit(STsdb *pTsdb);
|
||||
extern int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo);
|
||||
|
@ -200,8 +201,10 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
|
|||
// free info binary
|
||||
taosMemoryFree(data);
|
||||
|
||||
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d", pInfo->config.vgId, fname,
|
||||
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex);
|
||||
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d selfIndex:%d changeVersion:%d",
|
||||
pInfo->config.vgId, fname,
|
||||
pInfo->config.syncCfg.replicaNum, pInfo->config.syncCfg.myIndex,
|
||||
pInfo->config.syncCfg.changeVersion);
|
||||
|
||||
return 0;
|
||||
|
||||
|
@ -285,6 +288,8 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
|
|||
|
||||
tsem_wait(&pVnode->canCommit);
|
||||
|
||||
if(syncNodeGetConfig(pVnode->sync, &pVnode->config.syncCfg) != 0) goto _exit;
|
||||
|
||||
pVnode->state.commitTerm = pVnode->state.applyTerm;
|
||||
|
||||
pInfo->info.config = pVnode->config;
|
||||
|
|
|
@ -118,9 +118,10 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
|
|||
if (pReq->learnerSelfIndex != -1) {
|
||||
pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
|
||||
}
|
||||
pCfg->changeVersion = pReq->changeVersion;
|
||||
|
||||
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum,
|
||||
pCfg->totalReplicaNum, pCfg->myIndex);
|
||||
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d changeVersion:%d",
|
||||
pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex, pCfg->changeVersion);
|
||||
|
||||
info.config.syncCfg = *pCfg;
|
||||
ret = vnodeSaveInfo(dir, &info);
|
||||
|
@ -217,6 +218,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
|
|||
info.config.hashEnd = pReq->hashEnd;
|
||||
info.config.hashChange = true;
|
||||
info.config.walCfg.vgId = pReq->dstVgId;
|
||||
info.config.syncCfg.changeVersion = pReq->changeVersion;
|
||||
|
||||
SSyncCfg *pCfg = &info.config.syncCfg;
|
||||
pCfg->myIndex = 0;
|
||||
|
@ -447,7 +449,8 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
|||
}
|
||||
|
||||
// open sync
|
||||
if (vnodeSyncOpen(pVnode, dir)) {
|
||||
vInfo("vgId:%d, start to open sync, changeVersion:%d", TD_VID(pVnode), info.config.syncCfg.changeVersion);
|
||||
if (vnodeSyncOpen(pVnode, dir, info.config.syncCfg.changeVersion)) {
|
||||
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
|
|
@ -384,6 +384,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
|||
pLoad->roleTimeMs = state.roleTimeMs;
|
||||
pLoad->startTimeMs = state.startTimeMs;
|
||||
pLoad->syncCanRead = state.canRead;
|
||||
pLoad->learnerProgress = state.progress;
|
||||
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
||||
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
|
||||
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
||||
|
|
|
@ -37,6 +37,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
|
|||
static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
||||
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t btime, int64_t *pUid) {
|
||||
int32_t code = 0;
|
||||
|
@ -404,10 +405,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
|||
return -1;
|
||||
}
|
||||
|
||||
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver);
|
||||
vDebug("vgId:%d, start to process write request %s, index:%" PRId64 ", applied:%" PRId64
|
||||
", state.applyTerm:%" PRId64 ", conn.applyTerm:%" PRId64,
|
||||
TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied,
|
||||
pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
|
||||
|
||||
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
|
||||
ASSERT(pVnode->state.applied + 1 == ver);
|
||||
ASSERTS(pVnode->state.applied + 1 == ver, "applied:%" PRId64 ", ver:%" PRId64, pVnode->state.applied, ver);
|
||||
|
||||
atomic_store_64(&pVnode->state.applied, ver);
|
||||
atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
|
||||
|
@ -525,6 +529,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
|||
case TDMT_VND_COMPACT:
|
||||
vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp);
|
||||
goto _exit;
|
||||
case TDMT_SYNC_CONFIG_CHANGE:
|
||||
vnodeProcessConfigChangeReq(pVnode, ver, pReq, len, pRsp);
|
||||
break;
|
||||
default:
|
||||
vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
|
||||
return -1;
|
||||
|
@ -1842,6 +1849,17 @@ static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pR
|
|||
return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
syncCheckMember(pVnode->sync);
|
||||
|
||||
pRsp->msgType = TDMT_SYNC_CONFIG_CHANGE_RSP;
|
||||
pRsp->code = TSDB_CODE_SUCCESS;
|
||||
pRsp->pCont = NULL;
|
||||
pRsp->contLen = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifndef TD_ENTERPRISE
|
||||
int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
return 0;
|
||||
|
|
|
@ -637,7 +637,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
|||
return pFsm;
|
||||
}
|
||||
|
||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path, int32_t vnodeVersion) {
|
||||
SSyncInfo syncInfo = {
|
||||
.snapshotStrategy = SYNC_STRATEGY_WAL_FIRST,
|
||||
.batchSize = 1,
|
||||
|
@ -664,7 +664,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
|||
pNode->nodeId, pNode->clusterId);
|
||||
}
|
||||
|
||||
pVnode->sync = syncOpen(&syncInfo);
|
||||
pVnode->sync = syncOpen(&syncInfo, vnodeVersion);
|
||||
if (pVnode->sync <= 0) {
|
||||
vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -40,6 +40,7 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pIndexMgr);
|
|||
void syncIndexMgrClear(SSyncIndexMgr *pIndexMgr);
|
||||
void syncIndexMgrSetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncIndex index);
|
||||
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
|
||||
void syncIndexMgrCopyIfExist(SSyncIndexMgr * pNewIndex, SSyncIndexMgr * pOldIndex, SRaftId *oldReplicasId);
|
||||
|
||||
void syncIndexMgrSetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t startTime);
|
||||
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
|
||||
|
|
|
@ -228,7 +228,7 @@ typedef struct SSyncNode {
|
|||
} SSyncNode;
|
||||
|
||||
// open/close --------------
|
||||
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo);
|
||||
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion);
|
||||
int32_t syncNodeStart(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
|
||||
void syncNodeClose(SSyncNode* pSyncNode);
|
||||
|
@ -238,6 +238,9 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int
|
|||
int32_t syncNodeRestore(SSyncNode* pSyncNode);
|
||||
void syncHbTimerDataFree(SSyncHbTimerData* pData);
|
||||
|
||||
// config
|
||||
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str);
|
||||
|
||||
// on message ---------------------
|
||||
int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
|
||||
|
|
|
@ -105,7 +105,7 @@ bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf);
|
|||
|
||||
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
|
||||
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
|
||||
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
|
||||
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char *str);
|
||||
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex);
|
||||
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode);
|
||||
|
||||
|
@ -115,7 +115,7 @@ int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
|
|||
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
|
||||
|
||||
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
|
||||
int32_t applyCode);
|
||||
int32_t applyCode, bool force);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -46,7 +46,7 @@ void syncEntryDestroy(SSyncRaftEntry* pEntry);
|
|||
void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg); // step 7
|
||||
|
||||
static FORCE_INLINE bool syncLogReplBarrier(SSyncRaftEntry* pEntry) {
|
||||
return pEntry->originalRpcType == TDMT_SYNC_NOOP;
|
||||
return pEntry->originalRpcType == TDMT_SYNC_NOOP || pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE;
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -151,8 +151,9 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
}
|
||||
|
||||
sTrace("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64
|
||||
", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "",
|
||||
pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex);
|
||||
", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 " entryterm:%" PRId64,
|
||||
pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex,
|
||||
pEntry->term);
|
||||
|
||||
// accept
|
||||
if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
|
||||
|
@ -162,7 +163,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
|
||||
_SEND_RESPONSE:
|
||||
pEntry = NULL;
|
||||
pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm);
|
||||
pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm, "OnAppn");
|
||||
bool matched = (pReply->matchIndex >= pReply->lastSendIndex);
|
||||
if (accepted && matched) {
|
||||
pReply->success = true;
|
||||
|
|
|
@ -70,6 +70,27 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, Sync
|
|||
DID(pRaftId), CID(pRaftId));
|
||||
}
|
||||
|
||||
void syncIndexMgrCopyIfExist(SSyncIndexMgr * pNewIndex, SSyncIndexMgr * pOldIndex, SRaftId *oldReplicasId){
|
||||
for(int j = 0; j < pOldIndex->totalReplicaNum; ++j){
|
||||
sDebug("old Index j:%d, index:%"PRId64, j, pOldIndex->index[j]);
|
||||
}
|
||||
|
||||
for (int i = 0; i < pNewIndex->totalReplicaNum; ++i) {
|
||||
for(int j = 0; j < pOldIndex->totalReplicaNum; ++j){
|
||||
if (syncUtilSameId(/*(const SRaftId*)*/&((oldReplicasId[j])), &((*(pNewIndex->replicas))[i]))) {
|
||||
pNewIndex->index[i] = pOldIndex->index[j];
|
||||
pNewIndex->privateTerm[i] = pOldIndex->privateTerm[j];
|
||||
pNewIndex->startTimeArr[i] = pOldIndex->startTimeArr[j];
|
||||
pNewIndex->recvTimeArr[i] = pOldIndex->recvTimeArr[j];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < pNewIndex->totalReplicaNum; ++i){
|
||||
sDebug("new index i:%d, index:%"PRId64, i, pNewIndex->index[i]);
|
||||
}
|
||||
}
|
||||
|
||||
SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) {
|
||||
for (int i = 0; i < pNode->totalReplicaNum; i++) {
|
||||
if (syncUtilSameId(&pNode->replicasId[i], pRaftId)) {
|
||||
|
|
|
@ -59,8 +59,8 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
|||
|
||||
static ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
|
||||
|
||||
int64_t syncOpen(SSyncInfo* pSyncInfo) {
|
||||
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
|
||||
int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
||||
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo, vnodeVersion);
|
||||
if (pSyncNode == NULL) {
|
||||
sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
|
||||
return -1;
|
||||
|
@ -106,6 +106,21 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg *cfg){
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
|
||||
if (pSyncNode == NULL) {
|
||||
sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid);
|
||||
return -1;
|
||||
}
|
||||
|
||||
*cfg = pSyncNode->raftCfg.cfg;
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void syncStop(int64_t rid) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode != NULL) {
|
||||
|
@ -516,6 +531,20 @@ SSyncState syncGetState(int64_t rid) {
|
|||
} else {
|
||||
state.canRead = state.restored;
|
||||
}
|
||||
/*
|
||||
double progress = 0;
|
||||
if(pSyncNode->pLogBuf->totalIndex > 0 && pSyncNode->pLogBuf->commitIndex > 0){
|
||||
progress = (double)pSyncNode->pLogBuf->commitIndex/(double)pSyncNode->pLogBuf->totalIndex;
|
||||
state.progress = (int32_t)(progress * 100);
|
||||
}
|
||||
else{
|
||||
state.progress = -1;
|
||||
}
|
||||
sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
|
||||
"progress:%lf, progress:%d",
|
||||
pSyncNode->vgId,
|
||||
pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
|
||||
*/
|
||||
state.term = raftStoreGetTerm(pSyncNode);
|
||||
syncNodeRelease(pSyncNode);
|
||||
}
|
||||
|
@ -573,6 +602,20 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncCheckMember(int64_t rid) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) {
|
||||
sError("sync propose error");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if(pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER){
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncIsCatchUp(int64_t rid) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) {
|
||||
|
@ -634,15 +677,26 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
|
|||
}
|
||||
|
||||
// optimized one replica
|
||||
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
|
||||
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
|
||||
SyncIndex retIndex;
|
||||
int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
|
||||
if (code == 0) {
|
||||
if (code >= 0) {
|
||||
pMsg->info.conn.applyIndex = retIndex;
|
||||
pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
|
||||
sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
return 1;
|
||||
|
||||
//after raft member change, need to handle 1->2 switching point
|
||||
//at this point, need to switch entry handling thread
|
||||
if(pSyncNode->replicaNum == 1){
|
||||
sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
return 1;
|
||||
}
|
||||
else{
|
||||
sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64 " type:%s, "
|
||||
"handle:%p", pSyncNode->vgId, retIndex,
|
||||
TMSG_INFO(pMsg->msgType), pMsg->info.handle);
|
||||
return 0;
|
||||
}
|
||||
} else {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
|
||||
|
@ -742,7 +796,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
|
|||
}
|
||||
|
||||
// open/close --------------
|
||||
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
||||
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
||||
SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode));
|
||||
if (pSyncNode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -784,20 +838,27 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
goto _error;
|
||||
}
|
||||
|
||||
if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
|
||||
sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
|
||||
pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
|
||||
if (syncWriteCfgFile(pSyncNode) != 0) {
|
||||
sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
|
||||
goto _error;
|
||||
if(vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion){
|
||||
if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
|
||||
sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
|
||||
pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
|
||||
if (syncWriteCfgFile(pSyncNode) != 0) {
|
||||
sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId);
|
||||
goto _error;
|
||||
}
|
||||
} else {
|
||||
sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
|
||||
pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
|
||||
}
|
||||
} else {
|
||||
sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
|
||||
pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
|
||||
}
|
||||
else{
|
||||
sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d",
|
||||
pSyncNode->vgId, vnodeVersion, pSyncInfo->syncCfg.changeVersion);
|
||||
}
|
||||
}
|
||||
|
||||
// init by SSyncInfo
|
||||
|
||||
// init by SSyncInfo
|
||||
pSyncNode->vgId = pSyncInfo->vgId;
|
||||
SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
|
||||
bool updated = false;
|
||||
|
@ -812,14 +873,16 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
|||
pNode->nodeId, pNode->clusterId);
|
||||
}
|
||||
|
||||
if (updated) {
|
||||
sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
|
||||
if (syncWriteCfgFile(pSyncNode) != 0) {
|
||||
sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
|
||||
goto _error;
|
||||
if(vnodeVersion > pSyncInfo->syncCfg.changeVersion){
|
||||
if (updated) {
|
||||
sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
|
||||
if (syncWriteCfgFile(pSyncNode) != 0) {
|
||||
sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pSyncNode->pWal = pSyncInfo->pWal;
|
||||
pSyncNode->msgcb = pSyncInfo->msgcb;
|
||||
pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
|
||||
|
@ -2256,6 +2319,540 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
|
|||
return code;
|
||||
}
|
||||
|
||||
void syncBuildConfigFromReq(SAlterVnodeReplicaReq *pReq, SSyncCfg *cfg){//TODO SAlterVnodeReplicaReq name is proper?
|
||||
cfg->replicaNum = 0;
|
||||
cfg->totalReplicaNum = 0;
|
||||
|
||||
for (int i = 0; i < pReq->replica; ++i) {
|
||||
SNodeInfo *pNode = &cfg->nodeInfo[i];
|
||||
pNode->nodeId = pReq->replicas[i].id;
|
||||
pNode->nodePort = pReq->replicas[i].port;
|
||||
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
|
||||
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
||||
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->nodeRole);
|
||||
cfg->replicaNum++;
|
||||
}
|
||||
if(pReq->selfIndex != -1){
|
||||
cfg->myIndex = pReq->selfIndex;
|
||||
}
|
||||
for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
|
||||
SNodeInfo *pNode = &cfg->nodeInfo[i];
|
||||
pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
|
||||
pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
|
||||
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
||||
tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
|
||||
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
||||
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->nodeRole);
|
||||
cfg->totalReplicaNum++;
|
||||
}
|
||||
cfg->totalReplicaNum += pReq->replica;
|
||||
if(pReq->learnerSelfIndex != -1){
|
||||
cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
|
||||
}
|
||||
cfg->changeVersion = pReq->changeVersion;
|
||||
}
|
||||
|
||||
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry){
|
||||
if(pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE){
|
||||
return -1;
|
||||
}
|
||||
|
||||
SMsgHead *head = (SMsgHead *)pEntry->data;
|
||||
void *pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
|
||||
|
||||
SAlterVnodeTypeReq req = {0};
|
||||
if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSyncCfg cfg = {0};
|
||||
syncBuildConfigFromReq(&req, &cfg);
|
||||
|
||||
if(cfg.totalReplicaNum >= 1 && ths->state == TAOS_SYNC_STATE_LEADER){
|
||||
bool incfg = false;
|
||||
for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){
|
||||
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0
|
||||
&& ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){
|
||||
incfg = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(!incfg){
|
||||
SyncTerm currentTerm = raftStoreGetTerm(ths);
|
||||
syncNodeStepDown(ths, currentTerm);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg *cfg, char *str){
|
||||
sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d, "
|
||||
"restoreFinish:%d",
|
||||
ths->vgId, str,
|
||||
ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
|
||||
ths->restoreFinish);
|
||||
|
||||
sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d",
|
||||
ths->vgId, str, ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn,
|
||||
ths->myNodeInfo.nodePort, ths->myNodeInfo.nodeRole);
|
||||
|
||||
for (int32_t i = 0; i < ths->peersNum; ++i){
|
||||
sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d",
|
||||
ths->vgId, str, i, ths->peersNodeInfo[i].clusterId,
|
||||
ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
|
||||
ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ths->peersNum; ++i){
|
||||
char buf[256];
|
||||
int32_t len = 256;
|
||||
int32_t n = 0;
|
||||
n += snprintf(buf + n, len - n, "%s", "{");
|
||||
for (int i = 0; i < ths->peersEpset->numOfEps; i++) {
|
||||
n += snprintf(buf + n, len - n, "%s:%d%s", ths->peersEpset->eps[i].fqdn, ths->peersEpset->eps[i].port,
|
||||
(i + 1 < ths->peersEpset->numOfEps ? ", " : ""));
|
||||
}
|
||||
n += snprintf(buf + n, len - n, "%s", "}");
|
||||
|
||||
sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d",
|
||||
ths->vgId, str, i, buf, ths->peersEpset->inUse);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ths->peersNum; ++i){
|
||||
sInfo("vgId:%d, %s, peersId%d, addr:%"PRId64,
|
||||
ths->vgId, str, i, ths->peersId[i].addr);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){
|
||||
sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d",
|
||||
ths->vgId, str, i, ths->raftCfg.cfg.nodeInfo[i].clusterId,
|
||||
ths->raftCfg.cfg.nodeInfo[i].nodeId, ths->raftCfg.cfg.nodeInfo[i].nodeFqdn,
|
||||
ths->raftCfg.cfg.nodeInfo[i].nodePort, ths->raftCfg.cfg.nodeInfo[i].nodeRole);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){
|
||||
sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64,
|
||||
ths->vgId, str, i, ths->replicasId[i].addr);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){
|
||||
int32_t i = 0;
|
||||
|
||||
//change peersNodeInfo
|
||||
i = 0;
|
||||
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){
|
||||
if(!(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
|
||||
&& ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)){
|
||||
ths->peersNodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
|
||||
ths->peersNodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId;
|
||||
tstrncpy(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
|
||||
ths->peersNodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId;
|
||||
ths->peersNodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort;
|
||||
|
||||
syncUtilNodeInfo2EpSet(&ths->peersNodeInfo[i], &ths->peersEpset[i]);
|
||||
|
||||
if (!syncUtilNodeInfo2RaftId(&ths->peersNodeInfo[i], ths->vgId, &ths->peersId[i])) {
|
||||
sError("vgId:%d, failed to determine raft member id, peer:%d", ths->vgId, i);
|
||||
return -1;
|
||||
}
|
||||
|
||||
i++;
|
||||
}
|
||||
}
|
||||
ths->peersNum = i;
|
||||
|
||||
//change cfg nodeInfo
|
||||
ths->raftCfg.cfg.replicaNum = 0;
|
||||
i = 0;
|
||||
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
|
||||
if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){
|
||||
ths->raftCfg.cfg.replicaNum++;
|
||||
}
|
||||
ths->raftCfg.cfg.nodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
|
||||
ths->raftCfg.cfg.nodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId;
|
||||
tstrncpy(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
|
||||
ths->raftCfg.cfg.nodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId;
|
||||
ths->raftCfg.cfg.nodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort;
|
||||
if((strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
|
||||
&& ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)){
|
||||
ths->raftCfg.cfg.myIndex = i;
|
||||
}
|
||||
i++;
|
||||
}
|
||||
ths->raftCfg.cfg.totalReplicaNum = i;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg *cfg){
|
||||
//change peersNodeInfo
|
||||
for (int32_t i = 0; i < ths->peersNum; ++i) {
|
||||
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){
|
||||
if(strcmp(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
|
||||
&& ths->peersNodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort){
|
||||
if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){
|
||||
ths->peersNodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//change cfg nodeInfo
|
||||
ths->raftCfg.cfg.replicaNum = 0;
|
||||
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){
|
||||
if(strcmp(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
|
||||
&& ths->raftCfg.cfg.nodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort){
|
||||
if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){
|
||||
ths->raftCfg.cfg.nodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
ths->raftCfg.cfg.replicaNum++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum){
|
||||
//1.rebuild replicasId, remove deleted one
|
||||
SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
|
||||
memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));
|
||||
|
||||
ths->replicaNum = ths->raftCfg.cfg.replicaNum;
|
||||
ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum;
|
||||
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||
syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]);
|
||||
}
|
||||
|
||||
|
||||
//2.rebuild MatchIndex, remove deleted one
|
||||
SSyncIndexMgr *oldIndex = ths->pMatchIndex;
|
||||
|
||||
ths->pMatchIndex = syncIndexMgrCreate(ths);
|
||||
|
||||
syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId);
|
||||
|
||||
syncIndexMgrDestroy(oldIndex);
|
||||
|
||||
|
||||
//3.rebuild NextIndex, remove deleted one
|
||||
SSyncIndexMgr *oldNextIndex = ths->pNextIndex;
|
||||
|
||||
ths->pNextIndex = syncIndexMgrCreate(ths);
|
||||
|
||||
syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId);
|
||||
|
||||
syncIndexMgrDestroy(oldNextIndex);
|
||||
|
||||
|
||||
//4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
|
||||
voteGrantedUpdate(ths->pVotesGranted, ths);
|
||||
votesRespondUpdate(ths->pVotesRespond, ths);
|
||||
|
||||
|
||||
//5.rebuild logReplMgr
|
||||
for(int i = 0; i < oldtotalReplicaNum; ++i){
|
||||
sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId, i,
|
||||
ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
|
||||
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
|
||||
}
|
||||
|
||||
SSyncLogReplMgr oldLogReplMgrs[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
|
||||
|
||||
for(int i = 0; i < oldtotalReplicaNum; i++){
|
||||
oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
|
||||
}
|
||||
|
||||
syncNodeLogReplDestroy(ths);
|
||||
syncNodeLogReplInit(ths);
|
||||
|
||||
for(int i = 0; i < ths->totalReplicaNum; ++i){
|
||||
for(int j = 0; j < oldtotalReplicaNum; j++){
|
||||
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
|
||||
*(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
|
||||
ths->logReplMgrs[i]->peerId = i;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for(int i = 0; i < ths->totalReplicaNum; ++i){
|
||||
sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")" , ths->vgId, i,
|
||||
ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
|
||||
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
|
||||
}
|
||||
|
||||
//6.rebuild sender
|
||||
for(int i = 0; i < oldtotalReplicaNum; ++i){
|
||||
sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64,
|
||||
ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
||||
if (ths->senders[i] != NULL) {
|
||||
sDebug("vgId:%d, snapshot sender destroy while close, data:%p", ths->vgId, ths->senders[i]);
|
||||
|
||||
if (snapshotSenderIsStart(ths->senders[i])) {
|
||||
snapshotSenderStop(ths->senders[i], false);
|
||||
}
|
||||
|
||||
snapshotSenderDestroy(ths->senders[i]);
|
||||
ths->senders[i] = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
||||
SSyncSnapshotSender* pSender = snapshotSenderCreate(ths, i);
|
||||
if (pSender == NULL) return -1;
|
||||
|
||||
ths->senders[i] = pSender;
|
||||
sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
|
||||
}
|
||||
|
||||
for(int i = 0; i < ths->totalReplicaNum; i++){
|
||||
sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64,
|
||||
ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
|
||||
}
|
||||
|
||||
|
||||
//7.rebuild synctimer
|
||||
syncNodeStopHeartbeatTimer(ths);
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
||||
syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i]);
|
||||
}
|
||||
|
||||
syncNodeStartHeartbeatTimer(ths);
|
||||
|
||||
|
||||
//8.rebuild peerStates
|
||||
SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
|
||||
for(int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++){
|
||||
oldState[i] = ths->peerStates[i];
|
||||
}
|
||||
|
||||
for(int i = 0; i < ths->totalReplicaNum; i++){
|
||||
for(int j = 0; j < oldtotalReplicaNum; j++){
|
||||
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])){
|
||||
ths->peerStates[i] = oldState[j];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void syncNodeChangeToVoter(SSyncNode* ths){
|
||||
//replicasId, only need to change replicaNum when 1->3
|
||||
ths->replicaNum = ths->raftCfg.cfg.replicaNum;
|
||||
sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
|
||||
for (int32_t i = 0; i < ths->totalReplicaNum; ++i){
|
||||
sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, i, ths->replicasId[i].addr);
|
||||
}
|
||||
|
||||
//pMatchIndex, pNextIndex, only need to change replicaNum when 1->3
|
||||
ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
|
||||
ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
|
||||
|
||||
sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
|
||||
for (int32_t i = 0; i < ths->pMatchIndex->totalReplicaNum; ++i){
|
||||
sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, i, ths->pMatchIndex->index[i]);
|
||||
}
|
||||
|
||||
//pVotesGranted, pVotesRespond
|
||||
voteGrantedUpdate(ths->pVotesGranted, ths);
|
||||
votesRespondUpdate(ths->pVotesRespond, ths);
|
||||
|
||||
//logRepMgrs
|
||||
//no need to change logRepMgrs when 1->3
|
||||
}
|
||||
|
||||
void syncNodeResetPeerAndCfg(SSyncNode* ths){
|
||||
SNodeInfo node = {0};
|
||||
for (int32_t i = 0; i < ths->peersNum; ++i) {
|
||||
memcpy(&ths->peersNodeInfo[i], &node, sizeof(SNodeInfo));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||
memcpy(&ths->raftCfg.cfg.nodeInfo[i], &node, sizeof(SNodeInfo));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
|
||||
if(pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE){
|
||||
return -1;
|
||||
}
|
||||
|
||||
SMsgHead *head = (SMsgHead *)pEntry->data;
|
||||
void *pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
|
||||
|
||||
SAlterVnodeTypeReq req = {0};
|
||||
if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSyncCfg cfg = {0};
|
||||
syncBuildConfigFromReq(&req, &cfg);
|
||||
|
||||
if(cfg.changeVersion <= ths->raftCfg.cfg.changeVersion){
|
||||
sInfo("vgId:%d, skip conf change entry since lower version. "
|
||||
"this entry, index:%" PRId64 ", term:%" PRId64 ", totalReplicaNum:%d, changeVersion:%d; "
|
||||
"current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64", changeVersion:%d",
|
||||
ths->vgId,
|
||||
pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion,
|
||||
ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if(strcmp(str, "Commit") == 0){
|
||||
sInfo("vgId:%d, change config from %s. "
|
||||
"this, i:%" PRId64 ", trNum:%d, vers:%d; "
|
||||
"node, rNum:%d, pNum:%d, trNum:%d, "
|
||||
"buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), "
|
||||
"cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
|
||||
ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion,
|
||||
ths->replicaNum, ths->peersNum, ths->totalReplicaNum,
|
||||
ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex,
|
||||
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
|
||||
}
|
||||
else{
|
||||
sInfo("vgId:%d, change config from %s. "
|
||||
"this, i:%" PRId64 ", t:%" PRId64 ", trNum:%d, vers:%d; "
|
||||
"node, rNum:%d, pNum:%d, trNum:%d, "
|
||||
"buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), "
|
||||
"cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
|
||||
ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion,
|
||||
ths->replicaNum, ths->peersNum, ths->totalReplicaNum,
|
||||
ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex,
|
||||
pEntry->index -1, ths->commitIndex, ths->pLogBuf->commitIndex);
|
||||
}
|
||||
|
||||
syncNodeLogConfigInfo(ths, &cfg, "before config change");
|
||||
|
||||
int32_t oldTotalReplicaNum = ths->totalReplicaNum;
|
||||
|
||||
if(cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2){//remove replica
|
||||
|
||||
bool incfg = false;
|
||||
for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){
|
||||
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0
|
||||
&& ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){
|
||||
incfg = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(incfg){//remove other
|
||||
syncNodeResetPeerAndCfg(ths);
|
||||
|
||||
//no need to change myNodeInfo
|
||||
|
||||
if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){
|
||||
return -1;
|
||||
};
|
||||
|
||||
if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){
|
||||
return -1;
|
||||
};
|
||||
}
|
||||
else{//remove myself
|
||||
//no need to do anything actually, to change the following to reduce distruptive server chance
|
||||
|
||||
syncNodeResetPeerAndCfg(ths);
|
||||
|
||||
//change myNodeInfo
|
||||
ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
||||
|
||||
//change peer and cfg
|
||||
ths->peersNum = 0;
|
||||
memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
|
||||
ths->raftCfg.cfg.replicaNum = 0;
|
||||
ths->raftCfg.cfg.totalReplicaNum = 1;
|
||||
|
||||
//change other
|
||||
if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){
|
||||
return -1;
|
||||
}
|
||||
|
||||
//change state
|
||||
ths->state = TAOS_SYNC_STATE_LEARNER;
|
||||
}
|
||||
|
||||
ths->restoreFinish = false;
|
||||
}
|
||||
else{//add replica, or change replica type
|
||||
if(ths->totalReplicaNum == 3){ //change replica type
|
||||
sInfo("vgId:%d, begin change replica type", ths->vgId);
|
||||
|
||||
//change myNodeInfo
|
||||
for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){
|
||||
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0
|
||||
&& ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){
|
||||
if(cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){
|
||||
ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//change peer and cfg
|
||||
syncNodeChangePeerAndCfgToVoter(ths, &cfg);
|
||||
|
||||
//change other
|
||||
syncNodeChangeToVoter(ths);
|
||||
|
||||
//change state
|
||||
if(ths->state ==TAOS_SYNC_STATE_LEARNER){
|
||||
if(ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER ){
|
||||
ths->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||
}
|
||||
}
|
||||
|
||||
ths->restoreFinish = false;
|
||||
}
|
||||
else{//add replica
|
||||
sInfo("vgId:%d, begin add replica", ths->vgId);
|
||||
|
||||
//no need to change myNodeInfo
|
||||
|
||||
//change peer and cfg
|
||||
if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){
|
||||
return -1;
|
||||
};
|
||||
|
||||
//change other
|
||||
if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){
|
||||
return -1;
|
||||
};
|
||||
|
||||
//no need to change state
|
||||
|
||||
if(ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER){
|
||||
ths->restoreFinish = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ths->quorum = syncUtilQuorum(ths->replicaNum);
|
||||
|
||||
ths->raftCfg.lastConfigIndex = pEntry->index;
|
||||
ths->raftCfg.cfg.lastIndex = pEntry->index;
|
||||
ths->raftCfg.cfg.changeVersion = cfg.changeVersion;
|
||||
|
||||
syncNodeLogConfigInfo(ths, &cfg, "after config change");
|
||||
|
||||
if(syncWriteCfgFile(ths) != 0){
|
||||
sError("vgId:%d, failed to create sync cfg file", ths->vgId);
|
||||
return -1;
|
||||
};
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
||||
if (pEntry->dataLen < sizeof(SMsgHead)) {
|
||||
sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId,
|
||||
|
@ -2268,13 +2865,13 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
|||
if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
|
||||
sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index);
|
||||
ASSERT(terrno != 0);
|
||||
(void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno);
|
||||
(void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false);
|
||||
syncEntryDestroy(pEntry);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
// proceed match index, with replicating on needed
|
||||
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL);
|
||||
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");
|
||||
|
||||
sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
|
||||
", %" PRId64 ")",
|
||||
|
@ -2305,6 +2902,9 @@ bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
|
|||
int32_t toCount = 0;
|
||||
int64_t tsNow = taosGetTimestampMs();
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
if(pSyncNode->peersNodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER){
|
||||
continue;
|
||||
}
|
||||
int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
|
||||
if (recvTime == 0 || recvTime == -1) {
|
||||
continue;
|
||||
|
@ -2566,6 +3166,14 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
|
|||
pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
|
||||
}
|
||||
|
||||
//1->2, config change is add in write thread, and will continue in sync thread
|
||||
//need save message for it
|
||||
if(pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE){
|
||||
SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
|
||||
uint64_t seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
|
||||
pEntry->seqNum = seqNum;
|
||||
}
|
||||
|
||||
if (pEntry == NULL) {
|
||||
sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr());
|
||||
return -1;
|
||||
|
@ -2576,7 +3184,28 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
|
|||
(*pRetIndex) = index;
|
||||
}
|
||||
|
||||
int32_t code = syncNodeAppend(ths, pEntry);
|
||||
if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){
|
||||
int32_t code = syncNodeCheckChangeConfig(ths, pEntry);
|
||||
if(code < 0){
|
||||
sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr());
|
||||
syncEntryDestroy(pEntry);
|
||||
pEntry = NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if(code > 0){
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
(void)syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
|
||||
if (rsp.info.handle != NULL) {
|
||||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
syncEntryDestroy(pEntry);
|
||||
pEntry = NULL;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
code = syncNodeAppend(ths, pEntry);
|
||||
return code;
|
||||
} else {
|
||||
syncEntryDestroy(pEntry);
|
||||
|
|
|
@ -25,6 +25,8 @@
|
|||
#include "syncRespMgr.h"
|
||||
#include "syncSnapshot.h"
|
||||
#include "syncUtil.h"
|
||||
#include "syncRaftCfg.h"
|
||||
#include "syncVoteMgr.h"
|
||||
|
||||
static bool syncIsMsgBlock(tmsg_t type) {
|
||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||
|
@ -428,7 +430,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf
|
|||
return 0;
|
||||
}
|
||||
|
||||
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm) {
|
||||
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char *str) {
|
||||
taosThreadMutexLock(&pBuf->mutex);
|
||||
syncLogBufferValidate(pBuf);
|
||||
|
||||
|
@ -475,9 +477,6 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
|
|||
sTrace("vgId:%d, log buffer proceed. start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64,
|
||||
pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||
|
||||
// replicate on demand
|
||||
(void)syncNodeReplicateWithoutLock(pNode);
|
||||
|
||||
// persist
|
||||
if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) {
|
||||
sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
|
||||
|
@ -485,6 +484,39 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
|
|||
taosMsleep(1);
|
||||
goto _out;
|
||||
}
|
||||
|
||||
if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){
|
||||
if(pNode->pLogBuf->commitIndex == pEntry->index -1){
|
||||
sInfo("vgId:%d, to change config at %s. "
|
||||
"current entry, index:%" PRId64 ", term:%" PRId64", "
|
||||
"node, restore:%d, commitIndex:%" PRId64 ", "
|
||||
"cond: (pre entry index:%" PRId64 "== buf commit index:%" PRId64 ")",
|
||||
pNode->vgId, str,
|
||||
pEntry->index, pEntry->term,
|
||||
pNode->restoreFinish, pNode->commitIndex,
|
||||
pEntry->index - 1, pNode->pLogBuf->commitIndex);
|
||||
if(syncNodeChangeConfig(pNode, pEntry, str) != 0){
|
||||
sError("vgId:%d, failed to change config from Append since %s. index:%" PRId64, pNode->vgId, terrstr(),
|
||||
pEntry->index);
|
||||
goto _out;
|
||||
}
|
||||
}
|
||||
else{
|
||||
sInfo("vgId:%d, delay change config from Node %s. "
|
||||
"curent entry, index:%" PRId64 ", term:%" PRId64 ", "
|
||||
"node, commitIndex:%" PRId64 ", pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), "
|
||||
"cond:( pre entry index:%" PRId64" != buf commit index:%" PRId64 ")",
|
||||
pNode->vgId, str,
|
||||
pEntry->index, pEntry->term,
|
||||
pNode->commitIndex, pNode->pLogBuf->startIndex, pNode->pLogBuf->commitIndex,
|
||||
pNode->pLogBuf->matchIndex, pNode->pLogBuf->endIndex,
|
||||
pEntry->index - 1, pNode->pLogBuf->commitIndex);
|
||||
}
|
||||
}
|
||||
|
||||
// replicate on demand
|
||||
(void)syncNodeReplicateWithoutLock(pNode);
|
||||
|
||||
ASSERT(pEntry->index == pBuf->matchIndex);
|
||||
|
||||
// update my match index
|
||||
|
@ -503,8 +535,16 @@ _out:
|
|||
}
|
||||
|
||||
int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
|
||||
int32_t applyCode) {
|
||||
if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1) {
|
||||
int32_t applyCode, bool force) {
|
||||
//learner need to execute fsm when it catch up entry log
|
||||
//if force is true, keep all contition check to execute fsm
|
||||
if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1
|
||||
&& pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER
|
||||
&& force == false) {
|
||||
sDebug("vgId:%d, not to execute fsm, index:%" PRId64 ", term:%" PRId64 ", type:%s code:0x%x, replicaNum:%d,"
|
||||
"role:%d, restoreFinish:%d",
|
||||
pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode,
|
||||
pNode->replicaNum, pNode->raftCfg.cfg.nodeInfo[pNode->raftCfg.cfg.myIndex].nodeRole, pNode->restoreFinish);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -559,6 +599,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
|||
int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex);
|
||||
SSyncRaftEntry* pEntry = NULL;
|
||||
bool inBuf = false;
|
||||
SSyncRaftEntry* pNextEntry = NULL;
|
||||
bool nextInBuf = false;
|
||||
|
||||
if (commitIndex <= pBuf->commitIndex) {
|
||||
sDebug("vgId:%d, stale commit index. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex,
|
||||
|
@ -584,7 +626,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
|||
pEntry->term, TMSG_INFO(pEntry->originalRpcType));
|
||||
}
|
||||
|
||||
if (syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0) != 0) {
|
||||
if (syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false) != 0) {
|
||||
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
|
||||
", role:%d, current term:%" PRId64,
|
||||
vgId, pEntry->index, pEntry->term, role, currentTerm);
|
||||
|
@ -595,10 +637,51 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
|||
sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId,
|
||||
pEntry->index, pEntry->term, role, currentTerm);
|
||||
|
||||
pNextEntry = syncLogBufferGetOneEntry(pBuf, pNode, index + 1, &nextInBuf);
|
||||
if (pNextEntry != NULL) {
|
||||
if(pNextEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){
|
||||
sInfo("vgId:%d, to change config at Commit. "
|
||||
"current entry, index:%" PRId64 ", term:%" PRId64", "
|
||||
"node, role:%d, current term:%" PRId64 ", restore:%d, "
|
||||
"cond, next entry index:%" PRId64 ", msgType:%s",
|
||||
vgId,
|
||||
pEntry->index, pEntry->term,
|
||||
role, currentTerm, pNode->restoreFinish,
|
||||
pNextEntry->index, TMSG_INFO(pNextEntry->originalRpcType));
|
||||
|
||||
if(syncNodeChangeConfig(pNode, pNextEntry, "Commit") != 0){
|
||||
sError("vgId:%d, failed to change config from Commit. index:%" PRId64 ", term:%" PRId64
|
||||
", role:%d, current term:%" PRId64,
|
||||
vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
|
||||
goto _out;
|
||||
}
|
||||
|
||||
//for 2->1, need to apply config change entry in sync thread,
|
||||
if(pNode->replicaNum == 1){
|
||||
if (syncFsmExecute(pNode, pFsm, role, currentTerm, pNextEntry, 0, true) != 0) {
|
||||
sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64
|
||||
", role:%d, current term:%" PRId64,
|
||||
vgId, pNextEntry->index, pNextEntry->term, role, currentTerm);
|
||||
goto _out;
|
||||
}
|
||||
|
||||
index++;
|
||||
pBuf->commitIndex = index;
|
||||
|
||||
sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId,
|
||||
pNextEntry->index, pNextEntry->term, role, currentTerm);
|
||||
}
|
||||
}
|
||||
if (!nextInBuf) {
|
||||
syncEntryDestroy(pNextEntry);
|
||||
pNextEntry = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (!inBuf) {
|
||||
syncEntryDestroy(pEntry);
|
||||
pEntry = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// recycle
|
||||
|
@ -626,6 +709,10 @@ _out:
|
|||
syncEntryDestroy(pEntry);
|
||||
pEntry = NULL;
|
||||
}
|
||||
if (!nextInBuf) {
|
||||
syncEntryDestroy(pNextEntry);
|
||||
pNextEntry = NULL;
|
||||
}
|
||||
syncLogBufferValidate(pBuf);
|
||||
taosThreadMutexUnlock(&pBuf->mutex);
|
||||
return ret;
|
||||
|
|
|
@ -44,6 +44,7 @@ static int32_t syncEncodeSyncCfg(const void *pObj, SJson *pJson) {
|
|||
SSyncCfg *pCfg = (SSyncCfg *)pObj;
|
||||
if (tjsonAddDoubleToObject(pJson, "replicaNum", pCfg->replicaNum) < 0) return -1;
|
||||
if (tjsonAddDoubleToObject(pJson, "myIndex", pCfg->myIndex) < 0) return -1;
|
||||
if (tjsonAddDoubleToObject(pJson, "changeVersion", pCfg->changeVersion) < 0) return -1;
|
||||
|
||||
SJson *nodeInfo = tjsonCreateArray();
|
||||
if (nodeInfo == NULL) return -1;
|
||||
|
@ -113,8 +114,9 @@ int32_t syncWriteCfgFile(SSyncNode *pNode) {
|
|||
if (taosRenameFile(file, realfile) != 0) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64, pNode->vgId,
|
||||
realfile, len, pNode->raftCfg.lastConfigIndex);
|
||||
sInfo("vgId:%d, succeed to write sync cfg file:%s, len:%d, lastConfigIndex:%" PRId64 ", "
|
||||
"changeVersion:%d", pNode->vgId,
|
||||
realfile, len, pNode->raftCfg.lastConfigIndex, pNode->raftCfg.cfg.changeVersion);
|
||||
|
||||
_OVER:
|
||||
if (pJson != NULL) tjsonDelete(pJson);
|
||||
|
@ -136,6 +138,8 @@ static int32_t syncDecodeSyncCfg(const SJson *pJson, void *pObj) {
|
|||
if (code < 0) return -1;
|
||||
tjsonGetInt32ValueFromDouble(pJson, "myIndex", pCfg->myIndex, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetInt32ValueFromDouble(pJson, "changeVersion", pCfg->changeVersion, code);
|
||||
if (code < 0) return -1;
|
||||
|
||||
SJson *nodeInfo = tjsonGetObjectItem(pJson, "nodeInfo");
|
||||
if (nodeInfo == NULL) return -1;
|
||||
|
@ -242,7 +246,8 @@ int32_t syncReadCfgFile(SSyncNode *pNode) {
|
|||
}
|
||||
|
||||
code = 0;
|
||||
sInfo("vgId:%d, succceed to read sync cfg file %s", pNode->vgId, file);
|
||||
sInfo("vgId:%d, succceed to read sync cfg file %s, changeVersion:%d",
|
||||
pNode->vgId, file, pCfg->cfg.changeVersion);
|
||||
|
||||
_OVER:
|
||||
if (pData != NULL) taosMemoryFree(pData);
|
||||
|
|
Loading…
Reference in New Issue