sma
This commit is contained in:
parent
652600b40a
commit
765cdd1712
|
@ -185,10 +185,10 @@ typedef struct SField {
|
||||||
} SField;
|
} SField;
|
||||||
|
|
||||||
typedef struct SRetention {
|
typedef struct SRetention {
|
||||||
int32_t first;
|
int32_t freq;
|
||||||
int32_t second;
|
int32_t keep;
|
||||||
int8_t firstUnit;
|
int8_t freqUnit;
|
||||||
int8_t secondUnit;
|
int8_t keepUnit;
|
||||||
} SRetention;
|
} SRetention;
|
||||||
|
|
||||||
#pragma pack(push, 1)
|
#pragma pack(push, 1)
|
||||||
|
@ -764,11 +764,13 @@ typedef struct {
|
||||||
int8_t selfIndex;
|
int8_t selfIndex;
|
||||||
int8_t streamMode;
|
int8_t streamMode;
|
||||||
SReplica replicas[TSDB_MAX_REPLICA];
|
SReplica replicas[TSDB_MAX_REPLICA];
|
||||||
|
int32_t numOfRetensions;
|
||||||
|
SArray* pRetensions; // SRetention
|
||||||
} SCreateVnodeReq, SAlterVnodeReq;
|
} SCreateVnodeReq, SAlterVnodeReq;
|
||||||
|
|
||||||
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||||
int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||||
|
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
|
|
@ -1545,10 +1545,10 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
|
||||||
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
|
||||||
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
||||||
SRetention *pRetension = taosArrayGet(pReq->pRetensions, i);
|
SRetention *pRetension = taosArrayGet(pReq->pRetensions, i);
|
||||||
if (tEncodeI32(&encoder, pRetension->first) < 0) return -1;
|
if (tEncodeI32(&encoder, pRetension->freq) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pRetension->second) < 0) return -1;
|
if (tEncodeI32(&encoder, pRetension->keep) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pRetension->firstUnit) < 0) return -1;
|
if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pRetension->secondUnit) < 0) return -1;
|
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
|
||||||
}
|
}
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -1592,10 +1592,10 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
||||||
SRetention rentension = {0};
|
SRetention rentension = {0};
|
||||||
if (tDecodeI32(&decoder, &rentension.first) < 0) return -1;
|
if (tDecodeI32(&decoder, &rentension.freq) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &rentension.second) < 0) return -1;
|
if (tDecodeI32(&decoder, &rentension.keep) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &rentension.firstUnit) < 0) return -1;
|
if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &rentension.secondUnit) < 0) return -1;
|
if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1;
|
||||||
if (taosArrayPush(pReq->pRetensions, &rentension) == NULL) {
|
if (taosArrayPush(pReq->pRetensions, &rentension) == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -2490,6 +2490,14 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
|
||||||
SReplica *pReplica = &pReq->replicas[i];
|
SReplica *pReplica = &pReq->replicas[i];
|
||||||
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
|
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
||||||
|
SRetention *pRetension = taosArrayGet(pReq->pRetensions, i);
|
||||||
|
if (tEncodeI32(&encoder, pRetension->freq) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pRetension->keep) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
|
||||||
|
}
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -2534,11 +2542,35 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
|
||||||
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
|
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1;
|
||||||
|
pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention));
|
||||||
|
if (pReq->pRetensions == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pReq->numOfRetensions; ++i) {
|
||||||
|
SRetention rentension = {0};
|
||||||
|
if (tDecodeI32(&decoder, &rentension.freq) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &rentension.keep) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1;
|
||||||
|
if (taosArrayPush(pReq->pRetensions, &rentension) == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
tCoderClear(&decoder);
|
tCoderClear(&decoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) {
|
||||||
|
taosArrayDestroy(pReq->pRetensions);
|
||||||
|
pReq->pRetensions = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) {
|
int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) {
|
||||||
SCoder encoder = {0};
|
SCoder encoder = {0};
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
|
@ -267,6 +267,8 @@ typedef struct {
|
||||||
int8_t update;
|
int8_t update;
|
||||||
int8_t cacheLastRow;
|
int8_t cacheLastRow;
|
||||||
int8_t streamMode;
|
int8_t streamMode;
|
||||||
|
int32_t numOfRetensions;
|
||||||
|
SArray* pRetensions;
|
||||||
} SDbCfg;
|
} SDbCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -100,6 +100,15 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
|
||||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.quorum, DB_ENCODE_OVER)
|
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.quorum, DB_ENCODE_OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.update, DB_ENCODE_OVER)
|
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.update, DB_ENCODE_OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow, DB_ENCODE_OVER)
|
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow, DB_ENCODE_OVER)
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfRetensions, DB_ENCODE_OVER)
|
||||||
|
for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) {
|
||||||
|
SRetention *pRetension = taosArrayGet(pDb->cfg.pRetensions, i);
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, pRetension->freq, DB_ENCODE_OVER)
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, pRetension->keep, DB_ENCODE_OVER)
|
||||||
|
SDB_SET_INT8(pRaw, dataPos, pRetension->freqUnit, DB_ENCODE_OVER)
|
||||||
|
SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, DB_ENCODE_OVER)
|
||||||
|
}
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_ENCODE_OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_ENCODE_OVER)
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, DB_ENCODE_OVER)
|
SDB_SET_DATALEN(pRaw, dataPos, DB_ENCODE_OVER)
|
||||||
|
|
||||||
|
@ -161,6 +170,22 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.quorum, DB_DECODE_OVER)
|
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.quorum, DB_DECODE_OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.update, DB_DECODE_OVER)
|
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.update, DB_DECODE_OVER)
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.cacheLastRow, DB_DECODE_OVER)
|
SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.cacheLastRow, DB_DECODE_OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.numOfRetensions, DB_DECODE_OVER)
|
||||||
|
if (pDb->cfg.numOfRetensions > 0) {
|
||||||
|
pDb->cfg.pRetensions = taosArrayInit(pDb->cfg.numOfRetensions, sizeof(SRetention));
|
||||||
|
if (pDb->cfg.pRetensions == NULL) goto DB_DECODE_OVER;
|
||||||
|
for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) {
|
||||||
|
SRetention retension = {0};
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &retension.freq, DB_DECODE_OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &retension.keep, DB_DECODE_OVER)
|
||||||
|
SDB_GET_INT8(pRaw, dataPos, &retension.freqUnit, DB_DECODE_OVER)
|
||||||
|
SDB_GET_INT8(pRaw, dataPos, &retension.keepUnit, DB_DECODE_OVER)
|
||||||
|
if (taosArrayPush(pDb->cfg.pRetensions, &retension) == NULL) {
|
||||||
|
goto DB_DECODE_OVER;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_DECODE_OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_DECODE_OVER)
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
@ -183,6 +208,7 @@ static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
|
||||||
|
|
||||||
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
|
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
|
||||||
mTrace("db:%s, perform delete action, row:%p", pDb->name, pDb);
|
mTrace("db:%s, perform delete action, row:%p", pDb->name, pDb);
|
||||||
|
taosArrayDestroy(pDb->cfg.pRetensions);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -417,6 +443,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
|
||||||
.streamMode = pCreate->streamMode,
|
.streamMode = pCreate->streamMode,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
dbObj.cfg.numOfRetensions = pCreate->numOfRetensions;
|
||||||
|
dbObj.cfg.pRetensions = pCreate->pRetensions;
|
||||||
|
pCreate = NULL;
|
||||||
|
|
||||||
mndSetDefaultDbCfg(&dbObj.cfg);
|
mndSetDefaultDbCfg(&dbObj.cfg);
|
||||||
|
|
||||||
if (mndCheckDbName(dbObj.name, pUser) != 0) {
|
if (mndCheckDbName(dbObj.name, pUser) != 0) {
|
||||||
|
@ -505,6 +535,7 @@ CREATE_DB_OVER:
|
||||||
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
tFreeSCreateDbReq(&createReq);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -218,6 +218,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
||||||
createReq.hashBegin = pVgroup->hashBegin;
|
createReq.hashBegin = pVgroup->hashBegin;
|
||||||
createReq.hashEnd = pVgroup->hashEnd;
|
createReq.hashEnd = pVgroup->hashEnd;
|
||||||
createReq.hashMethod = pDb->hashMethod;
|
createReq.hashMethod = pDb->hashMethod;
|
||||||
|
createReq.numOfRetensions = pDb->cfg.numOfRetensions;
|
||||||
|
createReq.pRetensions = pDb->cfg.pRetensions;
|
||||||
|
|
||||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||||
SReplica *pReplica = &createReq.replicas[v];
|
SReplica *pReplica = &createReq.replicas[v];
|
||||||
|
|
Loading…
Reference in New Issue