add new msg

This commit is contained in:
yihaoDeng 2024-03-06 12:06:26 +00:00
parent e084ee9d22
commit 63d3671f42
10 changed files with 294 additions and 82 deletions

View File

@ -165,6 +165,7 @@ typedef enum _mgmt_table {
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME 10
#define TSDB_ALTER_TABLE_ADD_TAG_INDEX 11
#define TSDB_ALTER_TABLE_DROP_TAG_INDEX 12
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS 13
#define TSDB_FILL_NONE 0
#define TSDB_FILL_NULL 1
@ -640,6 +641,46 @@ typedef struct {
SSchema* pSchema;
} SSchemaWrapper;
typedef struct {
col_id_t id;
uint32_t alg;
} SColCmpr;
typedef struct {
int32_t nCols;
int32_t version;
SColCmpr* pColCmpr;
} SColCmprWrapper;
static FORCE_INLINE SColCmprWrapper* tCloneSColCmprWrapper(const SColCmprWrapper* pSrcWrapper) {
if (pSrcWrapper->pColCmpr == NULL || pSrcWrapper->nCols == 0) return NULL;
SColCmprWrapper* pDstWrapper = taosMemoryMalloc(pSrcWrapper->nCols * sizeof(SColCmpr));
pDstWrapper->nCols = pSrcWrapper->nCols;
pDstWrapper->version = pSrcWrapper->version;
int32_t size = sizeof(SColCmpr) * pDstWrapper->nCols;
pDstWrapper->pColCmpr = taosMemoryCalloc(1, size);
memcpy(pDstWrapper->pColCmpr, pSrcWrapper->pColCmpr, size);
return pDstWrapper;
}
static FORCE_INLINE void tInitDefaultSColCmprWrapper(SColCmprWrapper* pCmpr, SSchemaWrapper* pSchema) {
pCmpr->nCols = pSchema->nCols;
for (int32_t i = 0; i < pCmpr->nCols; i++) {
SColCmpr* pColCmpr = &pCmpr->pColCmpr[i];
SSchema* pColSchema = &pSchema->pSchema[i];
pColCmpr->id = pColSchema->colId;
pColCmpr->alg = 0;
}
}
static FORCE_INLINE void tDeleteSColCmprWrapper(SColCmprWrapper* pWrapper) {
if (pWrapper == NULL) return;
taosMemoryFreeClear(pWrapper->pColCmpr);
taosMemoryFreeClear(pWrapper);
}
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
if (pSchemaWrapper->pSchema == NULL) return NULL;
@ -2685,6 +2726,8 @@ typedef struct SVCreateStbReq {
int32_t alterOriDataLen;
void* alterOriData;
int8_t source;
int8_t colCmpred;
SColCmprWrapper colCmpr;
} SVCreateStbReq;
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
@ -2724,6 +2767,7 @@ typedef struct SVCreateTbReq {
};
int32_t sqlLen;
char* sql;
SColCmprWrapper colCmpr;
} SVCreateTbReq;
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);

View File

@ -78,6 +78,8 @@ typedef struct SMetaEntry {
};
uint8_t* pBuf;
SColCmprWrapper colCmpr; // col compress alg
} SMetaEntry;
typedef struct SMetaReader {
@ -362,7 +364,8 @@ typedef struct SStateStore {
int32_t (*streamStateSessionAllocWinBuffByNextPosition)(SStreamState* pState, SStreamStateCur* pCur,
const SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, int32_t* pVLen);
int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount,
void** ppVal, int32_t* pVLen);
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp);

View File

@ -273,6 +273,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x036E)
#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x036F)
#define TSDB_CODE_MND_FIELD_VALUE_OVERFLOW TAOS_DEF_ERROR_CODE(0, 0x0370)
#define TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0371)
// mnode-func

View File

@ -1470,9 +1470,7 @@ int32_t tDeserializeSStatisReq(void *buf, int32_t bufLen, SStatisReq *pReq) {
return 0;
}
void tFreeSStatisReq(SStatisReq *pReq) {
taosMemoryFreeClear(pReq->pCont);
}
void tFreeSStatisReq(SStatisReq *pReq) { taosMemoryFreeClear(pReq->pCont); }
int32_t tSerializeSCreateAcctReq(void *buf, int32_t bufLen, SCreateAcctReq *pReq) {
SEncoder encoder = {0};
@ -7523,6 +7521,33 @@ int32_t tDecodeSRSmaParam(SDecoder *pCoder, SRSmaParam *pRSmaParam) {
return 0;
}
int32_t tEncodeSColCmprWrapper(SEncoder *pCoder, const SColCmprWrapper *pWrapper) {
if (tEncodeI32v(pCoder, pWrapper->nCols) < 0) return -1;
if (tEncodeI32v(pCoder, pWrapper->version) < 0) return -1;
for (int32_t i = 0; i < pWrapper->nCols; i++) {
SColCmpr *p = &pWrapper->pColCmpr[i];
if (tEncodeI16v(pCoder, p->id) < 0) return -1;
if (tEncodeU32(pCoder, p->alg) < 0) return -1;
}
return 0;
}
int32_t tDecodeSColCmprWrapperEx(SDecoder *pDecoder, SColCmprWrapper *pWrapper) {
if (tDecodeI32v(pDecoder, &pWrapper->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pWrapper->version) < 0) return -1;
pWrapper->pColCmpr = (SColCmpr *)taosMemoryCalloc(1, pWrapper->nCols * sizeof(SColCmpr));
if (pWrapper->pColCmpr == NULL) return -1;
for (int i = 0; i < pWrapper->nCols; i++) {
SColCmpr *p = &pWrapper->pColCmpr[i];
if (tDecodeI16v(pDecoder, &p->id) < 0) goto END;
if (tDecodeU32(pDecoder, &p->alg) < 0) goto END;
}
return 0;
END:
taosMemoryFree(pWrapper->pColCmpr);
return -1;
}
int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
@ -7541,6 +7566,8 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
}
if (tEncodeI8(pCoder, pReq->source) < 0) return -1;
if (tEncodeSColCmprWrapper(pCoder, &pReq->colCmpr) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
@ -7561,9 +7588,12 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
if (pReq->alterOriDataLen > 0) {
if (tDecodeBinary(pCoder, (uint8_t **)&pReq->alterOriData, NULL) < 0) return -1;
}
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI8(pCoder, &pReq->source) < 0) return -1;
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeSColCmprWrapperEx(pCoder, &pReq->colCmpr) < 0) return -1;
}
}
tEndDecode(pCoder);
@ -7605,6 +7635,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if (tEncodeI32(pCoder, pReq->sqlLen) < 0) return -1;
if (tEncodeBinary(pCoder, pReq->sql, pReq->sqlLen) < 0) return -1;
}
if (tEncodeSColCmprWrapper(pCoder, &pReq->colCmpr) < 0) return -1;
tEndEncode(pCoder);
return 0;
@ -7654,6 +7685,10 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
if (pReq->sqlLen > 0) {
if (tDecodeBinaryAlloc(pCoder, (void **)&pReq->sql, NULL) < 0) return -1;
}
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeSColCmprWrapperEx(pCoder, &pReq->colCmpr) < 0) return -1;
}
}
tEndDecode(pCoder);

View File

@ -286,6 +286,11 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
if (sver < STB_VER_NUMBER) {
// compatible with old data, setup default compress value
// impl later
for (int i = 0; i < pStb->numOfColumns; i++) {
SCmprObj *pCmpr = &pStb->pCmpr[i];
pCmpr->colId = 0;
pCmpr->cmprAlg = 0;
}
} else {
for (int i = 0; i < pStb->numOfColumns; i++) {
SCmprObj *pCmpr = &pStb->pCmpr[i];
@ -489,6 +494,15 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int3
req.schemaTag.version = pStb->tagVer;
req.schemaTag.pSchema = pStb->pTags;
SColCmprWrapper *pCmpr = &req.colCmpr;
pCmpr->version = pStb->colVer;
pCmpr->nCols = pStb->numOfColumns;
for (int i = 0; pStb->numOfColumns; i++) {
SColCmpr *p = &pCmpr->pColCmpr[i];
p->alg = pStb->pCmpr[i].cmprAlg;
p->id = pStb->pCmpr[i].colId;
}
if (req.rollup) {
req.rsmaParam.maxdelay[0] = pStb->maxdelay[0];
req.rsmaParam.maxdelay[1] = pStb->maxdelay[1];
@ -870,8 +884,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pDst->nextColId++;
}
pDst->pCmpr = taosMemoryCalloc(1, sizeof(SCmprObj));
pDst->pCmpr = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SCmprObj));
for (int32_t i = 0; i < pDst->numOfColumns; i++) {
SCmprObj *p = &pDst->pCmpr[i];
}
@ -1255,13 +1268,16 @@ static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
pNew->pTags = taosMemoryCalloc(pNew->numOfTags, sizeof(SSchema));
pNew->pColumns = taosMemoryCalloc(pNew->numOfColumns, sizeof(SSchema));
if (pNew->pTags == NULL || pNew->pColumns == NULL) {
pNew->pCmpr = taosMemoryCalloc(pNew->numOfColumns, sizeof(SColCmpr));
if (pNew->pTags == NULL || pNew->pColumns == NULL || pNew->pCmpr) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(pNew->pColumns, pOld->pColumns, sizeof(SSchema) * pOld->numOfColumns);
memcpy(pNew->pTags, pOld->pTags, sizeof(SSchema) * pOld->numOfTags);
memcpy(pNew->pCmpr, pOld->pCmpr, sizeof(SColCmpr) * pOld->numOfColumns);
return 0;
}
@ -1629,6 +1645,44 @@ static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj
return 0;
}
static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, char *colName,
SColCmpr *pColCmpr) {
if (pColCmpr == NULL || colName == NULL) return -1;
int32_t code = 0;
int32_t idx = mndFindSuperTableColumnIndex(pOld, colName);
if (idx == -1) {
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
return -1;
}
col_id_t colId = pOld->pColumns[idx].colId;
if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) {
return -1;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
SSchema *pCols = pNew->pColumns + idx;
int8_t updated = 0;
for (int i = 0; i < pNew->numOfColumns; i++) {
SCmprObj *p = &pNew->pCmpr[i];
if (p->colId == colId) {
if (p->cmprAlg != pColCmpr->alg) {
p->cmprAlg = pColCmpr->alg;
}
break;
}
}
if (updated == 0) {
terrno = TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST;
return -1;
}
pNew->colVer++;
return 0;
}
static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ncols) {
if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) {
terrno = TSDB_CODE_MND_TOO_MANY_COLUMNS;
@ -2295,6 +2349,9 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
needRsp = false;
code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl);
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS:
code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, 0, NULL);
break;
default:
needRsp = false;
terrno = TSDB_CODE_OPS_NOT_SUPPORT;

View File

@ -316,6 +316,10 @@ struct SVnodeCfg {
#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0)
#define TABLE_SET_ROLLUP(FLG) ((FLG) |= TABLE_ROLLUP_ON)
#define TABLE_COL_COMPRESSED ((int8_t)0x2)
#define TABLE_IS_COL_COMPRESSED(FLG) (((FLG) & (TABLE_COL_COMPRESSED)) != 0)
#define TABLE_SET_COL_COMPRESSED(FLG) ((FLG) |= TABLE_COL_COMPRESSED)
#ifdef __cplusplus
}
#endif

View File

@ -15,6 +15,36 @@
#include "meta.h"
int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) {
const SColCmprWrapper *pw = &pME->colCmpr;
for (int32_t i = 0; i < pw->nCols; i++) {
SColCmpr *p = &pw->pColCmpr[i];
if (tEncodeI16v(pCoder, p->id) < 0) return -1;
if (tEncodeU32(pCoder, p->alg) < 0) return -1;
}
return 0;
}
int meteDecodeColCmprEntry(SDecoder *pDecoder, SMetaEntry *pME) {
SColCmprWrapper *pWrapper = &pME->colCmpr;
if (tDecodeI32v(pDecoder, &pWrapper->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pWrapper->version) < 0) return -1;
pWrapper->pColCmpr = (SColCmpr *)taosMemoryCalloc(1, pWrapper->nCols * sizeof(SColCmpr));
if (pWrapper->pColCmpr == NULL) return -1;
for (int i = 0; i < pWrapper->nCols; i++) {
SColCmpr *p = &pWrapper->pColCmpr[i];
if (tDecodeI16v(pDecoder, &p->id) < 0) goto END;
if (tDecodeU32(pDecoder, &p->alg) < 0) goto END;
}
return 0;
END:
taosMemoryFree(pWrapper->pColCmpr);
return -1;
}
int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tStartEncode(pCoder) < 0) return -1;
@ -56,6 +86,10 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
return -1;
}
if (TABLE_IS_COL_COMPRESSED(pME->flags)) {
if (meteEncodeColCmprEntry(pCoder, pME) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
@ -102,9 +136,18 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeTSma(pCoder, pME->smaEntry.tsma, true) < 0) return -1;
} else {
metaError("meta/entry: invalide table type: %" PRId8 " decode failed.", pME->type);
return -1;
}
if (!tDecodeIsEnd(pCoder)) {
if (meteDecodeColCmprEntry(pCoder, pME) < 0) return -1;
TABLE_SET_COL_COMPRESSED(pME->flags);
} else {
if (pME->type == TSDB_SUPER_TABLE) {
tInitDefaultSColCmprWrapper(&pME->colCmpr, &pME->stbEntry.schemaRow);
} else if (pME->type == TSDB_NORMAL_TABLE) {
tInitDefaultSColCmprWrapper(&pME->colCmpr, &pME->ntbEntry.schemaRow);
}
}
tEndDecode(pCoder);
return 0;

View File

@ -241,10 +241,20 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
me.name = pReq->name;
me.stbEntry.schemaRow = pReq->schemaRow;
me.stbEntry.schemaTag = pReq->schemaTag;
// me.stbEntry.colCmpr = pReq->colCmpr;
// me.stbEntry.colCmpr = pReq->
if (pReq->rollup) {
TABLE_SET_ROLLUP(me.flags);
me.stbEntry.rsmaParam = pReq->rsmaParam;
}
if (pReq->colCmpred) {
TABLE_SET_COL_COMPRESSED(me.flags);
me.colCmpr = pReq->colCmpr;
} else {
TABLE_SET_COL_COMPRESSED(me.flags);
// TODO(yihao)
// SETUP default compress algr
}
if (metaHandleEntry(pMeta, &me) < 0) goto _err;
@ -432,6 +442,8 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
nStbEntry.name = pReq->name;
nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
nStbEntry.colCmpr = pReq->colCmpr;
TABLE_SET_COL_COMPRESSED(nStbEntry.flags);
int nCols = pReq->schemaRow.nCols;
int onCols = oStbEntry.stbEntry.schemaRow.nCols;
@ -636,6 +648,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
nStbEntry.name = pReq->name;
nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
nStbEntry.colCmpr = pReq->colCmpr;
metaWLock(pMeta);
// update table.db
@ -771,10 +784,14 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq)
SSchemaWrapper *row = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaRow);
SSchemaWrapper *tag = tCloneSSchemaWrapper(&oStbEntry.stbEntry.schemaTag);
SColCmprWrapper *cmpr = tCloneSColCmprWrapper(&oStbEntry.colCmpr);
nStbEntry.stbEntry.schemaRow = *row;
nStbEntry.stbEntry.schemaTag = *tag;
nStbEntry.stbEntry.rsmaParam = oStbEntry.stbEntry.rsmaParam;
nStbEntry.colCmpr = *cmpr;
nStbEntry.colCmpr = oStbEntry.colCmpr;
metaWLock(pMeta);
// update table.db
@ -785,6 +802,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq)
tDeleteSchemaWrapper(tag);
tDeleteSchemaWrapper(row);
tDeleteSColCmprWrapper(cmpr);
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
tDecoderClear(&dc);
@ -899,6 +917,8 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
me.ntbEntry.comment = pReq->comment;
me.ntbEntry.schemaRow = pReq->ntb.schemaRow;
me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1;
me.colCmpr = pReq->colCmpr;
TABLE_SET_COL_COMPRESSED(me.flags);
++pStats->numOfNTables;
pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1;
@ -2457,6 +2477,9 @@ int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
VND_CHECK_CODE(code, line, _err);
}
if (pME->type == TSDB_SUPER_TABLE || pME->type == TSDB_NORMAL_TABLE) {
}
metaULock(pMeta);
metaDebug("vgId:%d, handle meta entry, ver:%" PRId64 ", uid:%" PRId64 ", name:%s", TD_VID(pMeta->pVnode),
pME->version, pME->uid, pME->name);

View File

@ -215,6 +215,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_VALUE_OVERFLOW, "out of range and overflow")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_COMPRESS_ALREADY_EXIST, "Column compress already exist")
// mnode-func
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_NAME, "Invalid func name")