From 63d3671f42700f636c2b2c17880c5c86c5870d8d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 6 Mar 2024 12:06:26 +0000 Subject: [PATCH] add new msg --- include/common/tmsg.h | 92 +++++++++++++++----- include/libs/executor/storageapi.h | 7 +- include/util/taoserror.h | 1 + source/common/src/tmsg.c | 111 ++++++++++++++++-------- source/dnode/mnode/impl/src/mndStb.c | 65 +++++++++++++- source/dnode/vnode/inc/vnode.h | 18 ++-- source/dnode/vnode/src/inc/meta.h | 8 +- source/dnode/vnode/src/meta/metaEntry.c | 45 +++++++++- source/dnode/vnode/src/meta/metaTable.c | 27 +++++- source/util/src/terror.c | 2 + 10 files changed, 294 insertions(+), 82 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index af848fce60..53e555fb91 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -153,18 +153,19 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_MAX, } EShowType; -#define TSDB_ALTER_TABLE_ADD_TAG 1 -#define TSDB_ALTER_TABLE_DROP_TAG 2 -#define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3 -#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 -#define TSDB_ALTER_TABLE_ADD_COLUMN 5 -#define TSDB_ALTER_TABLE_DROP_COLUMN 6 -#define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7 -#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8 -#define TSDB_ALTER_TABLE_UPDATE_OPTIONS 9 -#define TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME 10 -#define TSDB_ALTER_TABLE_ADD_TAG_INDEX 11 -#define TSDB_ALTER_TABLE_DROP_TAG_INDEX 12 +#define TSDB_ALTER_TABLE_ADD_TAG 1 +#define TSDB_ALTER_TABLE_DROP_TAG 2 +#define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3 +#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4 +#define TSDB_ALTER_TABLE_ADD_COLUMN 5 +#define TSDB_ALTER_TABLE_DROP_COLUMN 6 +#define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7 +#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8 +#define TSDB_ALTER_TABLE_UPDATE_OPTIONS 9 +#define TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME 10 +#define TSDB_ALTER_TABLE_ADD_TAG_INDEX 11 +#define TSDB_ALTER_TABLE_DROP_TAG_INDEX 12 +#define TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS 13 #define TSDB_FILL_NONE 0 #define TSDB_FILL_NULL 1 @@ -551,7 +552,7 @@ struct SSchema { #define COMPRESS_L2_TYPE_U8(type) (((type) >> 3) & 0x07) #define COMPRESS_L2_TYPE_LEVEL_U8(type) (((type) >> 6) & 0x03) -// +// struct SSchema2 { int8_t type; @@ -640,6 +641,46 @@ typedef struct { SSchema* pSchema; } SSchemaWrapper; +typedef struct { + col_id_t id; + uint32_t alg; +} SColCmpr; + +typedef struct { + int32_t nCols; + int32_t version; + SColCmpr* pColCmpr; +} SColCmprWrapper; + +static FORCE_INLINE SColCmprWrapper* tCloneSColCmprWrapper(const SColCmprWrapper* pSrcWrapper) { + if (pSrcWrapper->pColCmpr == NULL || pSrcWrapper->nCols == 0) return NULL; + + SColCmprWrapper* pDstWrapper = taosMemoryMalloc(pSrcWrapper->nCols * sizeof(SColCmpr)); + pDstWrapper->nCols = pSrcWrapper->nCols; + pDstWrapper->version = pSrcWrapper->version; + + int32_t size = sizeof(SColCmpr) * pDstWrapper->nCols; + pDstWrapper->pColCmpr = taosMemoryCalloc(1, size); + memcpy(pDstWrapper->pColCmpr, pSrcWrapper->pColCmpr, size); + + return pDstWrapper; +} + +static FORCE_INLINE void tInitDefaultSColCmprWrapper(SColCmprWrapper* pCmpr, SSchemaWrapper* pSchema) { + pCmpr->nCols = pSchema->nCols; + for (int32_t i = 0; i < pCmpr->nCols; i++) { + SColCmpr* pColCmpr = &pCmpr->pColCmpr[i]; + SSchema* pColSchema = &pSchema->pSchema[i]; + pColCmpr->id = pColSchema->colId; + pColCmpr->alg = 0; + } +} +static FORCE_INLINE void tDeleteSColCmprWrapper(SColCmprWrapper* pWrapper) { + if (pWrapper == NULL) return; + + taosMemoryFreeClear(pWrapper->pColCmpr); + taosMemoryFreeClear(pWrapper); +} static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) { if (pSchemaWrapper->pSchema == NULL) return NULL; @@ -2676,15 +2717,17 @@ int32_t tDecodeSRSmaParam(SDecoder* pCoder, SRSmaParam* pRSmaParam); // TDMT_VND_CREATE_STB ============== typedef struct SVCreateStbReq { - char* name; - tb_uid_t suid; - int8_t rollup; - SSchemaWrapper schemaRow; - SSchemaWrapper schemaTag; - SRSmaParam rsmaParam; - int32_t alterOriDataLen; - void* alterOriData; - int8_t source; + char* name; + tb_uid_t suid; + int8_t rollup; + SSchemaWrapper schemaRow; + SSchemaWrapper schemaTag; + SRSmaParam rsmaParam; + int32_t alterOriDataLen; + void* alterOriData; + int8_t source; + int8_t colCmpred; + SColCmprWrapper colCmpr; } SVCreateStbReq; int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq); @@ -2722,8 +2765,9 @@ typedef struct SVCreateTbReq { SSchemaWrapper schemaRow; } ntb; }; - int32_t sqlLen; - char* sql; + int32_t sqlLen; + char* sql; + SColCmprWrapper colCmpr; } SVCreateTbReq; int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index e66ffe5521..c5b135d967 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -77,7 +77,9 @@ typedef struct SMetaEntry { } smaEntry; }; - uint8_t* pBuf; + uint8_t* pBuf; + + SColCmprWrapper colCmpr; // col compress alg } SMetaEntry; typedef struct SMetaReader { @@ -362,7 +364,8 @@ typedef struct SStateStore { int32_t (*streamStateSessionAllocWinBuffByNextPosition)(SStreamState* pState, SStreamStateCur* pCur, const SSessionKey* pKey, void** pVal, int32_t* pVLen); - int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, int32_t* pVLen); + int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, + void** ppVal, int32_t* pVLen); int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen); SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 556479f547..fe87f4daf9 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -273,6 +273,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x036E) #define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x036F) #define TSDB_CODE_MND_FIELD_VALUE_OVERFLOW TAOS_DEF_ERROR_CODE(0, 0x0370) +#define TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0371) // mnode-func diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4640c20e07..20b1220dbc 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1470,9 +1470,7 @@ int32_t tDeserializeSStatisReq(void *buf, int32_t bufLen, SStatisReq *pReq) { return 0; } -void tFreeSStatisReq(SStatisReq *pReq) { - taosMemoryFreeClear(pReq->pCont); -} +void tFreeSStatisReq(SStatisReq *pReq) { taosMemoryFreeClear(pReq->pCont); } int32_t tSerializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) { SEncoder encoder = {0}; @@ -1871,7 +1869,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) char *tb = taosHashIterate(pRsp->readTbs, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1886,7 +1884,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->writeTbs, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1901,7 +1899,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->alterTbs, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1916,7 +1914,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->readViews, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1931,7 +1929,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->writeViews, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1946,7 +1944,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) tb = taosHashIterate(pRsp->alterViews, NULL); while (tb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(tb, &keyLen); + void *key = taosHashGetKey(tb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -1961,7 +1959,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) int32_t *useDb = taosHashIterate(pRsp->useDbs, NULL); while (useDb != NULL) { size_t keyLen = 0; - void * key = taosHashGetKey(useDb, &keyLen); + void *key = taosHashGetKey(useDb, &keyLen); if (tEncodeI32(pEncoder, keyLen) < 0) return -1; if (tEncodeCStr(pEncoder, key) < 0) return -1; @@ -7007,27 +7005,27 @@ void tFreeSSchedulerHbRsp(SSchedulerHbRsp *pRsp) { taosArrayDestroy(pRsp->taskSt // return 0; // } -//int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) { - // SDecoder decoder = {0}; - // int32_t num = 0; - // tDecoderInit(&decoder, buf, bufLen); +// int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) { +// SDecoder decoder = {0}; +// int32_t num = 0; +// tDecoderInit(&decoder, buf, bufLen); - // if (tStartDecode(&decoder) < 0) return -1; - // if (tDecodeI32(&decoder, &num) < 0) return -1; - // if (num > 0) { - // pRsp->rspList = taosArrayInit(num, sizeof(SVCreateTbRsp)); - // if (NULL == pRsp->rspList) return -1; - // for (int32_t i = 0; i < num; ++i) { - // SVCreateTbRsp rsp = {0}; - // if (tDecodeI32(&decoder, &rsp.code) < 0) return -1; - // if (NULL == taosArrayPush(pRsp->rspList, &rsp)) return -1; - // } - // } else { - // pRsp->rspList = NULL; - // } - // tEndDecode(&decoder); +// if (tStartDecode(&decoder) < 0) return -1; +// if (tDecodeI32(&decoder, &num) < 0) return -1; +// if (num > 0) { +// pRsp->rspList = taosArrayInit(num, sizeof(SVCreateTbRsp)); +// if (NULL == pRsp->rspList) return -1; +// for (int32_t i = 0; i < num; ++i) { +// SVCreateTbRsp rsp = {0}; +// if (tDecodeI32(&decoder, &rsp.code) < 0) return -1; +// if (NULL == taosArrayPush(pRsp->rspList, &rsp)) return -1; +// } +// } else { +// pRsp->rspList = NULL; +// } +// tEndDecode(&decoder); - // tDecoderClear(&decoder); +// tDecoderClear(&decoder); // return 0; //} @@ -7300,8 +7298,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeI32(&encoder, taosArrayGetSize(pReq->pVgroupVerList)) < 0) return -1; - for(int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) { - SVgroupVer* p = taosArrayGet(pReq->pVgroupVerList, i); + for (int32_t i = 0; i < taosArrayGetSize(pReq->pVgroupVerList); ++i) { + SVgroupVer *p = taosArrayGet(pReq->pVgroupVerList, i); if (tEncodeI32(&encoder, p->vgId) < 0) return -1; if (tEncodeI64(&encoder, p->ver) < 0) return -1; } @@ -7523,6 +7521,33 @@ int32_t tDecodeSRSmaParam(SDecoder *pCoder, SRSmaParam *pRSmaParam) { return 0; } +int32_t tEncodeSColCmprWrapper(SEncoder *pCoder, const SColCmprWrapper *pWrapper) { + if (tEncodeI32v(pCoder, pWrapper->nCols) < 0) return -1; + if (tEncodeI32v(pCoder, pWrapper->version) < 0) return -1; + for (int32_t i = 0; i < pWrapper->nCols; i++) { + SColCmpr *p = &pWrapper->pColCmpr[i]; + if (tEncodeI16v(pCoder, p->id) < 0) return -1; + if (tEncodeU32(pCoder, p->alg) < 0) return -1; + } + return 0; +} +int32_t tDecodeSColCmprWrapperEx(SDecoder *pDecoder, SColCmprWrapper *pWrapper) { + if (tDecodeI32v(pDecoder, &pWrapper->nCols) < 0) return -1; + if (tDecodeI32v(pDecoder, &pWrapper->version) < 0) return -1; + + pWrapper->pColCmpr = (SColCmpr *)taosMemoryCalloc(1, pWrapper->nCols * sizeof(SColCmpr)); + if (pWrapper->pColCmpr == NULL) return -1; + + for (int i = 0; i < pWrapper->nCols; i++) { + SColCmpr *p = &pWrapper->pColCmpr[i]; + if (tDecodeI16v(pDecoder, &p->id) < 0) goto END; + if (tDecodeU32(pDecoder, &p->alg) < 0) goto END; + } + return 0; +END: + taosMemoryFree(pWrapper->pColCmpr); + return -1; +} int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) { if (tStartEncode(pCoder) < 0) return -1; @@ -7541,6 +7566,8 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) { } if (tEncodeI8(pCoder, pReq->source) < 0) return -1; + if (tEncodeSColCmprWrapper(pCoder, &pReq->colCmpr) < 0) return -1; + tEndEncode(pCoder); return 0; } @@ -7561,9 +7588,12 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) { if (pReq->alterOriDataLen > 0) { if (tDecodeBinary(pCoder, (uint8_t **)&pReq->alterOriData, NULL) < 0) return -1; } - if (!tDecodeIsEnd(pCoder)) { if (tDecodeI8(pCoder, &pReq->source) < 0) return -1; + + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeSColCmprWrapperEx(pCoder, &pReq->colCmpr) < 0) return -1; + } } tEndDecode(pCoder); @@ -7605,6 +7635,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { if (tEncodeI32(pCoder, pReq->sqlLen) < 0) return -1; if (tEncodeBinary(pCoder, pReq->sql, pReq->sqlLen) < 0) return -1; } + if (tEncodeSColCmprWrapper(pCoder, &pReq->colCmpr) < 0) return -1; tEndEncode(pCoder); return 0; @@ -7654,6 +7685,10 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { if (pReq->sqlLen > 0) { if (tDecodeBinaryAlloc(pCoder, (void **)&pReq->sql, NULL) < 0) return -1; } + + if (!tDecodeIsEnd(pCoder)) { + if (tDecodeSColCmprWrapperEx(pCoder, &pReq->colCmpr) < 0) return -1; + } } tEndDecode(pCoder); @@ -8477,7 +8512,7 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) { for (int32_t i = 0; i < pRsp->blockNum; i++) { int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i); - void * data = taosArrayGetP(pRsp->blockData, i); + void *data = taosArrayGetP(pRsp->blockData, i); if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1; if (pRsp->withSchema) { SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i); @@ -8515,7 +8550,7 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { } for (int32_t i = 0; i < pRsp->blockNum; i++) { - void * data; + void *data; uint64_t bLen; if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1; taosArrayPush(pRsp->blockData, &data); @@ -8569,7 +8604,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1; if (pRsp->createTableNum) { for (int32_t i = 0; i < pRsp->createTableNum; i++) { - void * createTableReq = taosArrayGetP(pRsp->createTableReq, i); + void *createTableReq = taosArrayGetP(pRsp->createTableReq, i); int32_t createTableLen = *(int32_t *)taosArrayGet(pRsp->createTableLen, i); if (tEncodeBinary(pEncoder, createTableReq, createTableLen) < 0) return -1; } @@ -8578,14 +8613,14 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { } int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { - if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1; + if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp *)pRsp) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1; if (pRsp->createTableNum) { pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t)); pRsp->createTableReq = taosArrayInit(pRsp->createTableNum, sizeof(void *)); for (int32_t i = 0; i < pRsp->createTableNum; i++) { - void * pCreate = NULL; + void *pCreate = NULL; uint64_t len; if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1; int32_t l = (int32_t)len; @@ -8888,7 +8923,7 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) { taosArrayDestroy(pTbData->aCol); } else { int32_t nRow = TARRAY_SIZE(pTbData->aRowP); - SRow ** rows = (SRow **)TARRAY_DATA(pTbData->aRowP); + SRow **rows = (SRow **)TARRAY_DATA(pTbData->aRowP); for (int32_t i = 0; i < nRow; ++i) { tRowDestroy(rows[i]); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index eeda3ef548..2b68e9574d 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -286,6 +286,11 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { if (sver < STB_VER_NUMBER) { // compatible with old data, setup default compress value // impl later + for (int i = 0; i < pStb->numOfColumns; i++) { + SCmprObj *pCmpr = &pStb->pCmpr[i]; + pCmpr->colId = 0; + pCmpr->cmprAlg = 0; + } } else { for (int i = 0; i < pStb->numOfColumns; i++) { SCmprObj *pCmpr = &pStb->pCmpr[i]; @@ -489,6 +494,15 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int3 req.schemaTag.version = pStb->tagVer; req.schemaTag.pSchema = pStb->pTags; + SColCmprWrapper *pCmpr = &req.colCmpr; + pCmpr->version = pStb->colVer; + pCmpr->nCols = pStb->numOfColumns; + for (int i = 0; pStb->numOfColumns; i++) { + SColCmpr *p = &pCmpr->pColCmpr[i]; + p->alg = pStb->pCmpr[i].cmprAlg; + p->id = pStb->pCmpr[i].colId; + } + if (req.rollup) { req.rsmaParam.maxdelay[0] = pStb->maxdelay[0]; req.rsmaParam.maxdelay[1] = pStb->maxdelay[1]; @@ -870,10 +884,9 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->nextColId++; } - - pDst->pCmpr = taosMemoryCalloc(1, sizeof(SCmprObj)); + pDst->pCmpr = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SCmprObj)); for (int32_t i = 0; i < pDst->numOfColumns; i++) { - SCmprObj *p = &pDst->pCmpr[i]; + SCmprObj *p = &pDst->pCmpr[i]; } return 0; } @@ -1255,13 +1268,16 @@ static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) { int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) { pNew->pTags = taosMemoryCalloc(pNew->numOfTags, sizeof(SSchema)); pNew->pColumns = taosMemoryCalloc(pNew->numOfColumns, sizeof(SSchema)); - if (pNew->pTags == NULL || pNew->pColumns == NULL) { + pNew->pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr)); + if (pNew->pTags == NULL || pNew->pColumns == NULL || pNew->pCmpr) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } memcpy(pNew->pColumns, pOld->pColumns, sizeof(SSchema) * pOld->numOfColumns); memcpy(pNew->pTags, pOld->pTags, sizeof(SSchema) * pOld->numOfTags); + memcpy(pNew->pCmpr, pOld->pCmpr, sizeof(SColCmpr) * pOld->numOfColumns); + return 0; } @@ -1629,6 +1645,44 @@ static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj return 0; } +static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, char *colName, + SColCmpr *pColCmpr) { + if (pColCmpr == NULL || colName == NULL) return -1; + int32_t code = 0; + int32_t idx = mndFindSuperTableColumnIndex(pOld, colName); + if (idx == -1) { + terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST; + return -1; + } + col_id_t colId = pOld->pColumns[idx].colId; + if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) { + return -1; + } + + if (mndAllocStbSchemas(pOld, pNew) != 0) { + return -1; + } + SSchema *pCols = pNew->pColumns + idx; + + int8_t updated = 0; + for (int i = 0; i < pNew->numOfColumns; i++) { + SCmprObj *p = &pNew->pCmpr[i]; + if (p->colId == colId) { + if (p->cmprAlg != pColCmpr->alg) { + p->cmprAlg = pColCmpr->alg; + } + break; + } + } + + if (updated == 0) { + terrno = TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST; + return -1; + } + + pNew->colVer++; + return 0; +} static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ncols) { if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) { terrno = TSDB_CODE_MND_TOO_MANY_COLUMNS; @@ -2295,6 +2349,9 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p needRsp = false; code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl); break; + case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: + code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, 0, NULL); + break; default: needRsp = false; terrno = TSDB_CODE_OPS_NOT_SUPPORT; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 294e75602e..3d3e5e666f 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -69,7 +69,7 @@ int32_t vnodeBegin(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode); void vnodeStop(SVnode *pVnode); int64_t vnodeGetSyncHandle(SVnode *pVnode); -int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); +int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t *numOfTables, int64_t *numOfNormalTables); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeGetTableList(void *pVnode, int8_t type, SArray *pList); @@ -134,8 +134,8 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name); int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList); int32_t metaPutTbGroupToCache(void *pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, int32_t payloadLen); -bool metaTbInFilterCache(SMeta *pMeta, const void* key, int8_t type); -int32_t metaPutTbToFilterCache(SMeta *pMeta, const void* key, int8_t type); +bool metaTbInFilterCache(SMeta *pMeta, const void *key, int8_t type); +int32_t metaPutTbToFilterCache(SMeta *pMeta, const void *key, int8_t type); int32_t metaSizeOfTbFilterCache(SMeta *pMeta, int8_t type); int32_t metaInitTbFilterCache(SMeta *pMeta); @@ -172,13 +172,13 @@ void *tsdbGetIvtIdx2(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader); int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr); -void tsdbSetFilesetDelimited(STsdbReader* pReader); -void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param); +void tsdbSetFilesetDelimited(STsdbReader *pReader); +void tsdbReaderSetNotifyCb(STsdbReader *pReader, TsdReaderNotifyCbFn notifyFn, void *param); int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr, - SArray* pFuncTypeList); + SArray *pFuncTypeList); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); @@ -234,7 +234,7 @@ int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, i bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); -int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, bool* fhFinished); +int32_t tqGetStreamExecInfo(SVnode *pVnode, int64_t streamId, int64_t *pDelay, bool *fhFinished); // sma int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); @@ -316,6 +316,10 @@ struct SVnodeCfg { #define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0) #define TABLE_SET_ROLLUP(FLG) ((FLG) |= TABLE_ROLLUP_ON) +#define TABLE_COL_COMPRESSED ((int8_t)0x2) +#define TABLE_IS_COL_COMPRESSED(FLG) (((FLG) & (TABLE_COL_COMPRESSED)) != 0) +#define TABLE_SET_COL_COMPRESSED(FLG) ((FLG) |= TABLE_COL_COMPRESSED) + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 7dbaa66d44..ed17f0f591 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -160,10 +160,10 @@ int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int32_ STagIdxKey** ppTagIdxKey, int32_t* nTagIdxKey); // TODO, refactor later -int32_t metaFilterTableIds(void *pVnode, SMetaFltParam *param, SArray *results); -int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *parm, SArray *pUids); -int32_t metaFilterTableName(void *pVnode, SMetaFltParam *param, SArray *pUids); -int32_t metaFilterTtl(void *pVnode, SMetaFltParam *param, SArray *pUids); +int32_t metaFilterTableIds(void* pVnode, SMetaFltParam* param, SArray* results); +int32_t metaFilterCreateTime(void* pVnode, SMetaFltParam* parm, SArray* pUids); +int32_t metaFilterTableName(void* pVnode, SMetaFltParam* param, SArray* pUids); +int32_t metaFilterTtl(void* pVnode, SMetaFltParam* param, SArray* pUids); #ifndef META_REFACT // SMetaDB diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index 01877a523a..48078961ab 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -15,6 +15,36 @@ #include "meta.h" +int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) { + const SColCmprWrapper *pw = &pME->colCmpr; + for (int32_t i = 0; i < pw->nCols; i++) { + SColCmpr *p = &pw->pColCmpr[i]; + if (tEncodeI16v(pCoder, p->id) < 0) return -1; + if (tEncodeU32(pCoder, p->alg) < 0) return -1; + } + return 0; +} +int meteDecodeColCmprEntry(SDecoder *pDecoder, SMetaEntry *pME) { + SColCmprWrapper *pWrapper = &pME->colCmpr; + if (tDecodeI32v(pDecoder, &pWrapper->nCols) < 0) return -1; + if (tDecodeI32v(pDecoder, &pWrapper->version) < 0) return -1; + + pWrapper->pColCmpr = (SColCmpr *)taosMemoryCalloc(1, pWrapper->nCols * sizeof(SColCmpr)); + if (pWrapper->pColCmpr == NULL) return -1; + + for (int i = 0; i < pWrapper->nCols; i++) { + SColCmpr *p = &pWrapper->pColCmpr[i]; + if (tDecodeI16v(pDecoder, &p->id) < 0) goto END; + if (tDecodeU32(pDecoder, &p->alg) < 0) goto END; + } + return 0; +END: + taosMemoryFree(pWrapper->pColCmpr); + return -1; +} + + + int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { if (tStartEncode(pCoder) < 0) return -1; @@ -56,6 +86,10 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { return -1; } + if (TABLE_IS_COL_COMPRESSED(pME->flags)) { + if (meteEncodeColCmprEntry(pCoder, pME) < 0) return -1; + } + tEndEncode(pCoder); return 0; } @@ -102,9 +136,18 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { if (tDecodeTSma(pCoder, pME->smaEntry.tsma, true) < 0) return -1; } else { metaError("meta/entry: invalide table type: %" PRId8 " decode failed.", pME->type); - return -1; } + if (!tDecodeIsEnd(pCoder)) { + if (meteDecodeColCmprEntry(pCoder, pME) < 0) return -1; + TABLE_SET_COL_COMPRESSED(pME->flags); + } else { + if (pME->type == TSDB_SUPER_TABLE) { + tInitDefaultSColCmprWrapper(&pME->colCmpr, &pME->stbEntry.schemaRow); + } else if (pME->type == TSDB_NORMAL_TABLE) { + tInitDefaultSColCmprWrapper(&pME->colCmpr, &pME->ntbEntry.schemaRow); + } + } tEndDecode(pCoder); return 0; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 4a978c8f41..96c7458b60 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -241,10 +241,20 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { me.name = pReq->name; me.stbEntry.schemaRow = pReq->schemaRow; me.stbEntry.schemaTag = pReq->schemaTag; + // me.stbEntry.colCmpr = pReq->colCmpr; + // me.stbEntry.colCmpr = pReq-> if (pReq->rollup) { TABLE_SET_ROLLUP(me.flags); me.stbEntry.rsmaParam = pReq->rsmaParam; } + if (pReq->colCmpred) { + TABLE_SET_COL_COMPRESSED(me.flags); + me.colCmpr = pReq->colCmpr; + } else { + TABLE_SET_COL_COMPRESSED(me.flags); + // TODO(yihao) + // SETUP default compress algr + } if (metaHandleEntry(pMeta, &me) < 0) goto _err; @@ -432,6 +442,8 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { nStbEntry.name = pReq->name; nStbEntry.stbEntry.schemaRow = pReq->schemaRow; nStbEntry.stbEntry.schemaTag = pReq->schemaTag; + nStbEntry.colCmpr = pReq->colCmpr; + TABLE_SET_COL_COMPRESSED(nStbEntry.flags); int nCols = pReq->schemaRow.nCols; int onCols = oStbEntry.stbEntry.schemaRow.nCols; @@ -636,6 +648,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { nStbEntry.name = pReq->name; nStbEntry.stbEntry.schemaRow = pReq->schemaRow; nStbEntry.stbEntry.schemaTag = pReq->schemaTag; + nStbEntry.colCmpr = pReq->colCmpr; metaWLock(pMeta); // update table.db @@ -769,12 +782,16 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) nStbEntry.uid = oStbEntry.uid; nStbEntry.name = oStbEntry.name; - SSchemaWrapper *row = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaRow); - SSchemaWrapper *tag = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaTag); + SSchemaWrapper *row = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaRow); + SSchemaWrapper *tag = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaTag); + SColCmprWrapper *cmpr = tCloneSColCmprWrapper(&oStbEntry.colCmpr); nStbEntry.stbEntry.schemaRow = *row; nStbEntry.stbEntry.schemaTag = *tag; nStbEntry.stbEntry.rsmaParam = oStbEntry.stbEntry.rsmaParam; + nStbEntry.colCmpr = *cmpr; + + nStbEntry.colCmpr = oStbEntry.colCmpr; metaWLock(pMeta); // update table.db @@ -785,6 +802,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq) tDeleteSchemaWrapper(tag); tDeleteSchemaWrapper(row); + tDeleteSColCmprWrapper(cmpr); if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf); tDecoderClear(&dc); @@ -899,6 +917,8 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs me.ntbEntry.comment = pReq->comment; me.ntbEntry.schemaRow = pReq->ntb.schemaRow; me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1; + me.colCmpr = pReq->colCmpr; + TABLE_SET_COL_COMPRESSED(me.flags); ++pStats->numOfNTables; pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1; @@ -2457,6 +2477,9 @@ int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) { VND_CHECK_CODE(code, line, _err); } + if (pME->type == TSDB_SUPER_TABLE || pME->type == TSDB_NORMAL_TABLE) { + } + metaULock(pMeta); metaDebug("vgId:%d, handle meta entry, ver:%" PRId64 ", uid:%" PRId64 ", name:%s", TD_VID(pMeta->pVnode), pME->version, pME->uid, pME->name); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 836fd980d0..e4a2a5ec85 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -215,6 +215,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes") TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_VALUE_OVERFLOW, "out of range and overflow") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST, "Column compress already exist") + // mnode-func TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_NAME, "Invalid func name")