Merge pull request #17531 from taosdata/enh/TD-19461
refact: alter db replica
This commit is contained in:
commit
e6e7befaa8
|
@ -1191,7 +1191,6 @@ typedef struct {
|
||||||
int8_t strict;
|
int8_t strict;
|
||||||
int8_t cacheLast;
|
int8_t cacheLast;
|
||||||
int8_t isTsma;
|
int8_t isTsma;
|
||||||
int8_t standby;
|
|
||||||
int8_t replica;
|
int8_t replica;
|
||||||
int8_t selfIndex;
|
int8_t selfIndex;
|
||||||
SReplica replicas[TSDB_MAX_REPLICA];
|
SReplica replicas[TSDB_MAX_REPLICA];
|
||||||
|
@ -1206,6 +1205,7 @@ typedef struct {
|
||||||
int16_t hashPrefix;
|
int16_t hashPrefix;
|
||||||
int16_t hashSuffix;
|
int16_t hashSuffix;
|
||||||
int32_t tsdbPageSize;
|
int32_t tsdbPageSize;
|
||||||
|
int64_t reserved[8];
|
||||||
} SCreateVnodeReq;
|
} SCreateVnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||||
|
@ -1217,6 +1217,7 @@ typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
int64_t reserved[8];
|
||||||
} SDropVnodeReq;
|
} SDropVnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
|
int32_t tSerializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq);
|
||||||
|
@ -1225,6 +1226,7 @@ int32_t tDeserializeSDropVnodeReq(void* buf, int32_t bufLen, SDropVnodeReq* pReq
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
int64_t reserved[8];
|
||||||
} SCompactVnodeReq;
|
} SCompactVnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
|
int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq);
|
||||||
|
@ -1244,13 +1246,23 @@ typedef struct {
|
||||||
int8_t walLevel;
|
int8_t walLevel;
|
||||||
int8_t strict;
|
int8_t strict;
|
||||||
int8_t cacheLast;
|
int8_t cacheLast;
|
||||||
|
int64_t reserved[8];
|
||||||
|
} SAlterVnodeConfigReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq);
|
||||||
|
int32_t tDeserializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t vgId;
|
||||||
|
int8_t strict;
|
||||||
int8_t selfIndex;
|
int8_t selfIndex;
|
||||||
int8_t replica;
|
int8_t replica;
|
||||||
SReplica replicas[TSDB_MAX_REPLICA];
|
SReplica replicas[TSDB_MAX_REPLICA];
|
||||||
} SAlterVnodeReq;
|
int64_t reserved[8];
|
||||||
|
} SAlterVnodeReplicaReq;
|
||||||
|
|
||||||
int32_t tSerializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq);
|
int32_t tSerializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
|
||||||
int32_t tDeserializeSAlterVnodeReq(void* buf, int32_t bufLen, SAlterVnodeReq* pReq);
|
int32_t tDeserializeSAlterVnodeReplicaReq(void* buf, int32_t bufLen, SAlterVnodeReplicaReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead header;
|
SMsgHead header;
|
||||||
|
@ -1503,14 +1515,6 @@ typedef struct {
|
||||||
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
||||||
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t dnodeId;
|
|
||||||
int8_t standby;
|
|
||||||
} SSetStandbyReq;
|
|
||||||
|
|
||||||
int32_t tSerializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
|
|
||||||
int32_t tDeserializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char queryStrId[TSDB_QUERY_ID_LEN];
|
char queryStrId[TSDB_QUERY_ID_LEN];
|
||||||
} SKillQueryReq;
|
} SKillQueryReq;
|
||||||
|
|
|
@ -262,8 +262,8 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL) // no longer used
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL) // no longer used
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
|
||||||
|
|
|
@ -3766,7 +3766,6 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
||||||
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->standby) < 0) return -1;
|
|
||||||
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
|
||||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
@ -3795,6 +3794,9 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
||||||
if (tEncodeI16(&encoder, pReq->hashPrefix) < 0) return -1;
|
if (tEncodeI16(&encoder, pReq->hashPrefix) < 0) return -1;
|
||||||
if (tEncodeI16(&encoder, pReq->hashSuffix) < 0) return -1;
|
if (tEncodeI16(&encoder, pReq->hashSuffix) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->tsdbPageSize) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->tsdbPageSize) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < 8; ++i) {
|
||||||
|
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -3832,7 +3834,6 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
||||||
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1;
|
|
||||||
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
|
||||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
@ -3871,6 +3872,9 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
||||||
if (tDecodeI16(&decoder, &pReq->hashPrefix) < 0) return -1;
|
if (tDecodeI16(&decoder, &pReq->hashPrefix) < 0) return -1;
|
||||||
if (tDecodeI16(&decoder, &pReq->hashSuffix) < 0) return -1;
|
if (tDecodeI16(&decoder, &pReq->hashSuffix) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->tsdbPageSize) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->tsdbPageSize) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < 8; ++i) {
|
||||||
|
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
@ -3892,6 +3896,9 @@ int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq)
|
||||||
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < 8; ++i) {
|
||||||
|
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
|
||||||
|
}
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -3908,6 +3915,9 @@ int32_t tDeserializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq
|
||||||
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < 8; ++i) {
|
||||||
|
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
|
||||||
|
}
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
@ -3921,6 +3931,9 @@ int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq *
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < 8; ++i) {
|
||||||
|
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
|
||||||
|
}
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -3935,13 +3948,16 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < 8; ++i) {
|
||||||
|
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
|
||||||
|
}
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) {
|
int32_t tSerializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeConfigReq *pReq) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
@ -3959,11 +3975,8 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
|
||||||
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->cacheLast) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
|
for (int32_t i = 0; i < 8; ++i) {
|
||||||
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
|
||||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
|
||||||
SReplica *pReplica = &pReq->replicas[i];
|
|
||||||
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
|
|
||||||
}
|
}
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -3972,7 +3985,7 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq) {
|
int32_t tDeserializeSAlterVnodeConfigReq(void *buf, int32_t bufLen, SAlterVnodeConfigReq *pReq) {
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
tDecoderInit(&decoder, buf, bufLen);
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
@ -3990,12 +4003,54 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
|
||||||
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->cacheLast) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < 8; ++i) {
|
||||||
|
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnodeReplicaReq *pReq) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
SReplica *pReplica = &pReq->replicas[i];
|
||||||
|
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < 8; ++i) {
|
||||||
|
if (tEncodeI64(&encoder, pReq->reserved[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnodeReplicaReq *pReq) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
|
||||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
SReplica *pReplica = &pReq->replicas[i];
|
SReplica *pReplica = &pReq->replicas[i];
|
||||||
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
|
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
for (int32_t i = 0; i < 8; ++i) {
|
||||||
|
if (tDecodeI64(&decoder, &pReq->reserved[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
@ -4218,33 +4273,6 @@ int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSSetStandbyReq(void *buf, int32_t bufLen, SSetStandbyReq *pReq) {
|
|
||||||
SEncoder encoder = {0};
|
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
|
||||||
|
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
|
||||||
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
|
|
||||||
if (tEncodeI8(&encoder, pReq->standby) < 0) return -1;
|
|
||||||
tEndEncode(&encoder);
|
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDeserializeSSetStandbyReq(void *buf, int32_t bufLen, SSetStandbyReq *pReq) {
|
|
||||||
SDecoder decoder = {0};
|
|
||||||
tDecoderInit(&decoder, buf, bufLen);
|
|
||||||
|
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
|
||||||
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
|
|
||||||
if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1;
|
|
||||||
tEndDecode(&decoder);
|
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
|
int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
|
@ -196,10 +196,6 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_MNODE_STANDBY, mmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
|
|
|
@ -88,6 +88,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
||||||
SArray *vmGetMsgHandles();
|
SArray *vmGetMsgHandles();
|
||||||
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// vmFile.c
|
// vmFile.c
|
||||||
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
|
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
|
||||||
|
|
|
@ -265,6 +265,45 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
|
SAlterVnodeReplicaReq alterReq = {0};
|
||||||
|
if (tSerializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vgId = alterReq.vgId;
|
||||||
|
dInfo("vgId:%d, start to alter vnode replica", alterReq.vgId);
|
||||||
|
|
||||||
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
dError("vgId:%d, failed to alter replica since %s", vgId, terrstr());
|
||||||
|
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dInfo("vgId:%d, start to close vnode", vgId);
|
||||||
|
vmCloseVnode(pMgmt, pVnode);
|
||||||
|
|
||||||
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
|
||||||
|
|
||||||
|
dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
|
||||||
|
if (vnodeAlter(path, &alterReq, pMgmt->pTfs) < 0) {
|
||||||
|
dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dInfo("vgId:%d, start to open vnode", vgId);
|
||||||
|
if (vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb) < 0) {
|
||||||
|
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dInfo("vgId:%d, vnode config is altered", vgId);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SDropVnodeReq dropReq = {0};
|
SDropVnodeReq dropReq = {0};
|
||||||
if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
|
||||||
|
@ -348,7 +387,7 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
@ -370,7 +409,6 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
case TDMT_DND_DROP_VNODE:
|
case TDMT_DND_DROP_VNODE:
|
||||||
code = vmProcessDropVnodeReq(pMgmt, pMsg);
|
code = vmProcessDropVnodeReq(pMgmt, pMsg);
|
||||||
break;
|
break;
|
||||||
|
case TDMT_VND_ALTER_REPLICA:
|
||||||
|
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);
|
dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);
|
||||||
|
|
|
@ -70,7 +70,7 @@ TEST_F(DndTestVnode, 01_Create_Vnode) {
|
||||||
|
|
||||||
TEST_F(DndTestVnode, 02_Alter_Vnode) {
|
TEST_F(DndTestVnode, 02_Alter_Vnode) {
|
||||||
for (int i = 0; i < 3; ++i) {
|
for (int i = 0; i < 3; ++i) {
|
||||||
SAlterVnodeReq alterReq = {0};
|
SAlterVnodeConfigReq alterReq = {0};
|
||||||
alterReq.vgVersion = 2;
|
alterReq.vgVersion = 2;
|
||||||
alterReq.daysPerFile = 10;
|
alterReq.daysPerFile = 10;
|
||||||
alterReq.daysToKeep0 = 3650;
|
alterReq.daysToKeep0 = 3650;
|
||||||
|
|
|
@ -38,7 +38,7 @@ int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
|
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
|
||||||
int32_t mndAddVnodeToVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray);
|
int32_t mndAddVnodeToVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray);
|
||||||
int32_t mndRemoveVnodeFromVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid);
|
int32_t mndRemoveVnodeFromVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid);
|
||||||
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool standby);
|
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
|
||||||
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
|
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
|
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
|
||||||
int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo);
|
int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo);
|
||||||
|
@ -47,9 +47,8 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnode
|
||||||
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
|
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
|
||||||
SArray *pArray);
|
SArray *pArray);
|
||||||
|
|
||||||
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby);
|
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
void *mndBuildAlterVnodeReq(SMnode *, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
|
||||||
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid);
|
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -447,7 +447,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
|
|
||||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||||
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, false) != 0) {
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -534,10 +534,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
|
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
|
||||||
syncSnapshotRspDestroy(pSyncMsg);
|
syncSnapshotRspDestroy(pSyncMsg);
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
|
|
||||||
code = syncSetStandby(pMgmt->sync);
|
|
||||||
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
|
||||||
tmsgSendRsp(&rsp);
|
|
||||||
} else {
|
} else {
|
||||||
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -575,10 +571,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
|
|
||||||
code = syncSetStandby(pMgmt->sync);
|
|
||||||
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
|
||||||
tmsgSendRsp(&rsp);
|
|
||||||
} else {
|
} else {
|
||||||
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
code = -1;
|
code = -1;
|
||||||
|
|
|
@ -56,8 +56,6 @@ int32_t mndInitMnode(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mndTransProcessRsp);
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mndTransProcessRsp);
|
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
|
||||||
|
|
|
@ -454,7 +454,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
|
||||||
pVgroup->pTsma = pSmaReq;
|
pVgroup->pTsma = pSmaReq;
|
||||||
|
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, false);
|
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
|
||||||
taosMemoryFreeClear(pSmaReq);
|
taosMemoryFreeClear(pSmaReq);
|
||||||
if (pReq == NULL) return -1;
|
if (pReq == NULL) return -1;
|
||||||
|
|
||||||
|
|
|
@ -196,8 +196,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen,
|
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
|
||||||
bool standby) {
|
|
||||||
SCreateVnodeReq createReq = {0};
|
SCreateVnodeReq createReq = {0};
|
||||||
createReq.vgId = pVgroup->vgId;
|
createReq.vgId = pVgroup->vgId;
|
||||||
memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
|
memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
@ -227,7 +226,6 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
||||||
createReq.hashMethod = pDb->cfg.hashMethod;
|
createReq.hashMethod = pDb->cfg.hashMethod;
|
||||||
createReq.numOfRetensions = pDb->cfg.numOfRetensions;
|
createReq.numOfRetensions = pDb->cfg.numOfRetensions;
|
||||||
createReq.pRetensions = pDb->cfg.pRetensions;
|
createReq.pRetensions = pDb->cfg.pRetensions;
|
||||||
createReq.standby = standby;
|
|
||||||
createReq.isTsma = pVgroup->isTsma;
|
createReq.isTsma = pVgroup->isTsma;
|
||||||
createReq.pTsma = pVgroup->pTsma;
|
createReq.pTsma = pVgroup->pTsma;
|
||||||
createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
|
createReq.walRetentionPeriod = pDb->cfg.walRetentionPeriod;
|
||||||
|
@ -279,8 +277,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
||||||
return pReq;
|
return pReq;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
|
static void *mndBuildAlterVnodeConfigReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
|
||||||
SAlterVnodeReq alterReq = {0};
|
SAlterVnodeConfigReq alterReq = {0};
|
||||||
alterReq.vgVersion = pVgroup->version;
|
alterReq.vgVersion = pVgroup->version;
|
||||||
alterReq.buffer = pDb->cfg.buffer;
|
alterReq.buffer = pDb->cfg.buffer;
|
||||||
alterReq.pageSize = pDb->cfg.pageSize;
|
alterReq.pageSize = pDb->cfg.pageSize;
|
||||||
|
@ -294,6 +292,34 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
|
||||||
alterReq.walLevel = pDb->cfg.walLevel;
|
alterReq.walLevel = pDb->cfg.walLevel;
|
||||||
alterReq.strict = pDb->cfg.strict;
|
alterReq.strict = pDb->cfg.strict;
|
||||||
alterReq.cacheLast = pDb->cfg.cacheLast;
|
alterReq.cacheLast = pDb->cfg.cacheLast;
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSAlterVnodeConfigReq(NULL, 0, &alterReq);
|
||||||
|
if (contLen < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
contLen += sizeof(SMsgHead);
|
||||||
|
|
||||||
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
|
if (pReq == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMsgHead *pHead = pReq;
|
||||||
|
pHead->contLen = htonl(contLen);
|
||||||
|
pHead->vgId = htonl(pVgroup->vgId);
|
||||||
|
|
||||||
|
tSerializeSAlterVnodeConfigReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq);
|
||||||
|
*pContLen = contLen;
|
||||||
|
return pReq;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup,
|
||||||
|
int32_t *pContLen) {
|
||||||
|
SAlterVnodeReplicaReq alterReq = {0};
|
||||||
|
alterReq.vgId = pVgroup->vgId;
|
||||||
|
alterReq.strict = pDb->cfg.strict;
|
||||||
alterReq.replica = pVgroup->replica;
|
alterReq.replica = pVgroup->replica;
|
||||||
|
|
||||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||||
|
@ -308,14 +334,22 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
|
||||||
pReplica->port = pVgidDnode->port;
|
pReplica->port = pVgidDnode->port;
|
||||||
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
mndReleaseDnode(pMnode, pVgidDnode);
|
mndReleaseDnode(pMnode, pVgidDnode);
|
||||||
|
|
||||||
|
if (pDnode->id == pVgid->dnodeId) {
|
||||||
|
alterReq.selfIndex = v;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
|
if (alterReq.selfIndex == -1) {
|
||||||
|
terrno = TSDB_CODE_MND_APP_ERROR;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSAlterVnodeReplicaReq(NULL, 0, &alterReq);
|
||||||
if (contLen < 0) {
|
if (contLen < 0) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
contLen += sizeof(SMsgHead);
|
|
||||||
|
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
|
@ -323,38 +357,7 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMsgHead *pHead = pReq;
|
tSerializeSAlterVnodeReplicaReq(pReq, contLen, &alterReq);
|
||||||
pHead->contLen = htonl(contLen);
|
|
||||||
pHead->vgId = htonl(pVgroup->vgId);
|
|
||||||
|
|
||||||
tSerializeSAlterVnodeReq((char *)pReq + sizeof(SMsgHead), contLen, &alterReq);
|
|
||||||
*pContLen = contLen;
|
|
||||||
return pReq;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *mndBuildSetVnodeStandbyReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
|
|
||||||
SSetStandbyReq standbyReq = {0};
|
|
||||||
standbyReq.dnodeId = pDnode->id;
|
|
||||||
standbyReq.standby = 1;
|
|
||||||
|
|
||||||
int32_t contLen = tSerializeSSetStandbyReq(NULL, 0, &standbyReq);
|
|
||||||
if (contLen < 0) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
contLen += sizeof(SMsgHead);
|
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
|
||||||
if (pReq == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
tSerializeSSetStandbyReq((char *)pReq + sizeof(SMsgHead), contLen, &standbyReq);
|
|
||||||
SMsgHead *pHead = pReq;
|
|
||||||
pHead->contLen = htonl(contLen);
|
|
||||||
pHead->vgId = htonl(pVgroup->vgId);
|
|
||||||
|
|
||||||
*pContLen = contLen;
|
*pContLen = contLen;
|
||||||
return pReq;
|
return pReq;
|
||||||
}
|
}
|
||||||
|
@ -948,8 +951,7 @@ _OVER:
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
|
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
|
||||||
bool standby) {
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
|
|
||||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||||
|
@ -958,7 +960,7 @@ int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, standby);
|
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
|
||||||
if (pReq == NULL) return -1;
|
if (pReq == NULL) return -1;
|
||||||
|
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
|
@ -1007,7 +1009,7 @@ int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgO
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildAlterVnodeReq(pMnode, pDb, pVgroup, &contLen);
|
void *pReq = mndBuildAlterVnodeConfigReq(pMnode, pDb, pVgroup, &contLen);
|
||||||
if (pReq == NULL) return -1;
|
if (pReq == NULL) return -1;
|
||||||
|
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
|
@ -1022,41 +1024,6 @@ int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndAddSetVnodeStandByAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
|
||||||
SVnodeGid *pVgid, bool isRedo) {
|
|
||||||
STransAction action = {0};
|
|
||||||
|
|
||||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
|
||||||
if (pDnode == NULL) return -1;
|
|
||||||
action.epSet = mndGetDnodeEpset(pDnode);
|
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
|
||||||
|
|
||||||
int32_t contLen = 0;
|
|
||||||
void *pReq = mndBuildSetVnodeStandbyReq(pMnode, pDnode, pDb, pVgroup, &contLen);
|
|
||||||
if (pReq == NULL) return -1;
|
|
||||||
|
|
||||||
action.pCont = pReq;
|
|
||||||
action.contLen = contLen;
|
|
||||||
action.msgType = TDMT_SYNC_SET_VNODE_STANDBY;
|
|
||||||
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
|
|
||||||
// Keep retrying until the target vnode is not the leader
|
|
||||||
action.retryCode = TSDB_CODE_SYN_IS_LEADER;
|
|
||||||
|
|
||||||
if (isRedo) {
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(pReq);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(pReq);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
|
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
|
||||||
bool isRedo) {
|
bool isRedo) {
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
|
@ -1102,7 +1069,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
|
||||||
|
|
||||||
mInfo("vgId:%d, will add 1 vnodes", pVgroup->vgId);
|
mInfo("vgId:%d, will add 1 vnodes", pVgroup->vgId);
|
||||||
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
|
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1], true) != 0) return -1;
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[newVg.replica - 1]) != 0) return -1;
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
|
||||||
|
|
||||||
|
@ -1111,7 +1078,6 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
|
||||||
SVnodeGid del = newVg.vnodeGid[vnIndex];
|
SVnodeGid del = newVg.vnodeGid[vnIndex];
|
||||||
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
|
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
|
||||||
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
|
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
|
||||||
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del, true) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
|
||||||
|
@ -1185,7 +1151,7 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb
|
||||||
pGid->dnodeId = newDnodeId;
|
pGid->dnodeId = newDnodeId;
|
||||||
pGid->syncState = TAOS_SYNC_STATE_ERROR;
|
pGid->syncState = TAOS_SYNC_STATE_ERROR;
|
||||||
|
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid, true) != 0) return -1;
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid) != 0) return -1;
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
||||||
|
|
||||||
|
@ -1212,7 +1178,6 @@ static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, S
|
||||||
memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
|
memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
|
||||||
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
|
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
|
||||||
|
|
||||||
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
||||||
|
@ -1580,36 +1545,32 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
|
||||||
|
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
|
|
||||||
if (newVgroup.replica < pNewDb->cfg.replications) {
|
if (newVgroup.replica == 1 && pNewDb->cfg.replications == 3) {
|
||||||
mInfo("db:%s, vgId:%d, vn:0 dnode:%d, will add 2 vnodes", pVgroup->dbName, pVgroup->vgId,
|
mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
|
||||||
pVgroup->vnodeGid[0].dnodeId);
|
pVgroup->vnodeGid[0].dnodeId);
|
||||||
|
|
||||||
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
|
||||||
|
|
||||||
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1;
|
||||||
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1;
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||||
} else if (newVgroup.replica > pNewDb->cfg.replications) {
|
}
|
||||||
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
|
|
||||||
|
if (newVgroup.replica == 3 && pNewDb->cfg.replications == 1) {
|
||||||
|
mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
|
||||||
|
pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
|
||||||
|
|
||||||
SVnodeGid del1 = {0};
|
SVnodeGid del1 = {0};
|
||||||
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
|
|
||||||
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pNewDb, pVgroup, &del1, true) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
|
||||||
|
|
||||||
SVnodeGid del2 = {0};
|
SVnodeGid del2 = {0};
|
||||||
|
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
|
||||||
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
|
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
|
||||||
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pNewDb, pVgroup, &del2, true) != 0) return -1;
|
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
|
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||||
} else {
|
} else {
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -1660,13 +1621,12 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
|
||||||
|
|
||||||
if (newVg1.replica == 1) {
|
if (newVg1.replica == 1) {
|
||||||
if (mndAddVnodeToVgroup(pMnode, &newVg1, pArray) != 0) goto _OVER;
|
if (mndAddVnodeToVgroup(pMnode, &newVg1, pArray) != 0) goto _OVER;
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1], true) != 0) goto _OVER;
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1]) != 0) goto _OVER;
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
||||||
} else if (newVg1.replica == 3) {
|
} else if (newVg1.replica == 3) {
|
||||||
SVnodeGid del1 = {0};
|
SVnodeGid del1 = {0};
|
||||||
if (mndRemoveVnodeFromVgroup(pMnode, &newVg1, pArray, &del1) != 0) goto _OVER;
|
if (mndRemoveVnodeFromVgroup(pMnode, &newVg1, pArray, &del1) != 0) goto _OVER;
|
||||||
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del1, true) != 0) goto _OVER;
|
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER;
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER;
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
||||||
|
|
|
@ -50,6 +50,7 @@ extern const SVnodeCfg vnodeCfgDefault;
|
||||||
int32_t vnodeInit(int32_t nthreads);
|
int32_t vnodeInit(int32_t nthreads);
|
||||||
void vnodeCleanup();
|
void vnodeCleanup();
|
||||||
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||||
|
int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs);
|
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||||
void vnodePreClose(SVnode *pVnode);
|
void vnodePreClose(SVnode *pVnode);
|
||||||
|
|
|
@ -15,11 +15,9 @@
|
||||||
|
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
char dir[TSDB_FILENAME_LEN];
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
||||||
// TODO: check if directory exists
|
|
||||||
|
|
||||||
// check config
|
// check config
|
||||||
if (vnodeCheckCfg(pCfg) < 0) {
|
if (vnodeCheckCfg(pCfg) < 0) {
|
||||||
|
@ -56,18 +54,60 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||||
}
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, vnode is created", info.config.vgId);
|
vInfo("vgId:%d, vnode is created", info.config.vgId);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs) { tfsRmdir(pTfs, path); }
|
int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
|
||||||
|
SVnodeInfo info = {0};
|
||||||
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
if (pTfs) {
|
||||||
|
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
|
||||||
|
} else {
|
||||||
|
snprintf(dir, TSDB_FILENAME_LEN, "%s", path);
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = vnodeLoadInfo(dir, &info);
|
||||||
|
if (ret < 0) {
|
||||||
|
vError("vgId:%d, failed to read vnode config from %s since %s", pReq->vgId, path, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncCfg *pCfg = &info.config.syncCfg;
|
||||||
|
pCfg->myIndex = pReq->selfIndex;
|
||||||
|
pCfg->replicaNum = pReq->replica;
|
||||||
|
memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
|
||||||
|
|
||||||
|
vInfo("vgId:%d, save config, replicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum, pCfg->myIndex);
|
||||||
|
for (int i = 0; i < pReq->replica; ++i) {
|
||||||
|
SNodeInfo *pNode = &pCfg->nodeInfo[i];
|
||||||
|
pNode->nodePort = pReq->replicas[i].port;
|
||||||
|
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
|
||||||
|
vInfo("vgId:%d, save config, replica:%d ep:%s:%u", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort);
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = vnodeSaveInfo(dir, &info);
|
||||||
|
if (ret < 0) {
|
||||||
|
vError("vgId:%d, failed to save vnode config since %s", pReq->vgId, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
vInfo("vgId:%d, vnode config is saved", info.config.vgId);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeDestroy(const char *path, STfs *pTfs) {
|
||||||
|
vInfo("path:%s is removed while destroy vnode", path);
|
||||||
|
tfsRmdir(pTfs, path);
|
||||||
|
}
|
||||||
|
|
||||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
SVnode *pVnode = NULL;
|
SVnode *pVnode = NULL;
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
char dir[TSDB_FILENAME_LEN];
|
char dir[TSDB_FILENAME_LEN] = {0};
|
||||||
char tdir[TSDB_FILENAME_LEN * 2];
|
char tdir[TSDB_FILENAME_LEN * 2] = {0};
|
||||||
int ret;
|
int32_t ret = 0;
|
||||||
|
|
||||||
if (pTfs) {
|
if (pTfs) {
|
||||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
|
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
|
||||||
|
@ -85,7 +125,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// create handle
|
// create handle
|
||||||
pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
|
pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
|
vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
|
||||||
|
|
|
@ -1068,20 +1068,20 @@ static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, vo
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
SAlterVnodeReq req = {0};
|
bool walChanged = false;
|
||||||
bool walChanged = false;
|
bool tsdbChanged = false;
|
||||||
bool tsdbChanged = false;
|
|
||||||
|
|
||||||
if (tDeserializeSAlterVnodeReq(pReq, len, &req) != 0) {
|
SAlterVnodeConfigReq req = {0};
|
||||||
|
if (tDeserializeSAlterVnodeConfigReq(pReq, len, &req) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return TSDB_CODE_INVALID_MSG;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
|
vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
|
||||||
" cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d strict:%d",
|
" cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d",
|
||||||
TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
|
TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
|
||||||
req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
|
req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
|
||||||
req.walFsyncPeriod, req.walLevel, req.strict);
|
req.walFsyncPeriod, req.walLevel);
|
||||||
|
|
||||||
if (pVnode->config.cacheLastSize != req.cacheLastSize) {
|
if (pVnode->config.cacheLastSize != req.cacheLastSize) {
|
||||||
pVnode->config.cacheLastSize = req.cacheLastSize;
|
pVnode->config.cacheLastSize = req.cacheLastSize;
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
static inline bool vnodeIsMsgBlock(tmsg_t type) {
|
static inline bool vnodeIsMsgBlock(tmsg_t type) {
|
||||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_REPLICA);
|
(type == TDMT_VND_UPDATE_TAG_VAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
|
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
|
||||||
|
@ -53,76 +53,6 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSetStandBy(SVnode *pVnode) {
|
|
||||||
vInfo("vgId:%d, start to set standby", TD_VID(pVnode));
|
|
||||||
|
|
||||||
if (syncSetStandby(pVnode->sync) == 0) {
|
|
||||||
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
|
|
||||||
return 0;
|
|
||||||
} else if (terrno != TSDB_CODE_SYN_IS_LEADER) {
|
|
||||||
vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode));
|
|
||||||
if (syncLeaderTransfer(pVnode->sync) != 0) {
|
|
||||||
vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
|
|
||||||
return -1;
|
|
||||||
} else {
|
|
||||||
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (syncSetStandby(pVnode->sync) == 0) {
|
|
||||||
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
vError("vgId:%d, failed to set standby after leader transfer since %s", TD_VID(pVnode), terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
|
||||||
SAlterVnodeReq req = {0};
|
|
||||||
if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
return TSDB_CODE_INVALID_MSG;
|
|
||||||
}
|
|
||||||
|
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
|
||||||
vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);
|
|
||||||
|
|
||||||
SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};
|
|
||||||
for (int32_t r = 0; r < req.replica; ++r) {
|
|
||||||
SNodeInfo *pNode = &cfg.nodeInfo[r];
|
|
||||||
tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn));
|
|
||||||
pNode->nodePort = req.replicas[r].port;
|
|
||||||
vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
|
|
||||||
}
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.info = pMsg->info};
|
|
||||||
if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) {
|
|
||||||
vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
|
|
||||||
if (code != 0) {
|
|
||||||
if (terrno != 0) code = terrno;
|
|
||||||
|
|
||||||
vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
|
|
||||||
if (terrno == TSDB_CODE_SYN_IS_LEADER) {
|
|
||||||
if (syncLeaderTransfer(pVnode->sync) != 0) {
|
|
||||||
vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
|
|
||||||
} else {
|
|
||||||
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
terrno = code;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
SEpSet newEpSet = {0};
|
SEpSet newEpSet = {0};
|
||||||
syncGetRetryEpSet(pVnode->sync, &newEpSet);
|
syncGetRetryEpSet(pVnode->sync, &newEpSet);
|
||||||
|
@ -169,24 +99,6 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
|
||||||
int32_t code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
|
|
||||||
|
|
||||||
if (code > 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
} else if (code == 0) {
|
|
||||||
vnodeWaitBlockMsg(pVnode, pMsg);
|
|
||||||
} else {
|
|
||||||
if (terrno != 0) code = terrno;
|
|
||||||
vnodeHandleProposeError(pVnode, pMsg, code);
|
|
||||||
}
|
|
||||||
|
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
|
||||||
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
taosFreeQitem(pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
|
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
|
||||||
if (*arrSize <= 0) return;
|
if (*arrSize <= 0) return;
|
||||||
|
|
||||||
|
@ -265,11 +177,6 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
|
|
||||||
vnodeHandleAlterReplicaReq(pVnode, pMsg);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isBlock || BATCH_DISABLE) {
|
if (isBlock || BATCH_DISABLE) {
|
||||||
vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
|
vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
|
||||||
}
|
}
|
||||||
|
@ -431,11 +338,6 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
ASSERT(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
|
|
||||||
code = vnodeSetStandBy(pVnode);
|
|
||||||
if (code != 0 && terrno != 0) code = terrno;
|
|
||||||
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
|
||||||
tmsgSendRsp(&rsp);
|
|
||||||
} else {
|
} else {
|
||||||
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
|
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -496,11 +398,6 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
|
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
|
||||||
syncSnapshotRspDestroy(pSyncMsg);
|
syncSnapshotRspDestroy(pSyncMsg);
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
|
|
||||||
code = vnodeSetStandBy(pVnode);
|
|
||||||
if (code != 0 && terrno != 0) code = terrno;
|
|
||||||
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
|
||||||
tmsgSendRsp(&rsp);
|
|
||||||
} else {
|
} else {
|
||||||
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
|
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -543,22 +440,7 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {
|
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {}
|
||||||
SVnode *pVnode = pFsm->data;
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
|
||||||
syncGetAndDelRespRpc(pVnode->sync, cbMeta->newCfgSeqNum, &rpcMsg.info);
|
|
||||||
rpcMsg.info.conn.applyIndex = cbMeta->index;
|
|
||||||
|
|
||||||
const STraceId *trace = (STraceId *)&pMsg->info.traceId;
|
|
||||||
vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
|
|
||||||
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta->seqNum, rpcMsg.info.handle);
|
|
||||||
if (rpcMsg.info.handle != NULL) {
|
|
||||||
tmsgSendRsp(&rpcMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
vnodePostBlockMsg(pVnode, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
if (cbMeta.isWeak == 0) {
|
if (cbMeta.isWeak == 0) {
|
||||||
|
|
|
@ -224,8 +224,8 @@ alter_db_option(A) ::= WAL_FSYNC_PERIOD NK_INTEGER(B).
|
||||||
alter_db_option(A) ::= KEEP integer_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; }
|
alter_db_option(A) ::= KEEP integer_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; }
|
||||||
alter_db_option(A) ::= KEEP variable_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; }
|
alter_db_option(A) ::= KEEP variable_list(B). { A.type = DB_OPTION_KEEP; A.pList = B; }
|
||||||
alter_db_option(A) ::= PAGES NK_INTEGER(B). { A.type = DB_OPTION_PAGES; A.val = B; }
|
alter_db_option(A) ::= PAGES NK_INTEGER(B). { A.type = DB_OPTION_PAGES; A.val = B; }
|
||||||
//alter_db_option(A) ::= REPLICA NK_INTEGER(B). { A.type = DB_OPTION_REPLICA; A.val = B; }
|
alter_db_option(A) ::= REPLICA NK_INTEGER(B). { A.type = DB_OPTION_REPLICA; A.val = B; }
|
||||||
//alter_db_option(A) ::= STRICT NK_STRING(B). { A.type = DB_OPTION_STRICT; A.val = B; }
|
alter_db_option(A) ::= STRICT NK_STRING(B). { A.type = DB_OPTION_STRICT; A.val = B; }
|
||||||
alter_db_option(A) ::= WAL_LEVEL NK_INTEGER(B). { A.type = DB_OPTION_WAL; A.val = B; }
|
alter_db_option(A) ::= WAL_LEVEL NK_INTEGER(B). { A.type = DB_OPTION_WAL; A.val = B; }
|
||||||
alter_db_option(A) ::= STT_TRIGGER NK_INTEGER(B). { A.type = DB_OPTION_STT_TRIGGER; A.val = B; }
|
alter_db_option(A) ::= STT_TRIGGER NK_INTEGER(B). { A.type = DB_OPTION_STT_TRIGGER; A.val = B; }
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue