support input decimal

This commit is contained in:
wangjiaming0909 2024-12-19 08:46:51 +08:00
parent e63a0e277c
commit d4642d46db
28 changed files with 674 additions and 73 deletions

View File

@ -79,7 +79,7 @@ uint8_t columnLevelVal(const char* level);
uint8_t columnEncodeVal(const char* encode);
uint16_t columnCompressVal(const char* compress);
bool useCompress(uint8_t tableType);
bool withExtSchema(uint8_t tableType);
bool checkColumnEncode(char encode[TSDB_CL_COMPRESS_OPTION_LEN]);
bool checkColumnEncodeOrSetDefault(uint8_t type, char encode[TSDB_CL_COMPRESS_OPTION_LEN]);
bool checkColumnCompress(char compress[TSDB_CL_COMPRESS_OPTION_LEN]);

View File

@ -252,6 +252,10 @@ struct SValue {
uint8_t *pData;
uint32_t nData;
};
struct {
DecimalWord *words;
int32_t wordNum;
};
};
};

View File

@ -599,6 +599,7 @@ struct SSchema {
struct SSchemaExt {
col_id_t colId;
uint32_t compress;
STypeMod typeMod;
};
//
@ -858,12 +859,14 @@ static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema)
static FORCE_INLINE int32_t tEncodeSSchemaExt(SEncoder* pEncoder, const SSchemaExt* pSchemaExt) {
TAOS_CHECK_RETURN(tEncodeI16v(pEncoder, pSchemaExt->colId));
TAOS_CHECK_RETURN(tEncodeU32(pEncoder, pSchemaExt->compress));
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pSchemaExt->typeMod));
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchemaExt(SDecoder* pDecoder, SSchemaExt* pSchemaExt) {
TAOS_CHECK_RETURN(tDecodeI16v(pDecoder, &pSchemaExt->colId));
TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pSchemaExt->compress));
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pSchemaExt->typeMod));
return 0;
}
@ -3255,7 +3258,7 @@ typedef struct SVCreateStbReq {
int8_t source;
int8_t colCmpred;
SColCmprWrapper colCmpr;
SExtSchema* pExtSchema;
SExtSchema* pExtSchemas;
} SVCreateStbReq;
int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq);
@ -3296,6 +3299,7 @@ typedef struct SVCreateTbReq {
int32_t sqlLen;
char* sql;
SColCmprWrapper colCmpr;
SExtSchema* pExtSchemas;
} SVCreateTbReq;
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
@ -3319,6 +3323,7 @@ static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
taosMemoryFreeClear(req->ntb.schemaRow.pSchema);
}
taosMemoryFreeClear(req->colCmpr.pColCmpr);
taosMemoryFreeClear(req->pExtSchemas);
}
typedef struct {

View File

@ -373,7 +373,7 @@ void *getDataMin(int32_t type, void *value);
void *getDataMax(int32_t type, void *value);
#define STypeMod int32_t
uint8_t getDecimalType(uint8_t precision);
uint8_t decimalTypeFromPrecision(uint8_t precision);
#ifdef __cplusplus
}

View File

@ -20,10 +20,45 @@
extern "C" {
#endif
#include "ttypes.h"
#include "tdef.h"
typedef struct SDataType SDataType;
typedef struct SValue SValue;
typedef struct Decimal64 {
DecimalWord words[1];
} Decimal64;
typedef struct Decimal128 {
DecimalWord words[2];
} Decimal128;
#define DECIMAL_WORD_NUM(TYPE) sizeof(TYPE) / sizeof(DecimalWord)
int32_t decimalCalcTypeMod(const SDataType* pType);
void decimalFromTypeMod(STypeMod typeMod, uint8_t* precision, uint8_t* scale);
int32_t decimal64FromStr(const char* str, int32_t len, uint8_t* precision, uint8_t* scale, Decimal64* result);
int32_t decimal128FromStr(const char* str, int32_t len, uint8_t* precision, uint8_t* scale, Decimal128* result);
int32_t decimal64ToDataVal(const Decimal64* dec, SValue* pVal);
int32_t decimal128ToDataVal(const Decimal128* dec, SValue* pVal);
typedef struct DecimalVar DecimalVar;
typedef struct SDecimalVarOps {
uint8_t wordNum;
int32_t (*add)(DecimalVar* pLeft, const DecimalVar* pRight);
int32_t (*multiply)(DecimalVar* pLeft, const DecimalVar* pRight);
} SDecimalVarOps;
typedef struct SWideIntegerOps {
uint8_t wordNum;
int32_t (*add)(DecimalWord* pLeft, DecimalWord* pRight, uint8_t rightWordNum);
int32_t (*subtract)(DecimalWord* pLeft, DecimalWord* pRight, uint8_t rightWordNum);
int32_t (*multiply)(DecimalWord* pLeft, DecimalWord* pRight, uint8_t rightWordNum);
int32_t (*divide)(DecimalWord* pLeft, DecimalWord* pRight, uint8_t rightWordNum);
} SWideIntegerOps;
#ifdef __cplusplus
}

View File

@ -83,7 +83,7 @@ typedef struct SMetaEntry {
uint8_t* pBuf;
SColCmprWrapper colCmpr; // col compress alg
SExtSchema* pExtSchema;
SExtSchema* pExtSchemas;
} SMetaEntry;
typedef struct SMetaReader {

View File

@ -134,6 +134,7 @@ typedef struct SViewMeta {
int32_t version;
int32_t numOfCols;
SSchema* pSchema;
// TODO wjm view support decimal
} SViewMeta;
typedef struct SDBVgInfo {

View File

@ -682,13 +682,19 @@ typedef enum {
#define MIN_RESERVE_MEM_SIZE 1024 // MB
// Decimal
#define TSDB_DECIMAL_MIN_PRECISION 1
#define TSDB_DECIMAL_MAX_PRECISION 38
#define TSDB_DECIMAL_MAX_SCALE TSDB_DECIMAL_MAX_PRECISION
#define TSDB_DECIMAL64_MAX_PRECISION 18
#define TSDB_DECIMAL64_MAX_SCALE TSDB_DECIMAL64_MAX_PRECISION
#define TSDB_DECIMAL128_MAX_PRECISION 38
#define TSDB_DECIMAL128_MAX_SCALE TSDB_DECIMAL128_MAX_PRECISION
#define TSDB_DECIMAL_MIN_PRECISION 1
#define TSDB_DECIMAL_MAX_PRECISION TSDB_DECIMAL128_MAX_PRECISION
#define TSDB_DECIMAL_MIN_SCALE 0
#define TSDB_DECIMAL_MAX_SCALE TSDB_DECIMAL_MAX_PRECISION
typedef uint64_t DecimalWord;
#ifdef __cplusplus
}
#endif

View File

@ -3911,7 +3911,7 @@ int32_t tSerializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp) {
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->tagsLen));
TAOS_CHECK_EXIT(tEncodeBinary(&encoder, pRsp->pTags, pRsp->tagsLen));
if (useCompress(pRsp->tableType)) {
if (withExtSchema(pRsp->tableType)) {
for (int32_t i = 0; i < pRsp->numOfColumns; ++i) {
SSchemaExt *pSchemaExt = &pRsp->pSchemaExt[i];
TAOS_CHECK_EXIT(tEncodeSSchemaExt(&encoder, pSchemaExt));
@ -3987,7 +3987,7 @@ int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp)
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(&decoder, (void **)&pRsp->pTags, NULL));
if (!tDecodeIsEnd(&decoder)) {
if (useCompress(pRsp->tableType) && pRsp->numOfColumns > 0) {
if (withExtSchema(pRsp->tableType) && pRsp->numOfColumns > 0) {
pRsp->pSchemaExt = taosMemoryMalloc(sizeof(SSchemaExt) * pRsp->numOfColumns);
if (pRsp->pSchemaExt == NULL) {
TAOS_CHECK_EXIT(terrno);
@ -6082,7 +6082,7 @@ static int32_t tEncodeSTableMetaRsp(SEncoder *pEncoder, STableMetaRsp *pRsp) {
TAOS_CHECK_RETURN(tEncodeSSchema(pEncoder, pSchema));
}
if (useCompress(pRsp->tableType)) {
if (withExtSchema(pRsp->tableType)) {
for (int32_t i = 0; i < pRsp->numOfColumns; ++i) {
SSchemaExt *pSchemaExt = &pRsp->pSchemaExt[i];
TAOS_CHECK_RETURN(tEncodeSSchemaExt(pEncoder, pSchemaExt));
@ -6123,7 +6123,7 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) {
}
if (!tDecodeIsEnd(pDecoder)) {
if (useCompress(pRsp->tableType) && pRsp->numOfColumns > 0) {
if (withExtSchema(pRsp->tableType) && pRsp->numOfColumns > 0) {
pRsp->pSchemaExt = taosMemoryMalloc(sizeof(SSchemaExt) * pRsp->numOfColumns);
if (pRsp->pSchemaExt == NULL) {
TAOS_CHECK_RETURN(terrno);
@ -10495,9 +10495,9 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI8(pCoder, pReq->colCmpred));
TAOS_CHECK_EXIT(tEncodeSColCmprWrapper(pCoder, &pReq->colCmpr));
if (pReq->pExtSchema) {
if (pReq->pExtSchemas) {
TAOS_CHECK_EXIT(tEncodeI8(pCoder, 1));
TAOS_CHECK_EXIT(tEncodeSExtSchemas(pCoder, pReq->pExtSchema, pReq->schemaRow.nCols));
TAOS_CHECK_EXIT(tEncodeSExtSchemas(pCoder, pReq->pExtSchemas, pReq->schemaRow.nCols));
} else {
TAOS_CHECK_EXIT(tEncodeI8(pCoder, 0));
}
@ -10540,7 +10540,7 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
int8_t hasExtSchema = 0;
TAOS_CHECK_EXIT(tDecodeI8(pCoder, &hasExtSchema));
if (hasExtSchema) {
TAOS_CHECK_EXIT(tDecodeSExtSchemas(pCoder, &pReq->pExtSchema, pReq->schemaRow.nCols));
TAOS_CHECK_EXIT(tDecodeSExtSchemas(pCoder, &pReq->pExtSchemas, pReq->schemaRow.nCols));
}
}
}
@ -10592,6 +10592,12 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
// Encode Column Options: encode compress level
if (pReq->type == TSDB_SUPER_TABLE || pReq->type == TSDB_NORMAL_TABLE) {
TAOS_CHECK_EXIT(tEncodeSColCmprWrapper(pCoder, &pReq->colCmpr));
if (pReq->pExtSchemas) {
TAOS_CHECK_EXIT(tEncodeI8(pCoder, 1));
TAOS_CHECK_EXIT(tEncodeSExtSchemas(pCoder, pReq->pExtSchemas, pReq->ntb.schemaRow.nCols));
} else {
TAOS_CHECK_EXIT(tEncodeI8(pCoder, 0));
}
}
tEndEncode(pCoder);
@ -10656,6 +10662,14 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
if (!tDecodeIsEnd(pCoder)) {
TAOS_CHECK_EXIT(tDecodeSColCmprWrapperEx(pCoder, &pReq->colCmpr));
}
if (!tDecodeIsEnd(pCoder)) {
int8_t hasExtSchema = 0;
TAOS_CHECK_EXIT(tDecodeI8(pCoder, &hasExtSchema));
if (hasExtSchema) {
TAOS_CHECK_EXIT(tDecodeSExtSchemas(pCoder, &pReq->pExtSchemas, pReq->ntb.schemaRow.nCols));
}
}
}
tEndDecode(pCoder);

View File

@ -350,7 +350,7 @@ int32_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressTy
return TSDB_CODE_SUCCESS;
}
bool useCompress(uint8_t tableType) {
bool withExtSchema(uint8_t tableType) {
return TSDB_SUPER_TABLE == tableType || TSDB_NORMAL_TABLE == tableType || TSDB_CHILD_TABLE == tableType;
}

View File

@ -233,6 +233,6 @@ int32_t operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type) {
return 0;
}
uint8_t getDecimalType(uint8_t precision) {
uint8_t decimalTypeFromPrecision(uint8_t precision) {
return precision > TSDB_DECIMAL64_MAX_PRECISION ? TSDB_DATA_TYPE_DECIMAL : TSDB_DATA_TYPE_DECIMAL64;
}

View File

@ -579,7 +579,7 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int3
}
}
}
req.pExtSchema = pStb->pExtSchemas; // only reference to it.
req.pExtSchemas = pStb->pExtSchemas; // only reference to it.
// get length
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateStbReq, &req, contLen, ret);
@ -2232,6 +2232,9 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
SSchemaExt *pSchEx = &pRsp->pSchemaExt[i];
pSchEx->colId = pCmpr->id;
pSchEx->compress = pCmpr->alg;
if (pStb->pExtSchemas) {
pSchEx->typeMod = pStb->pExtSchemas[i].typeMod;
}
}
taosRUnLockLatch(&pStb->lock);
@ -2297,6 +2300,9 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
SSchemaExt *pSchExt = &pRsp->pSchemaExt[i];
pSchExt->colId = pCmpr->id;
pSchExt->compress = pCmpr->alg;
if (pStb->pExtSchemas) {
pSchExt->typeMod = pStb->pExtSchemas[i].typeMod;
}
}
taosRUnLockLatch(&pStb->lock);

View File

@ -25,7 +25,7 @@ static bool schemasHasTypeMod(const SSchema *pSchema, int32_t nCols) {
}
static int32_t metaEncodeExtSchema(SEncoder* pCoder, const SMetaEntry* pME) {
if (pME->pExtSchema) {
if (pME->pExtSchemas) {
const SSchemaWrapper *pSchWrapper = NULL;
bool hasTypeMods = false;
if (pME->type == TSDB_SUPER_TABLE) {
@ -38,7 +38,7 @@ static int32_t metaEncodeExtSchema(SEncoder* pCoder, const SMetaEntry* pME) {
hasTypeMods = schemasHasTypeMod(pSchWrapper->pSchema, pSchWrapper->nCols);
for (int32_t i = 0; i < pSchWrapper->nCols && hasTypeMods; ++i) {
TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pME->pExtSchema[i].typeMod));
TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pME->pExtSchemas[i].typeMod));
}
}
return 0;
@ -57,13 +57,13 @@ static int32_t metaDecodeExtSchemas(SDecoder* pDecoder, SMetaEntry* pME) {
hasExtSchema = schemasHasTypeMod(pSchWrapper->pSchema, pSchWrapper->nCols);
if (hasExtSchema && pSchWrapper->nCols > 0) {
pME->pExtSchema = (SExtSchema*)tDecoderMalloc(pDecoder, sizeof(SExtSchema) * pSchWrapper->nCols);
if (pME->pExtSchema == NULL) {
pME->pExtSchemas = (SExtSchema*)tDecoderMalloc(pDecoder, sizeof(SExtSchema) * pSchWrapper->nCols);
if (pME->pExtSchemas == NULL) {
return terrno;
}
for (int32_t i = 0; i < pSchWrapper->nCols && hasExtSchema; i++) {
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pME->pExtSchema[i].typeMod));
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pME->pExtSchemas[i].typeMod));
}
}

View File

@ -811,7 +811,7 @@ int32_t metaGetColCmpr(SMeta *pMeta, tb_uid_t uid, SHashObj **ppColCmprObj) {
taosHashClear(pColCmprObj);
return rc;
}
if (useCompress(e.type)) {
if (withExtSchema(e.type)) {
SColCmprWrapper *p = &e.colCmpr;
for (int32_t i = 0; i < p->nCols; i++) {
SColCmpr *pCmpr = &p->pColCmpr[i];

View File

@ -189,7 +189,7 @@ int32_t metaCreateSuperTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq
TABLE_SET_COL_COMPRESSED(entry.flags);
entry.colCmpr = pReq->colCmpr;
}
entry.pExtSchema = pReq->pExtSchema;
entry.pExtSchemas = pReq->pExtSchemas;
code = metaHandleEntry2(pMeta, &entry);
if (TSDB_CODE_SUCCESS == code) {
@ -436,6 +436,9 @@ static int32_t metaBuildCreateNormalTableRsp(SMeta *pMeta, SMetaEntry *pEntry, S
SColCmpr *p = &pEntry->colCmpr.pColCmpr[i];
(*ppRsp)->pSchemaExt[i].colId = p->id;
(*ppRsp)->pSchemaExt[i].compress = p->alg;
if (pEntry->pExtSchemas) {
(*ppRsp)->pSchemaExt[i].typeMod = pEntry->pExtSchemas[i].typeMod;
}
}
return code;
@ -455,17 +458,18 @@ static int32_t metaCreateNormalTable(SMeta *pMeta, int64_t version, SVCreateTbRe
}
SMetaEntry entry = {
.version = version,
.type = TSDB_NORMAL_TABLE,
.uid = pReq->uid,
.name = pReq->name,
.ntbEntry.btime = pReq->btime,
.ntbEntry.ttlDays = pReq->ttl,
.ntbEntry.commentLen = pReq->commentLen,
.ntbEntry.comment = pReq->comment,
.ntbEntry.schemaRow = pReq->ntb.schemaRow,
.ntbEntry.ncid = pReq->ntb.schemaRow.pSchema[pReq->ntb.schemaRow.nCols - 1].colId + 1,
.colCmpr = pReq->colCmpr,
.version = version,
.type = TSDB_NORMAL_TABLE,
.uid = pReq->uid,
.name = pReq->name,
.ntbEntry.btime = pReq->btime,
.ntbEntry.ttlDays = pReq->ttl,
.ntbEntry.commentLen = pReq->commentLen,
.ntbEntry.comment = pReq->comment,
.ntbEntry.schemaRow = pReq->ntb.schemaRow,
.ntbEntry.ncid = pReq->ntb.schemaRow.pSchema[pReq->ntb.schemaRow.nCols - 1].colId + 1,
.colCmpr = pReq->colCmpr,
.pExtSchemas = pReq->pExtSchemas,
};
TABLE_SET_COL_COMPRESSED(entry.flags);
@ -690,6 +694,8 @@ int32_t metaAddTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq, ST
SColCmpr *p = &pEntry->colCmpr.pColCmpr[i];
pRsp->pSchemaExt[i].colId = p->id;
pRsp->pSchemaExt[i].compress = p->alg;
// TODO wjm
// if (pEntry->pExtSchemas) pRsp->pSchemaExt[i].typeMod = pEntry->pExtSchemas[i].typeMod;
}
}

View File

@ -34,7 +34,7 @@ void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery);
int32_t fillTableColCmpr(SMetaReader *reader, SSchemaExt *pExt, int32_t numOfCol) {
int8_t tblType = reader->me.type;
if (useCompress(tblType)) {
if (withExtSchema(tblType)) {
SColCmprWrapper *p = &(reader->me.colCmpr);
if (numOfCol != p->nCols) {
vError("fillTableColCmpr table type:%d, col num:%d, col cmpr num:%d mismatch", tblType, numOfCol, p->nCols);
@ -180,6 +180,9 @@ int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
if (code < 0) {
goto _exit;
}
for (int32_t i = 0; i < metaRsp.numOfColumns && pReader->me.pExtSchemas; i++) {
metaRsp.pSchemaExt[i].typeMod = pReader->me.pExtSchemas[i].typeMod;
}
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
@ -348,6 +351,10 @@ int32_t vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
SSchemaExt *pSchExt = cfgRsp.pSchemaExt + i;
pSchExt->colId = pCmpr->id;
pSchExt->compress = pCmpr->alg;
if (pReader->me.pExtSchemas)
pSchExt->typeMod = pReader->me.pExtSchemas[i].typeMod;
else
pSchExt->typeMod = 0;
}
//}

View File

@ -1684,7 +1684,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput)
if (output->tbMeta) {
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
int32_t schemaExtSize = 0;
if (useCompress(output->tbMeta->tableType) && (*pOutput)->tbMeta->schemaExt) {
if (withExtSchema(output->tbMeta->tableType) && (*pOutput)->tbMeta->schemaExt) {
schemaExtSize = output->tbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt);
}
@ -1697,7 +1697,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput)
}
TAOS_MEMCPY((*pOutput)->tbMeta, output->tbMeta, metaSize);
if (useCompress(output->tbMeta->tableType) && (*pOutput)->tbMeta->schemaExt) {
if (withExtSchema(output->tbMeta->tableType) && (*pOutput)->tbMeta->schemaExt) {
(*pOutput)->tbMeta->schemaExt = (SSchemaExt*)((char*)(*pOutput)->tbMeta + metaSize);
TAOS_MEMCPY((*pOutput)->tbMeta->schemaExt, output->tbMeta->schemaExt, schemaExtSize);
} else {

View File

@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries(
command
PRIVATE os util nodes catalog function transport qcom scheduler
PRIVATE os util nodes catalog function transport qcom scheduler decimal
)
if(${BUILD_TEST})

View File

@ -16,6 +16,7 @@
#include "catalog.h"
#include "command.h"
#include "commandInt.h"
#include "decimal.h"
#include "scheduler.h"
#include "systable.h"
#include "taosdef.h"
@ -150,7 +151,7 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
SColumnInfoData* pCol6 = NULL;
// level
SColumnInfoData* pCol7 = NULL;
if (useCompress(pMeta->tableType)) {
if (withExtSchema(pMeta->tableType)) {
pCol5 = taosArrayGet(pBlock->pDataBlock, 4);
pCol6 = taosArrayGet(pBlock->pDataBlock, 5);
pCol7 = taosArrayGet(pBlock->pDataBlock, 6);
@ -182,7 +183,7 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
STR_TO_VARSTR(buf, "VIEW COL");
}
COL_DATA_SET_VAL_AND_CHECK(pCol4, pBlock->info.rows, buf, false);
if (useCompress(pMeta->tableType) && pMeta->schemaExt) {
if (withExtSchema(pMeta->tableType) && pMeta->schemaExt) {
if (i < pMeta->tableInfo.numOfColumns) {
STR_TO_VARSTR(buf, columnEncodeStr(COMPRESS_L1_TYPE_U32(pMeta->schemaExt[i].compress)));
COL_DATA_SET_VAL_AND_CHECK(pCol5, pBlock->info.rows, buf, false);
@ -235,7 +236,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp**
code = setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta, biMode);
}
if (TSDB_CODE_SUCCESS == code) {
if (pDesc->pMeta && useCompress(pDesc->pMeta->tableType) && pDesc->pMeta->schemaExt) {
if (pDesc->pMeta && withExtSchema(pDesc->pMeta->tableType) && pDesc->pMeta->schemaExt) {
code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS_COMPRESS, pRsp);
} else {
code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS, pRsp);
@ -537,9 +538,13 @@ static void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
typeLen += tsnprintf(type + typeLen, LTYPE_LEN - typeLen, "(%d)",
(int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
} else if (IS_DECIMAL_TYPE(pSchema->type)) {
uint8_t precision, scale;
decimalFromTypeMod(pCfg->pSchemaExt[i].typeMod, &precision, &scale);
typeLen += tsnprintf(type + typeLen, LTYPE_LEN - typeLen, "(%d,%d)", precision, scale);
}
if (useCompress(pCfg->tableType) && pCfg->pSchemaExt) {
if (withExtSchema(pCfg->tableType) && pCfg->pSchemaExt) {
typeLen += tsnprintf(type + typeLen, LTYPE_LEN - typeLen, " ENCODE \'%s\'",
columnEncodeStr(COMPRESS_L1_TYPE_U32(pCfg->pSchemaExt[i].compress)));
typeLen += tsnprintf(type + typeLen, LTYPE_LEN - typeLen, " COMPRESS \'%s\'",

View File

@ -17,6 +17,42 @@
#include "decimal.h"
#include "querynodes.h"
typedef enum DecimalInternalType {
DECIMAL_64 = 0,
DECIMAL_128 = 1,
} DecimalInternalType;
SWideIntegerOps wideIntegerOps [2] = {
{DECIMAL_WORD_NUM(Decimal64), 0, 0, 0},
{DECIMAL_WORD_NUM(Decimal128), 0, 0, 0}};
SDecimalVarOps decimalVarOps[2] = {
{DECIMAL_WORD_NUM(Decimal64), 0, 0},
{DECIMAL_WORD_NUM(Decimal128), 0, 0}
};
struct DecimalVar {
DecimalInternalType type;
uint8_t precision;
uint8_t scale;
int32_t exponent;
int8_t sign;
DecimalWord* words;
};
uint8_t maxPrecision(DecimalInternalType type) {
switch (type) {
case DECIMAL_64:
return TSDB_DECIMAL64_MAX_PRECISION;
case DECIMAL_128:
return TSDB_DECIMAL128_MAX_PRECISION;
default:
return 0;
}
}
static int32_t decimalVarFromStr(const char* str, int32_t len, DecimalVar* result);
int32_t decimalCalcTypeMod(const SDataType* pType) {
if (IS_DECIMAL_TYPE(pType->type)) {
return (pType->precision << 8) + pType->scale;
@ -24,4 +60,128 @@ int32_t decimalCalcTypeMod(const SDataType* pType) {
return 0;
}
void decimalFromTypeMod(STypeMod typeMod, uint8_t* precision, uint8_t* scale) {
*precision = (uint8_t)((typeMod >> 8) & 0xFF);
*scale = (uint8_t)(typeMod & 0xFF);
}
#define DECIMAL_INTERNAL_TYPE(TYPE) DECIMAL_WORD_NUM(TYPE) - 1
int32_t decimal64FromStr(const char* str, int32_t len, uint8_t* precision, uint8_t* scale, Decimal64* result) {
int32_t code = 0;
DecimalVar var = {.type = DECIMAL_INTERNAL_TYPE(Decimal64), .words = result->words};
code = decimalVarFromStr(str, len, &var);
return code;
}
int32_t decimal128FromStr(const char* str, int32_t len, uint8_t* precision, uint8_t* scale, Decimal128* result) {
int32_t code = 0;
DecimalVar var = {.type = DECIMAL_INTERNAL_TYPE(Decimal128), .words = result->words};
return code;
}
static int32_t decimalVarFromStr(const char* str, int32_t len, DecimalVar* result) {
int32_t code = 0, pos = 0;
result->precision = 0;
result->scale = 0;
bool leadingZeroes = true, afterPoint = false;
uint32_t places = 0;
if (len == 0) return TSDB_CODE_INVALID_DATA_FMT;
SWideIntegerOps ops =
wideIntegerOps[result->type]; // TODO wjm currently, the meaning from type to index is not clear
SDecimalVarOps decOps = decimalVarOps[result->type];
// sign
switch (str[pos]) {
case '-':
result->sign = -1;
case '+':
result->sign = 1;
pos++;
default:
break;
}
for (; pos < len; ++pos) {
switch (str[pos]) {
case '.':
afterPoint = true;
leadingZeroes = false;
break;
case '0':
if (leadingZeroes) break;
if (afterPoint) {
places++;
break;
}
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9': {
leadingZeroes = false;
++places;
if (result->precision + places > maxPrecision(result->type)) {
if (afterPoint) {
break;
} else {
// TODO wjm value too large
return TSDB_CODE_INVALID_DATA_FMT;
}
} else {
result->precision += places;
if (afterPoint) {
result->scale++;
}
DecimalWord ten = 10, digit = str[pos] - '0';
ops.multiply(result->words, &ten, 1);
ops.add(result->words, &digit, 1);
places = 0;
break;
}
}
case 'e':
case 'E':
break;
default:
break;
}
++pos;
}
DecimalWord sign = result->sign;
ops.multiply(result->words, &sign, 1);
return code;
}
int32_t varMultiply(DecimalVar* pLeft, const DecimalVar* pRight) {
int32_t code = 0;
return code;
}
int32_t varAdd(DecimalVar* pLeft, const DecimalVar* pRight) {
int32_t code = 0;
return code;
}
int32_t decimal64ToDataVal(const Decimal64* dec, SValue* pVal) {
pVal->words = taosMemoryCalloc(1, sizeof(DecimalWord));
if (!pVal->words) return terrno;
pVal->wordNum = DECIMAL_WORD_NUM(Decimal64);
pVal->words[0] = *dec->words;
return TSDB_CODE_SUCCESS;
}
int32_t decimal128ToDataVal(const Decimal128* dec, SValue* pVal) {
pVal->words = taosMemCalloc(2, sizeof(DecimalWord));
if (!pVal->words) return terrno;
pVal->wordNum = DECIMAL_WORD_NUM(Decimal128);
memcpy(pVal->words, dec->words, sizeof(dec->words));
return TSDB_CODE_SUCCESS;
}

View File

@ -2491,7 +2491,7 @@ SDataType createDecimalDataType(uint8_t type, const SToken* pPrecisionToken, con
SDataType dt = {0};
dt.precision = taosStr2UInt8(pPrecisionToken->z, NULL, 10);
dt.scale = pScaleToken ? taosStr2Int32(pScaleToken->z, NULL, 10) : 0;
dt.type = getDecimalType(dt.precision);
dt.type = decimalTypeFromPrecision(dt.precision);
dt.bytes = tDataTypes[dt.type].bytes;
return dt;
}

View File

@ -19,6 +19,7 @@
#include "scalar.h"
#include "tglobal.h"
#include "ttime.h"
#include "decimal.h"
typedef struct SInsertParseContext {
SParseContext* pComCxt;
@ -1777,6 +1778,38 @@ static int32_t parseValueTokenImpl(SInsertParseContext* pCxt, const char** pSql,
}
break;
}
case TSDB_DATA_TYPE_DECIMAL: {
uint8_t precision = 0, scale = 0;
Decimal128 dec = {0};
int32_t code = decimal128FromStr(pToken->z, pToken->n, &precision, &scale, &dec);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
// precision check
// scale auto fit
code = decimal128ToDataVal(&dec, &pVal->value);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
break;
}
case TSDB_DATA_TYPE_DECIMAL64: {
uint8_t precision = 0, scale = 0;
Decimal64 dec = {0};
int32_t code = decimal64FromStr(pToken->z, pToken->n, &precision, &scale, &dec);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
// precision check
// scale auto fit
code = decimal64ToDataVal(&dec, &pVal->value);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
break;
}
default:
return TSDB_CODE_FAILED;
}

View File

@ -33,6 +33,7 @@
#include "tcol.h"
#include "tglobal.h"
#include "ttime.h"
#include "decimal.h"
#define generateDealNodeErrMsg(pCxt, code, ...) \
(pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, code, ##__VA_ARGS__), DEAL_RES_ERROR)
@ -612,7 +613,7 @@ static int32_t rewriteDropTableWithMetaCache(STranslateContext* pCxt) {
int32_t metaSize =
sizeof(STableMeta) + sizeof(SSchema) * (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags);
int32_t schemaExtSize =
(useCompress(pMeta->tableType) && pMeta->schemaExt) ? sizeof(SSchemaExt) * pMeta->tableInfo.numOfColumns : 0;
(withExtSchema(pMeta->tableType) && pMeta->schemaExt) ? sizeof(SSchemaExt) * pMeta->tableInfo.numOfColumns : 0;
const char* pTbName = (const char*)pMeta + metaSize + schemaExtSize;
SName name = {0};
@ -1357,7 +1358,7 @@ static bool hasPkInTable(const STableMeta* pTableMeta) {
}
static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* pColSchema, int32_t tagFlag,
SColumnNode* pCol) {
SColumnNode* pCol, const SSchemaExt* pExtSchema) {
tstrncpy(pCol->dbName, pTable->table.dbName, TSDB_DB_NAME_LEN);
tstrncpy(pCol->tableAlias, pTable->table.tableAlias, TSDB_TABLE_NAME_LEN);
tstrncpy(pCol->tableName, pTable->table.tableName, TSDB_TABLE_NAME_LEN);
@ -1381,6 +1382,12 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p
pCol->tableHasPk = hasPkInTable(pTable->pMeta);
pCol->isPk = (pCol->tableHasPk) && (pColSchema->flags & COL_IS_KEY);
pCol->numOfPKs = pTable->pMeta->tableInfo.numOfPKs;
if (pExtSchema) {
if (IS_DECIMAL_TYPE(pCol->node.resType.type)) {
decimalFromTypeMod(pExtSchema->typeMod, &pCol->node.resType.precision, &pCol->node.resType.scale);
}
}
}
static int32_t setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef) {
@ -1477,7 +1484,8 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p
if (TSDB_CODE_SUCCESS != code) {
return generateSyntaxErrMsg(&pCxt->msgBuf, code);
}
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol);
SSchemaExt* pSchemaExt = i > pMeta->tableInfo.numOfColumns ? NULL : pMeta->schemaExt + i;
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol, pSchemaExt);
setColumnPrimTs(pCxt, pCol, pTable);
code = nodesListStrictAppend(pList, (SNode*)pCol);
}
@ -1545,7 +1553,7 @@ static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef,
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pCol->colName);
}
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema, -1, pCol);
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema, -1, pCol, NULL);
pCol->isPrimTs = true;
*pFound = true;
return TSDB_CODE_SUCCESS;
@ -1554,7 +1562,9 @@ static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef,
for (int32_t i = 0; i < nums; ++i) {
if (0 == strcmp(pCol->colName, pMeta->schema[i].name) &&
!invisibleColumn(pCxt->pParseCxt->enableSysInfo, pMeta->tableType, pMeta->schema[i].flags)) {
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol);
SSchemaExt* pSchemaExt = i > pMeta->tableInfo.numOfColumns ? NULL : pMeta->schemaExt + i;
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol, pSchemaExt);
setColumnPrimTs(pCxt, pCol, pTable);
*pFound = true;
break;
@ -2437,7 +2447,7 @@ static int32_t rewriteCountStar(STranslateContext* pCxt, SFunctionNode* pCount)
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pTable && QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, -1, pCol);
setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, -1, pCol, NULL);
} else {
code = rewriteCountStarAsCount1(pCxt, pCount);
}
@ -2463,7 +2473,7 @@ static int32_t rewriteCountNotNullValue(STranslateContext* pCxt, SFunctionNode*
SColumnNode* pCol = NULL;
code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol);
if (TSDB_CODE_SUCCESS == code) {
setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, -1, pCol);
setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, -1, pCol, NULL);
NODES_DESTORY_LIST(pCount->pParameterList);
code = nodesListMakeAppend(&pCount->pParameterList, (SNode*)pCol);
}
@ -2492,7 +2502,7 @@ static int32_t rewriteCountTbname(STranslateContext* pCxt, SFunctionNode* pCount
SColumnNode* pCol = NULL;
code = nodesMakeNode(QUERY_NODE_COLUMN, (SNode**)&pCol);
if (TSDB_CODE_SUCCESS == code) {
setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, -1, pCol);
setColumnInfoBySchema((SRealTableNode*)pTable, ((SRealTableNode*)pTable)->pMeta->schema, -1, pCol, NULL);
NODES_DESTORY_LIST(pCount->pParameterList);
code = nodesListMakeAppend(&pCount->pParameterList, (SNode*)pCol);
}
@ -5361,7 +5371,7 @@ static int32_t createTags(STranslateContext* pCxt, SNodeList** pOutput) {
if (TSDB_CODE_SUCCESS != code) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY);
}
setColumnInfoBySchema(pTable, pTagsSchema + i, 1, pCol);
setColumnInfoBySchema(pTable, pTagsSchema + i, 1, pCol, NULL);
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(pOutput, (SNode*)pCol)) {
NODES_DESTORY_LIST(*pOutput);
return TSDB_CODE_OUT_OF_MEMORY;
@ -9737,6 +9747,9 @@ static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchem
if (pCol->pOptions && ((SColumnOptions*)pCol->pOptions)->bPrimaryKey) {
flags |= COL_IS_KEY;
}
if (IS_DECIMAL_TYPE(pCol->dataType.type)) {
flags |= COL_HAS_TYPE_MOD;
}
pSchema->colId = colId;
pSchema->type = pCol->dataType.type;
pSchema->bytes = calcTypeBytes(pCol->dataType);
@ -14163,7 +14176,7 @@ static int32_t extractExplainResultSchema(int32_t* numOfCols, SSchema** pSchema)
static int32_t extractDescribeResultSchema(STableMeta* pMeta, int32_t* numOfCols, SSchema** pSchema) {
*numOfCols = DESCRIBE_RESULT_COLS;
if (pMeta && useCompress(pMeta->tableType)) *numOfCols = DESCRIBE_RESULT_COLS_COMPRESS;
if (pMeta && withExtSchema(pMeta->tableType)) *numOfCols = DESCRIBE_RESULT_COLS_COMPRESS;
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
return terrno;
@ -14185,7 +14198,7 @@ static int32_t extractDescribeResultSchema(STableMeta* pMeta, int32_t* numOfCols
(*pSchema)[3].bytes = DESCRIBE_RESULT_NOTE_LEN;
tstrncpy((*pSchema)[3].name, "note", TSDB_COL_NAME_LEN);
if (pMeta && useCompress(pMeta->tableType)) {
if (pMeta && withExtSchema(pMeta->tableType)) {
(*pSchema)[4].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[4].bytes = DESCRIBE_RESULT_COPRESS_OPTION_LEN;
tstrncpy((*pSchema)[4].name, "encode", TSDB_COL_NAME_LEN);
@ -15103,6 +15116,16 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
return code;
}
}
if (IS_DECIMAL_TYPE(pColDef->dataType.type)) {
if (!req.pExtSchemas) {
req.pExtSchemas = taosMemoryCalloc(pStmt->pCols->length, sizeof(SExtSchema));
if (NULL == req.pExtSchemas) {
tdDestroySVCreateTbReq(&req);
return terrno;
}
}
req.pExtSchemas[index].typeMod = calcTypeMod(&pColDef->dataType);
}
++index;
}
pBatch->info = *pVgroupInfo;

View File

@ -1071,7 +1071,7 @@ int32_t getTableNameFromCache(SParseMetaCache* pMetaCache, const SName* pName, c
int32_t metaSize =
sizeof(STableMeta) + sizeof(SSchema) * (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags);
int32_t schemaExtSize =
(useCompress(pMeta->tableType) && pMeta->schemaExt) ? sizeof(SSchemaExt) * pMeta->tableInfo.numOfColumns : 0;
(withExtSchema(pMeta->tableType) && pMeta->schemaExt) ? sizeof(SSchemaExt) * pMeta->tableInfo.numOfColumns : 0;
const char* pTableName = (const char*)pMeta + metaSize + schemaExtSize;
tstrncpy(pTbName, pTableName, TSDB_TABLE_NAME_LEN);
}
@ -1402,7 +1402,7 @@ STableCfg* tableCfgDup(STableCfg* pCfg) {
pNew->pSchemas = pSchema;
SSchemaExt* pSchemaExt = NULL;
if (useCompress(pCfg->tableType) && pCfg->pSchemaExt) {
if (withExtSchema(pCfg->tableType) && pCfg->pSchemaExt) {
int32_t schemaExtSize = pCfg->numOfColumns * sizeof(SSchemaExt);
pSchemaExt = taosMemoryMalloc(schemaExtSize);
if (!pSchemaExt) goto err;

View File

@ -5675,6 +5675,7 @@ static int32_t tableCountScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLo
}
static SSortLogicNode* sortNonPriKeySatisfied(SLogicNode* pNode) {
return NULL;
if (QUERY_NODE_LOGIC_PLAN_SORT != nodeType(pNode)) {
return NULL;
}

View File

@ -576,7 +576,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema);
int32_t schemaExtSize = 0;
if (useCompress(pSrc->tableType) && pSrc->schemaExt) {
if (withExtSchema(pSrc->tableType) && pSrc->schemaExt) {
schemaExtSize = pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt);
}
*pDst = taosMemoryMalloc(metaSize + schemaExtSize);
@ -584,7 +584,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
return terrno;
}
memcpy(*pDst, pSrc, metaSize);
if (useCompress(pSrc->tableType) && pSrc->schemaExt) {
if (withExtSchema(pSrc->tableType) && pSrc->schemaExt) {
(*pDst)->schemaExt = (SSchemaExt*)((char*)*pDst + metaSize);
memcpy((*pDst)->schemaExt, pSrc->schemaExt, schemaExtSize);
} else {

View File

@ -532,7 +532,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta *
QUERY_PARAM_CHECK(pMeta);
int32_t total = msg->numOfColumns + msg->numOfTags;
int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
int32_t schemaExtSize = (useCompress(msg->tableType) && msg->pSchemaExt) ? sizeof(SSchemaExt) * msg->numOfColumns : 0;
int32_t schemaExtSize = (withExtSchema(msg->tableType) && msg->pSchemaExt) ? sizeof(SSchemaExt) * msg->numOfColumns : 0;
STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize);
if (NULL == pTableMeta) {
@ -553,7 +553,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta *
pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;
memcpy(pTableMeta->schema, msg->pSchemas, sizeof(SSchema) * total);
if (useCompress(msg->tableType) && msg->pSchemaExt) {
if (withExtSchema(msg->tableType) && msg->pSchemaExt) {
pTableMeta->schemaExt = pSchemaExt;
memcpy(pSchemaExt, msg->pSchemaExt, schemaExtSize);
} else {
@ -588,7 +588,7 @@ int32_t queryCreateTableMetaExFromMsg(STableMetaRsp *msg, bool isStb, STableMeta
QUERY_PARAM_CHECK(pMeta);
int32_t total = msg->numOfColumns + msg->numOfTags;
int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
int32_t schemaExtSize = (useCompress(msg->tableType) && msg->pSchemaExt) ? sizeof(SSchemaExt) * msg->numOfColumns : 0;
int32_t schemaExtSize = (withExtSchema(msg->tableType) && msg->pSchemaExt) ? sizeof(SSchemaExt) * msg->numOfColumns : 0;
int32_t tbNameSize = strlen(msg->tbName) + 1;
STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize + tbNameSize);
@ -610,7 +610,7 @@ int32_t queryCreateTableMetaExFromMsg(STableMetaRsp *msg, bool isStb, STableMeta
pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;
TAOS_MEMCPY(pTableMeta->schema, msg->pSchemas, sizeof(SSchema) * total);
if (useCompress(msg->tableType) && msg->pSchemaExt) {
if (withExtSchema(msg->tableType) && msg->pSchemaExt) {
pTableMeta->schemaExt = pSchemaExt;
TAOS_MEMCPY(pSchemaExt, msg->pSchemaExt, schemaExtSize);
} else {

View File

@ -7,7 +7,244 @@ from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# from tmqCommon import *
class DecimalType:
def __init__(self, precision: int, scale: int):
self.precision = precision
self.scale = scale
def __init__(self, precision: str, scale: str):
self.precision = int(precision)
self.scale = int(scale)
def __str__(self):
return f"DECIMAL({self.precision}, {self.scale})"
def __eq__(self, other):
return self.precision == other.precision and self.scale == other.scale
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash((self.precision, self.scale))
def __repr__(self):
return f"DecimalType({self.precision}, {self.scale})"
def generate_value(self, allow_weight_overflow = False, allow_scale_overflow = False) -> str:
if allow_weight_overflow:
weight = secrets.randbelow(40)
else:
weight = secrets.randbelow(self.precision + 1)
if allow_scale_overflow:
dscale = secrets.randbelow(40 - weight + 1)
else:
dscale = secrets.randbelow(self.precision - weight + 1)
digits :str = ''
for i in range(dscale):
digits += str(secrets.randbelow(10))
if dscale > 0:
digits += '.'
for _ in range(dscale):
digits += str(secrets.randbelow(10))
return digits
class TypeEnum:
BOOL = 1
TINYINT = 2
SMALLINT = 3
INT = 4
BIGINT = 5
FLOAT = 6
DOUBLE = 7
VARCHAR = 8
TIMESTAMP = 9
NCHAR = 10
UTINYINT = 11
USMALLINT = 12
UINT = 13
UBIGINT = 14
JSON = 15
VARBINARY = 16
DECIMAL = 17
BINARY = 8
GEOMETRY = 20
DECIMAL64 = 21
def get_type_str(type: int):
if type == TypeEnum.BOOL:
return "BOOL"
elif type == TypeEnum.TINYINT:
return "TINYINT"
elif type == TypeEnum.SMALLINT:
return "SMALLINT"
elif type == TypeEnum.INT:
return "INT"
elif type == TypeEnum.BIGINT:
return "BIGINT"
elif type == TypeEnum.FLOAT:
return "FLOAT"
elif type == TypeEnum.DOUBLE:
return "DOUBLE"
elif type == TypeEnum.VARCHAR:
return "VARCHAR"
elif type == TypeEnum.TIMESTAMP:
return "TIMESTAMP"
elif type == TypeEnum.NCHAR:
return "NCHAR"
elif type == TypeEnum.UTINYINT:
return "UTINYINT"
elif type == TypeEnum.USMALLINT:
return "USMALLINT"
elif type == TypeEnum.UINT:
return "UINT"
elif type == TypeEnum.UBIGINT:
return "UBIGINT"
elif type == TypeEnum.JSON:
return "JSON"
elif type == TypeEnum.VARBINARY:
return "VARBINARY"
elif type == TypeEnum.DECIMAL:
return "DECIMAL"
elif type == TypeEnum.BINARY:
return "BINARY"
elif type == TypeEnum.GEOMETRY:
return "GEOMETRY"
else:
raise "unknow type"
class DataType:
def __init__(self, type: TypeEnum, length: int = 0, type_mod: int = 0):
self.type : TypeEnum = type
self.length = length
self.type_mod = type_mod
def __str__(self):
if self.type_mod != 0:
decimal_type = self.get_decimal_type()
return f"{TypeEnum.get_type_str(self.type)}({decimal_type.precision}, {decimal_type.scale})"
if self.length:
return f"{TypeEnum.get_type_str(self.type)}({self.length})"
return TypeEnum.get_type_str(self.type)
def __eq__(self, other):
return self.type == other.type and self.length == other.length
def __ne__(self, other):
return not self.__eq__(other)
def __hash__(self):
return hash((self.type, self.length))
def __repr__(self):
return f"DataType({self.type}, {self.length}, {self.type_mod})"
def get_decimal_type_mod(type: DecimalType) -> int:
return type.precision * 100 + type.scale
def get_decimal_type(self) -> DecimalType:
return DecimalType(self.type_mod // 100, self.type_mod % 100)
def construct_type_value(self, val: str):
if self.type == TypeEnum.BINARY or self.type == TypeEnum.VARCHAR or self.type == TypeEnum.NCHAR or self.type == TypeEnum.VARBINARY:
return f"'{val}'"
else:
return val
def generate_value(self) -> str:
if self.type == TypeEnum.BOOL:
return str(secrets.randbelow(2))
if self.type == TypeEnum.TINYINT:
return str(secrets.randbelow(256) - 128)
if self.type == TypeEnum.SMALLINT:
return str(secrets.randbelow(65536) - 32768)
if self.type == TypeEnum.INT:
return str(secrets.randbelow(4294967296) - 2147483648)
if self.type == TypeEnum.BIGINT:
return str(secrets.randbelow(9223372036854775808) - 4611686018427387904)
if self.type == TypeEnum.FLOAT or self.type == TypeEnum.DOUBLE:
return str(random.random())
if self.type == TypeEnum.VARCHAR or self.type == TypeEnum.NCHAR or self.type == TypeEnum.VARBINARY:
return f"'{secrets.token_urlsafe(self.length)}'"
if self.type == TypeEnum.TIMESTAMP:
return str(secrets.randbelow(9223372036854775808))
if self.type == TypeEnum.UTINYINT:
return str(secrets.randbelow(256))
if self.type == TypeEnum.USMALLINT:
return str(secrets.randbelow(65536))
if self.type == TypeEnum.UINT:
return str(secrets.randbelow(4294967296))
if self.type == TypeEnum.UBIGINT:
return str(secrets.randbelow(9223372036854775808))
if self.type == TypeEnum.JSON:
return f'{{"key": "{secrets.token_urlsafe(10)}"}}'
if self.type == TypeEnum.DECIMAL:
return self.get_decimal_type().generate_value()
class DecimalColumnTableCreater:
def __init__(self, conn, dbName: str, tbName: str, columns_types: List[DataType], tags_types: List[DataType] = []):
self.conn = conn
self.dbName = dbName
self.tbName = tbName
self.tags_types = tags_types
self.columns_types = columns_types
def create(self):
if len(self.tags_types) > 0:
table = 'stable'
else:
table = 'table'
sql = f"create {table} {self.dbName}.{self.tbName} (ts timestamp"
for i, column in enumerate(self.columns_types):
sql += f", c{i+1} {column}"
if self.tags_types:
sql += ") tags("
for i, tag in enumerate(self.tags_types):
sql += f"t{i+1} {tag}"
if i != len(self.tags_types) - 1:
sql += ", "
sql += ")"
self.conn.execute(sql, queryTimes=1)
def create_child_table(self, ctbPrefix: str, ctbNum: int, tag_types: List[DataType], tag_values: List[str]):
for i in range(ctbNum):
sql = f"create table {self.dbName}.{ctbPrefix}{i} using {self.dbName}.{self.tbName} tags("
for j, tag in enumerate(tag_types):
sql += f"{tag.construct_type_value(tag_values[j])}"
if j != len(tag_types) - 1:
sql += ", "
sql += ")"
self.conn.execute(sql, queryTimes=1)
class TableInserter:
def __init__(self, conn, dbName: str, tbName: str, columns_types: List[DataType], tags_types: List[DataType] = []):
self.conn = conn
self.dbName = dbName
self.tbName = tbName
self.tags_types = tags_types
self.columns_types = columns_types
def insert(self, rows: int, start_ts: int, step: int):
pre_insert = f"insert into {self.dbName}.{self.tbName} values"
sql = pre_insert
for i in range(rows):
sql += f"({start_ts + i * step}"
for column in self.columns_types:
sql += f", {column.generate_value()}"
if self.tags_types:
sql += ") tags("
for tag in self.tags_types:
sql += f"{tag.generate_value()},"
sql = sql[:-1]
sql += ")"
if i != rows - 1:
sql += ", "
if len(sql) > 1000:
self.conn.execute(sql, queryTimes=1)
sql = pre_insert
if len(sql) > len(pre_insert):
self.conn.execute(sql, queryTimes=1)
class TDTestCase:
updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'debugFlag': 143}
@ -17,6 +254,13 @@ class TDTestCase:
self.ctbNum = 10
self.rowsPerTbl = 10000
self.duraion = '1h'
self.columns = []
self.tags = []
self.stable_name = "meters"
self.norm_table_name = "norm_table"
self.c_table_prefix = "t"
self.db_name = "test"
self.c_table_num = 10
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
@ -133,28 +377,78 @@ class TDTestCase:
self.init_normal_tb(tdSql, paraDict['dbName'], 'norm_tb',
paraDict['rowsPerTbl'], paraDict['startTs'], paraDict['tsStep'])
def check_desc(self, tbname: str, column_types: List[DataType], tag_types: List[DataType] = []):
sql = f"desc {self.db_name}.{tbname}"
tdSql.query(sql, queryTimes=1)
results = tdSql.queryResult
for i, column_type in enumerate(column_types):
if column_type.type == TypeEnum.DECIMAL:
if results[i+1][1] != "DECIMAL":
tdLog.exit(f"column {i+1} type is {results[i+1][1]}, expect DECIMAL")
## add decimal type bytes check
## add compression/encode check
def check_show_create_table(self, tbname: str, column_types: List[DataType], tag_types: List[DataType] = []):
sql = f"show create table {self.db_name}.{tbname}"
tdSql.query(sql, queryTimes=1)
create_table_sql = tdSql.queryResult[0][1]
decimal_idx = 0
results = re.findall(r"DECIMAL\((\d+),(\d+)\)", create_table_sql)
for i, column_type in enumerate(column_types):
if column_type.type == TypeEnum.DECIMAL:
result_type = DecimalType(results[decimal_idx][0], results[decimal_idx][1])
if result_type != column_type.get_decimal_type():
tdLog.exit(f"column {i+1} type is {results[0][1]}, expect DECIMAL")
decimal_idx += 1
def test_create_decimal_column(self):
## create decimal type table, normal/super table, decimal64/decimal128
tdLog.printNoPrefix("-------- test create decimal column")
sql = "create stable test.meters (ts timestamp, c1 decimal(10, 2), c2 decimal(20, 2), c3 decimal(30, 2), c4 decimal(38, 2)) tags(t1 int, t2 varchar(255))"
tdSql.execute(sql, queryTimes=1)
self.columns = [
DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(10, 2))),
DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(20, 2))),
DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(30, 2))),
DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(38, 2))),
]
self.tags = [
DataType(TypeEnum.INT),
DataType(TypeEnum.VARCHAR, 255)
]
DecimalColumnTableCreater(tdSql, self.db_name, self.stable_name, self.columns, self.tags).create()
self.check_desc("meters", self.columns, self.tags)
self.check_show_create_table("meters", self.columns, self.tags)
sql = "create table test.norm_table(ts timestamp, c1 decimal(10, 2), c2 decimal(20, 2), c3 decimal(30, 2), c4 decimal(38, 2))"
tdSql.execute(sql, queryTimes=1)
DecimalColumnTableCreater(tdSql, self.db_name, self.norm_table_name, self.columns).create()
self.check_desc("norm_table", self.columns)
self.check_show_create_table("norm_table", self.columns)
## TODO add more values for all rows
tag_values = [
"1", "t1"
]
DecimalColumnTableCreater(tdSql, self.db_name, self.stable_name, self.columns).create_child_table(self.c_table_prefix, self.c_table_num, self.tags, tag_values)
self.check_desc("t1", self.columns, self.tags)
return
## invalid precision/scale
invalid_precision_scale = ["decimal(-1, 2)", "decimal(39, 2)", "decimal(10, -1)", "decimal(10, 39)", "decimal(10, 2.5)", "decimal(10.5, 2)", "decimal(10.5, 2.5)", "decimal(0, 2)", "decimal(0)", "decimal", "decimal()"]
for i in invalid_precision_scale:
sql = f"create table test.invalid_decimal_precision_scale (ts timestamp, c1 {i})"
sql = f"create table {self.db_name}.invalid_decimal_precision_scale (ts timestamp, c1 {i})"
tdSql.error(sql, -1)
## can't create decimal tag
## alter table
## drop index from stb
### These ops will override the previous stbobjs and metaentries, so test it
### These ops will override the previous stbobjs and meta entries, so test it
def test_insert_decimal_values(self):
for i in range(self.c_table_num):
pass
#TableInserter(tdSql, self.db_name, f"{self.c_table_prefix}{i}", self.columns, self.tags).insert(1, 1537146000000, 500)
TableInserter(tdSql, self.db_name, self.norm_table_name, self.columns).insert(1, 1537146000000, 500)
def test_decimal_ddl(self):
tdSql.execute("create database test", queryTimes=1)
@ -162,6 +456,7 @@ class TDTestCase:
def run(self):
self.test_decimal_ddl()
self.test_insert_decimal_values()
time.sleep(9999999)
def stop(self):
@ -172,4 +467,4 @@ class TDTestCase:
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())