From 765cdd17122c70396ba0b5c624d24b1f5a0f5342 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 24 Mar 2022 14:54:10 +0800 Subject: [PATCH] sma --- include/common/tmsg.h | 12 ++++--- source/common/src/tmsg.c | 48 ++++++++++++++++++++----- source/dnode/mnode/impl/inc/mndDef.h | 2 ++ source/dnode/mnode/impl/src/mndDb.c | 31 ++++++++++++++++ source/dnode/mnode/impl/src/mndVgroup.c | 2 ++ 5 files changed, 82 insertions(+), 13 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3c814c7161..f20b369c6b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -185,10 +185,10 @@ typedef struct SField { } SField; typedef struct SRetention { - int32_t first; - int32_t second; - int8_t firstUnit; - int8_t secondUnit; + int32_t freq; + int32_t keep; + int8_t freqUnit; + int8_t keepUnit; } SRetention; #pragma pack(push, 1) @@ -764,11 +764,13 @@ typedef struct { int8_t selfIndex; int8_t streamMode; SReplica replicas[TSDB_MAX_REPLICA]; - + int32_t numOfRetensions; + SArray* pRetensions; // SRetention } SCreateVnodeReq, SAlterVnodeReq; int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); +int32_t tFreeSCreateVnodeReq(SCreateVnodeReq* pReq); typedef struct { int32_t vgId; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8aa2c2b125..30a58bdb75 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1545,10 +1545,10 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) { if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1; for (int32_t i = 0; i < pReq->numOfRetensions; ++i) { SRetention *pRetension = taosArrayGet(pReq->pRetensions, i); - if (tEncodeI32(&encoder, pRetension->first) < 0) return -1; - if (tEncodeI32(&encoder, pRetension->second) < 0) return -1; - if (tEncodeI8(&encoder, pRetension->firstUnit) < 0) return -1; - if (tEncodeI8(&encoder, pRetension->secondUnit) < 0) return -1; + if (tEncodeI32(&encoder, pRetension->freq) < 0) return -1; + if (tEncodeI32(&encoder, pRetension->keep) < 0) return -1; + if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1; + if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1; } tEndEncode(&encoder); @@ -1592,10 +1592,10 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) for (int32_t i = 0; i < pReq->numOfRetensions; ++i) { SRetention rentension = {0}; - if (tDecodeI32(&decoder, &rentension.first) < 0) return -1; - if (tDecodeI32(&decoder, &rentension.second) < 0) return -1; - if (tDecodeI8(&decoder, &rentension.firstUnit) < 0) return -1; - if (tDecodeI8(&decoder, &rentension.secondUnit) < 0) return -1; + if (tDecodeI32(&decoder, &rentension.freq) < 0) return -1; + if (tDecodeI32(&decoder, &rentension.keep) < 0) return -1; + if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1; + if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1; if (taosArrayPush(pReq->pRetensions, &rentension) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -2490,6 +2490,14 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR SReplica *pReplica = &pReq->replicas[i]; if (tEncodeSReplica(&encoder, pReplica) < 0) return -1; } + if (tEncodeI32(&encoder, pReq->numOfRetensions) < 0) return -1; + for (int32_t i = 0; i < pReq->numOfRetensions; ++i) { + SRetention *pRetension = taosArrayGet(pReq->pRetensions, i); + if (tEncodeI32(&encoder, pRetension->freq) < 0) return -1; + if (tEncodeI32(&encoder, pRetension->keep) < 0) return -1; + if (tEncodeI8(&encoder, pRetension->freqUnit) < 0) return -1; + if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1; + } tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2534,11 +2542,35 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * if (tDecodeSReplica(&decoder, pReplica) < 0) return -1; } + if (tDecodeI32(&decoder, &pReq->numOfRetensions) < 0) return -1; + pReq->pRetensions = taosArrayInit(pReq->numOfRetensions, sizeof(SRetention)); + if (pReq->pRetensions == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < pReq->numOfRetensions; ++i) { + SRetention rentension = {0}; + if (tDecodeI32(&decoder, &rentension.freq) < 0) return -1; + if (tDecodeI32(&decoder, &rentension.keep) < 0) return -1; + if (tDecodeI8(&decoder, &rentension.freqUnit) < 0) return -1; + if (tDecodeI8(&decoder, &rentension.keepUnit) < 0) return -1; + if (taosArrayPush(pReq->pRetensions, &rentension) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + tEndDecode(&decoder); tCoderClear(&decoder); return 0; } +int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) { + taosArrayDestroy(pReq->pRetensions); + pReq->pRetensions = NULL; +} + int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index abb61ee472..a3936c0acc 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -267,6 +267,8 @@ typedef struct { int8_t update; int8_t cacheLastRow; int8_t streamMode; + int32_t numOfRetensions; + SArray* pRetensions; } SDbCfg; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 7469d720b5..183e142624 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -100,6 +100,15 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { SDB_SET_INT8(pRaw, dataPos, pDb->cfg.quorum, DB_ENCODE_OVER) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.update, DB_ENCODE_OVER) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.cacheLastRow, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfRetensions, DB_ENCODE_OVER) + for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) { + SRetention *pRetension = taosArrayGet(pDb->cfg.pRetensions, i); + SDB_SET_INT32(pRaw, dataPos, pRetension->freq, DB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pRetension->keep, DB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pRetension->freqUnit, DB_ENCODE_OVER) + SDB_SET_INT8(pRaw, dataPos, pRetension->keepUnit, DB_ENCODE_OVER) + } + SDB_SET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, DB_ENCODE_OVER) @@ -161,6 +170,22 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.quorum, DB_DECODE_OVER) SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.update, DB_DECODE_OVER) SDB_GET_INT8(pRaw, dataPos, &pDb->cfg.cacheLastRow, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pDb->cfg.numOfRetensions, DB_DECODE_OVER) + if (pDb->cfg.numOfRetensions > 0) { + pDb->cfg.pRetensions = taosArrayInit(pDb->cfg.numOfRetensions, sizeof(SRetention)); + if (pDb->cfg.pRetensions == NULL) goto DB_DECODE_OVER; + for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) { + SRetention retension = {0}; + SDB_GET_INT32(pRaw, dataPos, &retension.freq, DB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &retension.keep, DB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &retension.freqUnit, DB_DECODE_OVER) + SDB_GET_INT8(pRaw, dataPos, &retension.keepUnit, DB_DECODE_OVER) + if (taosArrayPush(pDb->cfg.pRetensions, &retension) == NULL) { + goto DB_DECODE_OVER; + } + } + } + SDB_GET_RESERVE(pRaw, dataPos, TSDB_DB_RESERVE_SIZE, DB_DECODE_OVER) terrno = 0; @@ -183,6 +208,7 @@ static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) { static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) { mTrace("db:%s, perform delete action, row:%p", pDb->name, pDb); + taosArrayDestroy(pDb->cfg.pRetensions); return 0; } @@ -417,6 +443,10 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate .streamMode = pCreate->streamMode, }; + dbObj.cfg.numOfRetensions = pCreate->numOfRetensions; + dbObj.cfg.pRetensions = pCreate->pRetensions; + pCreate = NULL; + mndSetDefaultDbCfg(&dbObj.cfg); if (mndCheckDbName(dbObj.name, pUser) != 0) { @@ -505,6 +535,7 @@ CREATE_DB_OVER: mndReleaseDb(pMnode, pDb); mndReleaseUser(pMnode, pUser); + tFreeSCreateDbReq(&createReq); return code; } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 1629f71c04..0b31f7d06c 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -218,6 +218,8 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg createReq.hashBegin = pVgroup->hashBegin; createReq.hashEnd = pVgroup->hashEnd; createReq.hashMethod = pDb->hashMethod; + createReq.numOfRetensions = pDb->cfg.numOfRetensions; + createReq.pRetensions = pDb->cfg.pRetensions; for (int32_t v = 0; v < pVgroup->replica; ++v) { SReplica *pReplica = &createReq.replicas[v];