diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3409071932..0409decaf8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1191,7 +1191,6 @@ typedef struct { int8_t strict; int8_t cacheLast; int8_t isTsma; - int8_t standby; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; @@ -1206,6 +1205,7 @@ typedef struct { int16_t hashPrefix; int16_t hashSuffix; int32_t tsdbPageSize; + int64_t reserved[8]; } SCreateVnodeReq; int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); @@ -1217,6 +1217,7 @@ typedef struct { int32_t dnodeId; int64_t dbUid; char db[TSDB_DB_FNAME_LEN]; + int64_t reserved[8]; } SDropVnodeReq; int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq); @@ -1225,6 +1226,7 @@ int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq typedef struct { int64_t dbUid; char db[TSDB_DB_FNAME_LEN]; + int64_t reserved[8]; } SCompactVnodeReq; int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq); @@ -1244,13 +1246,23 @@ typedef struct { int8_t walLevel; int8_t strict; int8_t cacheLast; + int64_t reserved[8]; +} SAlterVnodeConfigReq; + +int32_t tSerializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq); +int32_t tDeserializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq); + +typedef struct { + int32_t vgId; + int8_t strict; int8_t selfIndex; int8_t replica; SReplica replicas[TSDB_MAX_REPLICA]; -} SAlterVnodeReq; + int64_t reserved[8]; +} SAlterVnodeReplicaReq; -int32_t tSerializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq); -int32_t tDeserializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq); +int32_t tSerializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq); +int32_t tDeserializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq); typedef struct { SMsgHead header; @@ -1503,14 +1515,6 @@ typedef struct { int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq); -typedef struct { - int32_t dnodeId; - int8_t standby; -} SSetStandbyReq; - -int32_t tSerializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq); -int32_t tDeserializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq); - typedef struct { char queryStrId[TSDB_QUERY_ID_LEN]; } SKillQueryReq; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 7b4e930485..1cd02e2a28 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -262,8 +262,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL) // no longer used + TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 65541ca9cc..81cdd0e5e0 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3766,7 +3766,6 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR if (tEncodeI8(&encoder, pReq->compression) < 0) return -1; if (tEncodeI8(&encoder, pReq->strict) < 0) return -1; if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1; - if (tEncodeI8(&encoder, pReq->standby) < 0) return -1; if (tEncodeI8(&encoder, pReq->replica) < 0) return -1; if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1; for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { @@ -3795,6 +3794,9 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR if (tEncodeI16(&encoder, pReq->hashPrefix) < 0) return -1; if (tEncodeI16(&encoder, pReq->hashSuffix) < 0) return -1; if (tEncodeI32(&encoder, pReq->tsdbPageSize) < 0) return -1; + for (int32_t i = 0; i < 8; ++i) { + if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1; + } tEndEncode(&encoder); @@ -3832,7 +3834,6 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1; if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1; if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1; if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1; if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1; for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { @@ -3871,6 +3872,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeI16(&decoder, &pReq->hashPrefix) < 0) return -1; if (tDecodeI16(&decoder, &pReq->hashSuffix) < 0) return -1; if (tDecodeI32(&decoder, &pReq->tsdbPageSize) < 0) return -1; + for (int32_t i = 0; i < 8; ++i) { + if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); @@ -3892,6 +3896,9 @@ int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; + for (int32_t i = 0; i < 8; ++i) { + if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1; + } tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -3908,6 +3915,9 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; + for (int32_t i = 0; i < 8; ++i) { + if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); @@ -3921,6 +3931,9 @@ int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq * if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; + for (int32_t i = 0; i < 8; ++i) { + if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1; + } tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -3935,13 +3948,16 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; + for (int32_t i = 0; i < 8; ++i) { + if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); return 0; } -int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) { +int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeConfigReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -3959,11 +3975,8 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1; if (tEncodeI8(&encoder, pReq->strict) < 0) return -1; if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1; - if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1; - if (tEncodeI8(&encoder, pReq->replica) < 0) return -1; - for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { - SReplica *pReplica = &pReq->replicas[i]; - if (tEncodeSReplica(&encoder, pReplica) < 0) return -1; + for (int32_t i = 0; i < 8; ++i) { + if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1; } tEndEncode(&encoder); @@ -3972,7 +3985,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq return tlen; } -int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) { +int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeConfigReq *pReq) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -3990,12 +4003,54 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1; if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1; if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1; + for (int32_t i = 0; i < 8; ++i) { + if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1; + } + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnodeReplicaReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; + if (tEncodeI8(&encoder, pReq->strict) < 0) return -1; + if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1; + if (tEncodeI8(&encoder, pReq->replica) < 0) return -1; + for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { + SReplica *pReplica = &pReq->replicas[i]; + if (tEncodeSReplica(&encoder, pReplica) < 0) return -1; + } + for (int32_t i = 0; i < 8; ++i) { + if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnodeReplicaReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1; if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1; if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1; for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { SReplica *pReplica = &pReq->replicas[i]; if (tDecodeSReplica(&decoder, pReplica) < 0) return -1; } + for (int32_t i = 0; i < 8; ++i) { + if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); @@ -4218,33 +4273,6 @@ int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq return 0; } -int32_t tSerializeSSetStandbyReq(void *buf, int32_t bufLen, SSetStandbyReq *pReq) { - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, bufLen); - - if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; - if (tEncodeI8(&encoder, pReq->standby) < 0) return -1; - tEndEncode(&encoder); - - int32_t tlen = encoder.pos; - tEncoderClear(&encoder); - return tlen; -} - -int32_t tDeserializeSSetStandbyReq(void *buf, int32_t bufLen, SSetStandbyReq *pReq) { - SDecoder decoder = {0}; - tDecoderInit(&decoder, buf, bufLen); - - if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; - if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1; - tEndDecode(&decoder); - - tDecoderClear(&decoder); - return 0; -} - int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 7fb700e776..4c1b307b90 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -196,10 +196,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_MNODE_STANDBY, mmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - - if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 1bd68e6d41..bf1ccc1a7b 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -88,6 +88,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); SArray *vmGetMsgHandles(); int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); // vmFile.c int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index c92b6c4c4d..a121ff656e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -265,6 +265,45 @@ _OVER: return code; } +int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SAlterVnodeReplicaReq alterReq = {0}; + if (tSerializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + int32_t vgId = alterReq.vgId; + dInfo("vgId:%d, start to alter vnode replica", alterReq.vgId); + + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); + if (pVnode == NULL) { + dError("vgId:%d, failed to alter replica since %s", vgId, terrstr()); + terrno = TSDB_CODE_NODE_NOT_DEPLOYED; + return -1; + } + + dInfo("vgId:%d, start to close vnode", vgId); + vmCloseVnode(pMgmt, pVnode); + + char path[TSDB_FILENAME_LEN] = {0}; + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId); + + dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path); + if (vnodeAlter(path, &alterReq, pMgmt->pTfs) < 0) { + dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr()); + return -1; + } + + dInfo("vgId:%d, start to open vnode", vgId); + if (vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb) < 0) { + dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr()); + return -1; + } + + dInfo("vgId:%d, vnode config is altered", vgId); + return 0; +} + int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SDropVnodeReq dropReq = {0}; if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { @@ -348,7 +387,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -370,7 +409,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index fd1dbe00ce..8cf89b7f35 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -40,6 +40,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { case TDMT_DND_DROP_VNODE: code = vmProcessDropVnodeReq(pMgmt, pMsg); break; + case TDMT_VND_ALTER_REPLICA: + code = vmProcessAlterVnodeReq(pMgmt, pMsg); + break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; dGError("msg:%p, not processed in vnode-mgmt queue", pMsg); diff --git a/source/dnode/mgmt/test/vnode/vnode.cpp b/source/dnode/mgmt/test/vnode/vnode.cpp index 520d844dbd..8a8e332289 100644 --- a/source/dnode/mgmt/test/vnode/vnode.cpp +++ b/source/dnode/mgmt/test/vnode/vnode.cpp @@ -70,7 +70,7 @@ TEST_F(DndTestVnode, 01_Create_Vnode) { TEST_F(DndTestVnode, 02_Alter_Vnode) { for (int i = 0; i < 3; ++i) { - SAlterVnodeReq alterReq = {0}; + SAlterVnodeConfigReq alterReq = {0}; alterReq.vgVersion = 2; alterReq.daysPerFile = 10; alterReq.daysToKeep0 = 3650; diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index ad5fbef34c..76c3519808 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -38,7 +38,7 @@ int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups); int32_t mndAddVnodeToVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray); int32_t mndRemoveVnodeFromVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid); -int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool standby); +int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid); int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType); int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo); @@ -47,9 +47,8 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnode int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup, SArray *pArray); -void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby); +void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); -void *mndBuildAlterVnodeReq(SMnode *, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 2660fa3146..ebf3382b1b 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -447,7 +447,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { SVnodeGid *pVgid = pVgroup->vnodeGid + vn; - if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, false) != 0) { + if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid) != 0) { return -1; } } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index a6177fc69f..ccfd7e4a2d 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -534,10 +534,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg); syncSnapshotRspDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { - code = syncSetStandby(pMgmt->sync); - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); code = -1; @@ -575,10 +571,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { - code = syncSetStandby(pMgmt->sync); - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); } else { mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); code = -1; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 1ab0ba8a16..5889a162f8 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -56,8 +56,6 @@ int32_t mndInitMnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq); mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index da548cd3eb..56e725fac7 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -454,7 +454,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, pVgroup->pTsma = pSmaReq; int32_t contLen = 0; - void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, false); + void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); taosMemoryFreeClear(pSmaReq); if (pReq == NULL) return -1; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 47245fa2c4..91673a7ae9 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -196,8 +196,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { sdbRelease(pSdb, pVgroup); } -void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, - bool standby) { +void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { SCreateVnodeReq createReq = {0}; createReq.vgId = pVgroup->vgId; memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN); @@ -227,7 +226,6 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg createReq.hashMethod = pDb->cfg.hashMethod; createReq.numOfRetensions = pDb->cfg.numOfRetensions; createReq.pRetensions = pDb->cfg.pRetensions; - createReq.standby = standby; createReq.isTsma = pVgroup->isTsma; createReq.pTsma = pVgroup->pTsma; createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod; @@ -279,8 +277,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg return pReq; } -void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { - SAlterVnodeReq alterReq = {0}; +static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { + SAlterVnodeConfigReq alterReq = {0}; alterReq.vgVersion = pVgroup->version; alterReq.buffer = pDb->cfg.buffer; alterReq.pageSize = pDb->cfg.pageSize; @@ -294,6 +292,34 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_ alterReq.walLevel = pDb->cfg.walLevel; alterReq.strict = pDb->cfg.strict; alterReq.cacheLast = pDb->cfg.cacheLast; + + int32_t contLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + contLen += sizeof(SMsgHead); + + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + SMsgHead *pHead = pReq; + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq); + *pContLen = contLen; + return pReq; +} + +static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, + int32_t *pContLen) { + SAlterVnodeReplicaReq alterReq = {0}; + alterReq.vgId = pVgroup->vgId; + alterReq.strict = pDb->cfg.strict; alterReq.replica = pVgroup->replica; for (int32_t v = 0; v < pVgroup->replica; ++v) { @@ -308,14 +334,22 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_ pReplica->port = pVgidDnode->port; memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN); mndReleaseDnode(pMnode, pVgidDnode); + + if (pDnode->id == pVgid->dnodeId) { + alterReq.selfIndex = v; + } } - int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq); + if (alterReq.selfIndex == -1) { + terrno = TSDB_CODE_MND_APP_ERROR; + return NULL; + } + + int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq); if (contLen < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - contLen += sizeof(SMsgHead); void *pReq = taosMemoryMalloc(contLen); if (pReq == NULL) { @@ -323,38 +357,7 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_ return NULL; } - SMsgHead *pHead = pReq; - pHead->contLen = htonl(contLen); - pHead->vgId = htonl(pVgroup->vgId); - - tSerializeSAlterVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq); - *pContLen = contLen; - return pReq; -} - -void *mndBuildSetVnodeStandbyReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) { - SSetStandbyReq standbyReq = {0}; - standbyReq.dnodeId = pDnode->id; - standbyReq.standby = 1; - - int32_t contLen = tSerializeSSetStandbyReq(NULL, 0, &standbyReq); - if (contLen < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - contLen += sizeof(SMsgHead); - void *pReq = taosMemoryMalloc(contLen); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - tSerializeSSetStandbyReq((char *)pReq + sizeof(SMsgHead), contLen, &standbyReq); - SMsgHead *pHead = pReq; - pHead->contLen = htonl(contLen); - pHead->vgId = htonl(pVgroup->vgId); - + tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq); *pContLen = contLen; return pReq; } @@ -948,8 +951,7 @@ _OVER: return 0; } -int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, - bool standby) { +int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) { STransAction action = {0}; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); @@ -958,7 +960,7 @@ int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg mndReleaseDnode(pMnode, pDnode); int32_t contLen = 0; - void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, standby); + void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); if (pReq == NULL) return -1; action.pCont = pReq; @@ -1007,7 +1009,7 @@ int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgO action.epSet = mndGetVgroupEpset(pMnode, pVgroup); int32_t contLen = 0; - void *pReq = mndBuildAlterVnodeReq(pMnode, pDb, pVgroup, &contLen); + void *pReq = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &contLen); if (pReq == NULL) return -1; action.pCont = pReq; @@ -1022,41 +1024,6 @@ int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgO return 0; } -static int32_t mndAddSetVnodeStandByAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, - SVnodeGid *pVgid, bool isRedo) { - STransAction action = {0}; - - SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); - if (pDnode == NULL) return -1; - action.epSet = mndGetDnodeEpset(pDnode); - mndReleaseDnode(pMnode, pDnode); - - int32_t contLen = 0; - void *pReq = mndBuildSetVnodeStandbyReq(pMnode, pDnode, pDb, pVgroup, &contLen); - if (pReq == NULL) return -1; - - action.pCont = pReq; - action.contLen = contLen; - action.msgType = TDMT_SYNC_SET_VNODE_STANDBY; - action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED; - // Keep retrying until the target vnode is not the leader - action.retryCode = TSDB_CODE_SYN_IS_LEADER; - - if (isRedo) { - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - } else { - if (mndTransAppendUndoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - } - - return 0; -} - int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo) { STransAction action = {0}; @@ -1102,7 +1069,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, mInfo("vgId:%d, will add 1 vnodes", pVgroup->vgId); if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1; - if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1], true) != 0) return -1; + if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; @@ -1111,7 +1078,6 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVnodeGid del = newVg.vnodeGid[vnIndex]; newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica]; memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid)); - if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del, true) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1; @@ -1185,7 +1151,7 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb pGid->dnodeId = newDnodeId; pGid->syncState = TAOS_SYNC_STATE_ERROR; - if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid, true) != 0) return -1; + if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1; @@ -1212,7 +1178,6 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid)); memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid)); - if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1; @@ -1580,36 +1545,32 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb mndTransSetSerial(pTrans); - if (newVgroup.replica < pNewDb->cfg.replications) { - mInfo("db:%s, vgId:%d, vn:0 dnode:%d, will add 2 vnodes", pVgroup->dbName, pVgroup->vgId, + if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) { + mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId); if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1; - if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1; - if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; - if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1; - if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1; - if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1; + if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1; + if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1; if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1; - } else if (newVgroup.replica > pNewDb->cfg.replications) { - mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId); + } + + if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) { + mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName, + pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId); SVnodeGid del1 = {0}; - if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1; - if (mndAddSetVnodeStandByAction(pMnode, pTrans, pNewDb, pVgroup, &del1, true) != 0) return -1; - if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; - if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1; - if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1; - SVnodeGid del2 = {0}; + if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1; if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1; - if (mndAddSetVnodeStandByAction(pMnode, pTrans, pNewDb, pVgroup, &del2, true) != 0) return -1; - if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; + if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1; if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1; + if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1; } else { + return -1; } { @@ -1660,13 +1621,12 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj if (newVg1.replica == 1) { if (mndAddVnodeToVgroup(pMnode, &newVg1, pArray) != 0) goto _OVER; - if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1], true) != 0) goto _OVER; + if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]) != 0) goto _OVER; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; } else if (newVg1.replica == 3) { SVnodeGid del1 = {0}; if (mndRemoveVnodeFromVgroup(pMnode, &newVg1, pArray, &del1) != 0) goto _OVER; - if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del1, true) != 0) goto _OVER; if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER; if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 96ba9a0602..be0d6fdc4d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -50,6 +50,7 @@ extern const SVnodeCfg vnodeCfgDefault; int32_t vnodeInit(int32_t nthreads); void vnodeCleanup(); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); +int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodePreClose(SVnode *pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 3dbd93bb27..57fdb77533 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -15,11 +15,9 @@ #include "vnd.h" -int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { +int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { SVnodeInfo info = {0}; - char dir[TSDB_FILENAME_LEN]; - - // TODO: check if directory exists + char dir[TSDB_FILENAME_LEN] = {0}; // check config if (vnodeCheckCfg(pCfg) < 0) { @@ -56,18 +54,60 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { } vInfo("vgId:%d, vnode is created", info.config.vgId); - return 0; } -void vnodeDestroy(const char *path, STfs *pTfs) { tfsRmdir(pTfs, path); } +int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { + SVnodeInfo info = {0}; + char dir[TSDB_FILENAME_LEN] = {0}; + int32_t ret = 0; + + if (pTfs) { + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path); + } else { + snprintf(dir, TSDB_FILENAME_LEN, "%s", path); + } + + ret = vnodeLoadInfo(dir, &info); + if (ret < 0) { + vError("vgId:%d, failed to read vnode config from %s since %s", pReq->vgId, path, tstrerror(terrno)); + return -1; + } + + SSyncCfg *pCfg = &info.config.syncCfg; + pCfg->myIndex = pReq->selfIndex; + pCfg->replicaNum = pReq->replica; + memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo)); + + vInfo("vgId:%d, save config, replicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, pCfg->myIndex); + for (int i = 0; i < pReq->replica; ++i) { + SNodeInfo *pNode = &pCfg->nodeInfo[i]; + pNode->nodePort = pReq->replicas[i].port; + tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); + vInfo("vgId:%d, save config, replica:%d ep:%s:%u", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort); + } + + ret = vnodeSaveInfo(dir, &info); + if (ret < 0) { + vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno)); + return -1; + } + + vInfo("vgId:%d, vnode config is saved", info.config.vgId); + return 0; +} + +void vnodeDestroy(const char *path, STfs *pTfs) { + vInfo("path:%s is removed while destroy vnode", path); + tfsRmdir(pTfs, path); +} SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { SVnode *pVnode = NULL; SVnodeInfo info = {0}; - char dir[TSDB_FILENAME_LEN]; - char tdir[TSDB_FILENAME_LEN * 2]; - int ret; + char dir[TSDB_FILENAME_LEN] = {0}; + char tdir[TSDB_FILENAME_LEN * 2] = {0}; + int32_t ret = 0; if (pTfs) { snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path); @@ -85,7 +125,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { } // create handle - pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1); + pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1); if (pVnode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno)); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index be6eaccb3b..d3d3a7ed95 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1068,20 +1068,20 @@ static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, vo } static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { - SAlterVnodeReq req = {0}; - bool walChanged = false; - bool tsdbChanged = false; + bool walChanged = false; + bool tsdbChanged = false; - if (tDeserializeSAlterVnodeReq(pReq, len, &req) != 0) { + SAlterVnodeConfigReq req = {0}; + if (tDeserializeSAlterVnodeConfigReq(pReq, len, &req) != 0) { terrno = TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG; } vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64 - " cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d strict:%d", + " cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d", TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, - req.walFsyncPeriod, req.walLevel, req.strict); + req.walFsyncPeriod, req.walLevel); if (pVnode->config.cacheLastSize != req.cacheLastSize) { pVnode->config.cacheLastSize = req.cacheLastSize; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 6bcf603f7f..f59e28daaf 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -20,7 +20,7 @@ static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || - (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_REPLICA); + (type == TDMT_VND_UPDATE_TAG_VAL); } static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } @@ -53,76 +53,6 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { } } -static int32_t vnodeSetStandBy(SVnode *pVnode) { - vInfo("vgId:%d, start to set standby", TD_VID(pVnode)); - - if (syncSetStandby(pVnode->sync) == 0) { - vInfo("vgId:%d, set standby success", TD_VID(pVnode)); - return 0; - } else if (terrno != TSDB_CODE_SYN_IS_LEADER) { - vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr()); - return -1; - } - - vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode)); - if (syncLeaderTransfer(pVnode->sync) != 0) { - vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr()); - return -1; - } else { - vInfo("vgId:%d, transfer leader success", TD_VID(pVnode)); - } - - if (syncSetStandby(pVnode->sync) == 0) { - vInfo("vgId:%d, set standby success", TD_VID(pVnode)); - return 0; - } else { - vError("vgId:%d, failed to set standby after leader transfer since %s", TD_VID(pVnode), terrstr()); - return -1; - } -} - -static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { - SAlterVnodeReq req = {0}; - if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return TSDB_CODE_INVALID_MSG; - } - - const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle); - - SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex}; - for (int32_t r = 0; r < req.replica; ++r) { - SNodeInfo *pNode = &cfg.nodeInfo[r]; - tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn)); - pNode->nodePort = req.replicas[r].port; - vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort); - } - - SRpcMsg rpcMsg = {.info = pMsg->info}; - if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) { - vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr()); - return -1; - } - - int32_t code = syncPropose(pVnode->sync, &rpcMsg, false); - if (code != 0) { - if (terrno != 0) code = terrno; - - vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr()); - if (terrno == TSDB_CODE_SYN_IS_LEADER) { - if (syncLeaderTransfer(pVnode->sync) != 0) { - vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr()); - } else { - vInfo("vgId:%d, transfer leader success", TD_VID(pVnode)); - } - } - } - - terrno = code; - return code; -} - void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) { SEpSet newEpSet = {0}; syncGetRetryEpSet(pVnode->sync, &newEpSet); @@ -169,24 +99,6 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) } } -static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { - int32_t code = vnodeProcessAlterReplicaReq(pVnode, pMsg); - - if (code > 0) { - ASSERT(0); - } else if (code == 0) { - vnodeWaitBlockMsg(pVnode, pMsg); - } else { - if (terrno != 0) code = terrno; - vnodeHandleProposeError(pVnode, pMsg, code); - } - - const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) { if (*arrSize <= 0) return; @@ -265,11 +177,6 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) continue; } - if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { - vnodeHandleAlterReplicaReq(pVnode, pMsg); - continue; - } - if (isBlock || BATCH_DISABLE) { vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos); } @@ -431,11 +338,6 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ASSERT(pSyncMsg != NULL); code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { - code = vnodeSetStandBy(pVnode); - if (code != 0 && terrno != 0) code = terrno; - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); } else { vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); code = -1; @@ -496,11 +398,6 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg); syncSnapshotRspDestroy(pSyncMsg); - } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { - code = vnodeSetStandBy(pVnode); - if (code != 0 && terrno != 0) code = terrno; - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - tmsgSendRsp(&rsp); } else { vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); code = -1; @@ -543,22 +440,7 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } -static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) { - SVnode *pVnode = pFsm->data; - - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; - syncGetAndDelRespRpc(pVnode->sync, cbMeta->newCfgSeqNum, &rpcMsg.info); - rpcMsg.info.conn.applyIndex = cbMeta->index; - - const STraceId *trace = (STraceId *)&pMsg->info.traceId; - vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode), - TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta->seqNum, rpcMsg.info.handle); - if (rpcMsg.info.handle != NULL) { - tmsgSendRsp(&rpcMsg); - } - - vnodePostBlockMsg(pVnode, pMsg); -} +static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {} static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { if (cbMeta.isWeak == 0) {