From 44b1bbc6d3206067032c07b47cbdfc9176947048 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 Date: Sat, 15 Feb 2025 14:46:10 +0800 Subject: [PATCH] fix decimal table meta and add tests for decimal col streams --- include/common/tmsg.h | 5 +- include/common/ttypes.h | 3 + source/client/src/clientRawBlockWrite.c | 1 + source/common/src/msg/tmsg.c | 63 ++++++++-- source/common/src/tdatablock.c | 11 +- source/common/src/ttypes.c | 13 ++ source/dnode/mnode/impl/src/mndIndex.c | 3 + source/dnode/mnode/impl/src/mndSma.c | 3 + source/dnode/mnode/impl/src/mndStb.c | 64 ++++++++-- source/dnode/mnode/impl/src/mndStream.c | 6 + source/dnode/vnode/src/meta/metaEntry.c | 1 + source/dnode/vnode/src/meta/metaEntry2.c | 2 +- source/dnode/vnode/src/meta/metaQuery.c | 1 + source/dnode/vnode/src/meta/metaTable.c | 8 ++ source/dnode/vnode/src/meta/metaTable2.c | 1 + source/libs/catalog/src/ctgCache.c | 13 +- source/libs/executor/src/dataInserter.c | 2 +- source/libs/nodes/src/nodesCodeFuncs.c | 4 +- source/libs/parser/src/parInsertUtil.c | 2 +- source/libs/parser/src/parTranslater.c | 18 ++- tests/system-test/2-query/decimal.py | 152 +++++++++++++++++------ 21 files changed, 297 insertions(+), 79 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ae6aa6c254..b3406558f6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -706,9 +706,11 @@ typedef struct { } SMonitorParas; typedef struct { - STypeMod typeMod; + STypeMod typeMod; // TODO wjm copy it with a struct, not it's internal members } SExtSchema; +bool hasExtSchema(const SExtSchema* pExtSchema); + typedef struct { int32_t nCols; int32_t version; @@ -1004,6 +1006,7 @@ typedef struct { char* comment; int32_t sqlLen; char* sql; + SArray* pTypeMods; } SMAlterStbReq; int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq); diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 148c14cf6e..9bf1c973b7 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -419,6 +419,9 @@ void decimalFromTypeMod(STypeMod typeMod, uint8_t *precision, uint8_t *scale // pType->type should has been set void fillTypeFromTypeMod(SDataType *pType, STypeMod mod); uint8_t getScaleFromTypeMod(int32_t type, STypeMod mod); +// TODO fix me!! for compatibility issue, save precision in scale in bytes, move it to somewhere else +void fillBytesForDecimalType(int32_t *pBytes, int32_t type, uint8_t precision, uint8_t scale); +void extractDecimalTypeInfoFromBytes(int32_t *pBytes, uint8_t *precision, uint8_t *scale); #ifdef __cplusplus } diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 5c13e1617e..d7d5a21f24 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1060,6 +1060,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) { SColCmpr* pCmp = &req.colCmpr.pColCmpr[i]; field.compress = pCmp->alg; } + if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod; RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field)); } pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField)); diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 01912dc369..ee1f4d43ef 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -905,7 +905,6 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) TAOS_CHECK_EXIT(tEncodeI32(&encoder, pField->bytes)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pField->name)); TAOS_CHECK_EXIT(tEncodeU32(&encoder, pField->compress)); - TAOS_CHECK_EXIT(tEncodeI32(&encoder, pField->typeMod)); } else { SField *pField = taosArrayGet(pReq->pFields, i); @@ -920,6 +919,19 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->comment)); } ENCODESQL(); + if (pReq->alterType == TSDB_ALTER_TABLE_ADD_COLUMN || + pReq->alterType == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION) { + if (taosArrayGetSize(pReq->pTypeMods) > 0) { + int8_t hasTypeMod = 1; + TAOS_CHECK_EXIT(tEncodeI8(&encoder, hasTypeMod)); + for (int32_t i = 0; i < pReq->pTypeMods->size; ++i) { + const STypeMod *pTypeMod = taosArrayGet(pReq->pTypeMods, i); + TAOS_CHECK_ERRNO(tEncodeI32(&encoder, *pTypeMod)); + } + } else { + TAOS_CHECK_EXIT(tEncodeI8(&encoder, 0)); + } + } tEndEncode(&encoder); _exit: @@ -958,7 +970,6 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq TAOS_CHECK_EXIT(tDecodeI32(&decoder, &field.bytes)); TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, field.name)); TAOS_CHECK_EXIT(tDecodeU32(&decoder, &field.compress)); - TAOS_CHECK_EXIT(tDecodeI32(&decoder, &field.typeMod)); if (taosArrayPush(pReq->pFields, &field) == NULL) { TAOS_CHECK_EXIT(terrno); } @@ -984,7 +995,24 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq } DECODESQL(); - + if (!tDecodeIsEnd(&decoder) && (pReq->alterType == TSDB_ALTER_TABLE_ADD_COLUMN || + pReq->alterType == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION)) { + int8_t hasTypeMod = 0; + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &hasTypeMod)); + if (hasTypeMod == 1) { + pReq->pTypeMods = taosArrayInit(pReq->numOfFields, sizeof(STypeMod)); + if (!pReq->pTypeMods) { + TAOS_CHECK_EXIT(terrno); + } + for (int32_t i = 0; i < pReq->numOfFields; ++i) { + STypeMod typeMod = 0; + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &typeMod)); + if (taosArrayPush(pReq->pTypeMods, &typeMod) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + } + } tEndDecode(&decoder); _exit: @@ -997,6 +1025,7 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) { pReq->pFields = NULL; taosMemoryFreeClear(pReq->comment); FREESQL(); + taosArrayDestroy(pReq->pTypeMods); } int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) { @@ -10121,7 +10150,13 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS SFieldWithOptions *pField = taosArrayGet(pReq->pCols, i); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pField->type)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pField->flags)); - TAOS_CHECK_EXIT(tEncodeI32(&encoder, pField->bytes)); + int32_t bytes = pField->bytes; + if (IS_DECIMAL_TYPE(pField->type)) { + uint8_t prec = 0, scale = 0; + extractTypeFromTypeMod(pField->type, pField->typeMod, &prec, &scale, NULL); + fillBytesForDecimalType(&bytes, pField->type, prec, scale); + } + TAOS_CHECK_EXIT(tEncodeI32(&encoder, bytes)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pField->name)); } @@ -10965,7 +11000,6 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->type)); TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->flags)); TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, pReq->bytes)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->typeMod)); break; case TSDB_ALTER_TABLE_DROP_COLUMN: TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pReq->colName)); @@ -11022,13 +11056,15 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->flags)); TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, pReq->bytes)); TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pReq->compress)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->typeMod)); break; default: break; } TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ctimeMs)); TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->source)); + if (pReq->action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION || pReq->action == TSDB_ALTER_TABLE_ADD_COLUMN) { + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->typeMod)); + } tEndEncode(pEncoder); _exit: @@ -11048,9 +11084,6 @@ static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq) TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->type)); TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->flags)); TAOS_CHECK_EXIT(tDecodeI32v(pDecoder, &pReq->bytes)); - if (!tDecodeIsEnd(pDecoder)) { - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->typeMod)); - } break; case TSDB_ALTER_TABLE_DROP_COLUMN: TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &pReq->colName)); @@ -11114,9 +11147,6 @@ static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq) TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->flags)); TAOS_CHECK_EXIT(tDecodeI32v(pDecoder, &pReq->bytes)); TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pReq->compress)); - if (!tDecodeIsEnd(pDecoder)) { - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->typeMod)); - } default: break; } @@ -11138,6 +11168,11 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { if (!tDecodeIsEnd(pDecoder)) { TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->source)); } + if (pReq->action == TSDB_ALTER_TABLE_ADD_COLUMN || pReq->action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION) { + if (!tDecodeIsEnd(pDecoder)) { + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->typeMod)); + } + } tEndDecode(pDecoder); _exit: @@ -13469,3 +13504,7 @@ void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp *pRsp) { pRsp->batchMetaReq = NULL; pRsp->batchMetaLen = NULL; } + +bool hasExtSchema(const SExtSchema *pExtSchema) { + return pExtSchema->typeMod != 0; +} \ No newline at end of file diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index bdc6ad39ed..e64227ce46 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -3199,10 +3199,8 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, in int32_t bytes = pColInfoData->info.bytes; *((int32_t*)data) = bytes; if (IS_DECIMAL_TYPE(pColInfoData->info.type)) { - *(char*)data = bytes; - *((char*)data + 1) = 0; - *((char*)data + 2) = pColInfoData->info.precision; - *((char*)data + 3) = pColInfoData->info.scale; + fillBytesForDecimalType((int32_t*)data, pColInfoData->info.type, pColInfoData->info.precision, + pColInfoData->info.scale); } data += sizeof(int32_t); } @@ -3337,9 +3335,8 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos pColInfoData->info.bytes = *(int32_t*)pStart; if (IS_DECIMAL_TYPE(pColInfoData->info.type)) { - pColInfoData->info.scale = *(char*)pStart; - pColInfoData->info.precision = *((char*)pStart + 2); - pColInfoData->info.bytes &= 0xFF; + extractDecimalTypeInfoFromBytes(&pColInfoData->info.bytes, &pColInfoData->info.precision, + &pColInfoData->info.scale); } pStart += sizeof(int32_t); diff --git a/source/common/src/ttypes.c b/source/common/src/ttypes.c index 246c72ece9..6df405c020 100644 --- a/source/common/src/ttypes.c +++ b/source/common/src/ttypes.c @@ -279,3 +279,16 @@ uint8_t getScaleFromTypeMod(int32_t type, STypeMod mod) { if (IS_DECIMAL_TYPE(type)) return (uint8_t)(mod & 0xFF); return 0; } + +void fillBytesForDecimalType(int32_t *pBytes, int32_t type, uint8_t precision, uint8_t scale) { + *(char *)pBytes = tDataTypes[type].bytes; + *((char *)pBytes + 1) = 0; + *((char *)pBytes + 2) = precision; + *((char *)pBytes + 3) = scale; +} + +void extractDecimalTypeInfoFromBytes(int32_t *pBytes, uint8_t *precision, uint8_t *scale) { + *scale = *(uint8_t *)pBytes; + *precision = *((uint8_t *)pBytes + 2); + *pBytes &= 0xFF; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndIndex.c b/source/dnode/mnode/impl/src/mndIndex.c index f5dac9df65..496f9cadb5 100644 --- a/source/dnode/mnode/impl/src/mndIndex.c +++ b/source/dnode/mnode/impl/src/mndIndex.c @@ -664,6 +664,7 @@ static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb pNew->pColumns = NULL; pNew->pCmpr = NULL; pNew->pTags = NULL; + pNew->pExtSchemas = NULL; pNew->updateTime = taosGetTimestampMs(); pNew->lock = 0; @@ -733,6 +734,7 @@ _OVER: taosMemoryFree(newStb.pTags); taosMemoryFree(newStb.pColumns); taosMemoryFree(newStb.pCmpr); + taosMemoryFreeClear(newStb.pExtSchemas); } mndTransDrop(pTrans); TAOS_RETURN(code); @@ -847,6 +849,7 @@ _OVER: taosMemoryFree(newObj.pTags); taosMemoryFree(newObj.pColumns); taosMemoryFree(newObj.pCmpr); + taosMemoryFreeClear(newObj.pExtSchemas); mndTransDrop(pTrans); mndReleaseStb(pMnode, pStb); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 81d1f7677e..22320fe706 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -1630,6 +1630,9 @@ static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) { f.type = pExprNode->resType.type; f.flags = COL_SMA_ON; tstrncpy(f.name, pExprNode->userAlias, TSDB_COL_NAME_LEN); + if (IS_DECIMAL_TYPE(f.type)) { + fillBytesForDecimalType(&f.bytes, f.type, pExprNode->resType.precision, pExprNode->resType.scale); + } if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pCols, &f)) { code = terrno; break; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 20fe51eff7..7c29708b35 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -351,6 +351,7 @@ _OVER: taosMemoryFreeClear(pStb->pTags); taosMemoryFreeClear(pStb->comment); taosMemoryFree(pStb->pCmpr); + taosMemoryFreeClear(pStb->pExtSchemas); } taosMemoryFreeClear(pRow); return NULL; @@ -368,6 +369,7 @@ void mndFreeStb(SStbObj *pStb) { taosMemoryFreeClear(pStb->pAst1); taosMemoryFreeClear(pStb->pAst2); taosMemoryFreeClear(pStb->pCmpr); + taosMemoryFreeClear(pStb->pExtSchemas); } static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) { @@ -481,6 +483,12 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { memcpy(pOld->pCmpr, pNew->pCmpr, pNew->numOfColumns * sizeof(SColCmpr)); } + taosMemoryFreeClear(pOld->pExtSchemas); + if (pNew->pExtSchemas) { + pOld->pExtSchemas = taosMemoryCalloc(pNew->numOfColumns, sizeof(SExtSchema)); + memcpy(pOld->pExtSchemas, pNew->pExtSchemas, pNew->numOfColumns * sizeof(SExtSchema)); + } + taosWUnLockLatch(&pOld->lock); return 0; } @@ -954,7 +962,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); pSchema->colId = pDst->nextColId; pDst->nextColId++; - hasTypeMods = hasTypeMods || pField->typeMod != 0; + hasTypeMods = hasTypeMods || HAS_TYPE_MOD(pSchema); } for (int32_t i = 0; i < pDst->numOfTags; ++i) { @@ -1283,8 +1291,15 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq } else { p->alg = pField->compress; } + // TODO wjm test it with tmq + if (pField->flags & COL_HAS_TYPE_MOD) { + if (!pDst->pExtSchemas) { + pDst->pExtSchemas = taosMemoryCalloc(pDst->numOfColumns, sizeof(SExtSchema)); + if (!pDst->pExtSchemas) TAOS_RETURN(terrno); + } + pDst->pExtSchemas[i].typeMod = pField->typeMod; + } } - // TODO wjm alter table with deicmal table pDst->tagVer = createReq->tagVer; pDst->colVer = createReq->colVer; return TSDB_CODE_SUCCESS; @@ -1416,6 +1431,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { taosMemoryFreeClear(pDst.pTags); taosMemoryFreeClear(pDst.pColumns); taosMemoryFreeClear(pDst.pCmpr); + taosMemoryFreeClear(pDst.pExtSchemas); goto _OVER; } @@ -1423,6 +1439,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { taosMemoryFreeClear(pDst.pTags); taosMemoryFreeClear(pDst.pColumns); taosMemoryFreeClear(pDst.pCmpr); + taosMemoryFreeClear(pDst.pExtSchemas); } else { code = mndCreateStb(pMnode, pReq, &createReq, pDb); } @@ -1488,6 +1505,13 @@ int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) { memcpy(pNew->pColumns, pOld->pColumns, sizeof(SSchema) * pOld->numOfColumns); memcpy(pNew->pTags, pOld->pTags, sizeof(SSchema) * pOld->numOfTags); memcpy(pNew->pCmpr, pOld->pCmpr, sizeof(SColCmpr) * pOld->numOfColumns); + if (pOld->pExtSchemas) { + pNew->pExtSchemas = taosMemoryCalloc(pNew->numOfColumns, sizeof(SExtSchema)); + if (pNew->pExtSchemas == NULL) { + TAOS_RETURN(terrno); + } + memcpy(pNew->pExtSchemas, pOld->pExtSchemas, sizeof(SExtSchema) * pOld->numOfColumns); + } TAOS_RETURN(0); } @@ -1906,7 +1930,7 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj * TAOS_RETURN(code); } -static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ncols, +static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const SMAlterStbReq* pReq, int32_t ncols, int8_t withCompress) { int32_t code = 0; if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) { @@ -1918,7 +1942,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray TAOS_RETURN(code); } - if (!mndValidateSchema(pOld->pColumns, pOld->numOfColumns, pFields, TSDB_MAX_BYTES_PER_ROW)) { + if (!mndValidateSchema(pOld->pColumns, pOld->numOfColumns, pReq->pFields, TSDB_MAX_BYTES_PER_ROW)) { code = TSDB_CODE_PAR_INVALID_ROW_LENGTH; TAOS_RETURN(code); } @@ -1934,7 +1958,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray for (int32_t i = 0; i < ncols; i++) { if (withCompress) { - SFieldWithOptions *pField = taosArrayGet(pFields, i); + SFieldWithOptions *pField = taosArrayGet(pReq->pFields, i); if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) { code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST; TAOS_RETURN(code); @@ -1957,7 +1981,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray pCmpr->alg = pField->compress; mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name); } else { - SField *pField = taosArrayGet(pFields, i); + SField *pField = taosArrayGet(pReq->pFields, i); if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) { code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST; TAOS_RETURN(code); @@ -1981,6 +2005,25 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name); } } + // 1. old schema already has extschemas + // 2. new schema has extschemas + if (pReq->pTypeMods || pOld->pExtSchemas) { + if (!pNew->pExtSchemas) { + // all ext schemas reset to zero + pNew->pExtSchemas = taosMemoryCalloc(pNew->numOfColumns, sizeof(SExtSchema)); + if (!pNew->pExtSchemas) TAOS_RETURN(terrno); + } + if (pOld->pExtSchemas) { + memcpy(pNew->pExtSchemas, pOld->pExtSchemas, pOld->numOfColumns * sizeof(SExtSchema)); + } + if (taosArrayGetSize(pReq->pTypeMods) > 0) { + // copy added column ext schema + for (int32_t i = 0; i < ncols; ++i) { + pNew->pColumns[pOld->numOfColumns + i].flags |= COL_HAS_TYPE_MOD; + pNew->pExtSchemas[pOld->numOfColumns + i].typeMod = *(STypeMod *)taosArrayGet(pReq->pTypeMods, i); + } + } + } pNew->colVer++; TAOS_RETURN(code); @@ -2012,6 +2055,9 @@ static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStb int32_t sz = pNew->numOfColumns - col - 1; memmove(pNew->pColumns + col, pNew->pColumns + col + 1, sizeof(SSchema) * sz); memmove(pNew->pCmpr + col, pNew->pCmpr + col + 1, sizeof(SColCmpr) * sz); + if (pOld->pExtSchemas) { + memmove(pNew->pExtSchemas + col, pNew->pExtSchemas + col + 1, sizeof(SExtSchema) * sz); + } pNew->numOfColumns--; pNew->colVer++; @@ -2636,6 +2682,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p stbObj.pTags = NULL; stbObj.pFuncs = NULL; stbObj.pCmpr = NULL; + stbObj.pExtSchemas = NULL; stbObj.updateTime = taosGetTimestampMs(); stbObj.lock = 0; bool updateTagIndex = false; @@ -2657,7 +2704,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0); break; case TSDB_ALTER_TABLE_ADD_COLUMN: - code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 0); + code = mndAddSuperTableColumn(pOld, &stbObj, pAlter, pAlter->numOfFields, 0); break; case TSDB_ALTER_TABLE_DROP_COLUMN: pField0 = taosArrayGet(pAlter->pFields, 0); @@ -2675,7 +2722,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, pAlter->pFields, pAlter->numOfFields); break; case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: - code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 1); + code = mndAddSuperTableColumn(pOld, &stbObj, pAlter, pAlter->numOfFields, 1); break; default: needRsp = false; @@ -2697,6 +2744,7 @@ _OVER: if (pAlter->commentLen > 0) { taosMemoryFreeClear(stbObj.comment); } + taosMemoryFreeClear(stbObj.pExtSchemas); TAOS_RETURN(code); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7fe5c5fb80..08f16b7e1e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -631,6 +631,12 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre pField->type = pStream->outputSchema.pSchema[i].type; pField->bytes = pStream->outputSchema.pSchema[i].bytes; pField->compress = createDefaultColCmprByType(pField->type); + if (IS_DECIMAL_TYPE(pField->type)) { + uint8_t prec = 0, scale = 0; + extractDecimalTypeInfoFromBytes(&pField->bytes, &prec, &scale); + pField->typeMod = decimalCalcTypeMod(prec, scale); + ASSERT(pField->flags & COL_HAS_TYPE_MOD);// TODO wjm + } } if (pStream->tagSchema.nCols == 0) { diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index d2b61649ca..608d36721c 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -322,6 +322,7 @@ void metaCloneEntryFree(SMetaEntry **ppEntry) { return; } metaCloneColCmprFree(&(*ppEntry)->colCmpr); + taosMemoryFreeClear((*ppEntry)->pExtSchemas); taosMemoryFreeClear(*ppEntry); return; diff --git a/source/dnode/vnode/src/meta/metaEntry2.c b/source/dnode/vnode/src/meta/metaEntry2.c index b23059079a..023196c262 100644 --- a/source/dnode/vnode/src/meta/metaEntry2.c +++ b/source/dnode/vnode/src/meta/metaEntry2.c @@ -1080,7 +1080,7 @@ static int32_t metaHandleSuperTableCreateImpl(SMeta *pMeta, const SMetaEntry *pE const SMetaHandleParam param = { .pEntry = pEntry, }; - + // TODO wjm debug create/alter stable/ctable logic code = metaTableOpFn[op->table][op->op](pMeta, ¶m); if (TSDB_CODE_SUCCESS != code) { metaErr(TD_VID(pMeta->pVnode), code); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 169adf219f..bf576f1e8b 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1465,6 +1465,7 @@ END: if (pCursor->pMeta) metaULock(pCursor->pMeta); if (pCursor->pCur) tdbTbcClose(pCursor->pCur); if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf); + taosMemoryFreeClear(oStbEntry.pExtSchemas); tDecoderClear(&dc); tdbFree(pData); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index fd0608d46d..01903eda05 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -96,6 +96,14 @@ int32_t dropTableExtSchema(SMetaEntry *pEntry, int32_t dropColId, int32_t newCol memmove(pEntry->pExtSchemas + dropColId, pEntry->pExtSchemas + dropColId + 1, (newColNum - dropColId) * sizeof(SExtSchema)); } + for (int32_t i = 0; i < newColNum; i++) { // TODO wjm test it.. + if (hasExtSchema(pEntry->pExtSchemas + i)) return 0; + } + // if no column has ext schemas, free the memory. + // TODO wjm looks like we can remove it + // Actually it's not necessary, if there's no ext schemas, it will not encode extschemas when encoding meta + // entry + taosMemoryFreeClear(pEntry->pExtSchemas); return 0; } diff --git a/source/dnode/vnode/src/meta/metaTable2.c b/source/dnode/vnode/src/meta/metaTable2.c index 603dfaf9fa..f394cca429 100644 --- a/source/dnode/vnode/src/meta/metaTable2.c +++ b/source/dnode/vnode/src/meta/metaTable2.c @@ -1789,6 +1789,7 @@ int32_t metaAlterSuperTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) .stbEntry.schemaRow = pReq->schemaRow, .stbEntry.schemaTag = pReq->schemaTag, .colCmpr = pReq->colCmpr, + .pExtSchemas = pReq->pExtSchemas, }; TABLE_SET_COL_COMPRESSED(entry.flags); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 3025fd91ad..0c77a144ab 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -598,13 +598,22 @@ int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCt } metaSize = CTG_META_SIZE(stbMeta); - *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize); + int32_t schemaExtSize = 0; + if (stbMeta->schemaExt) { + schemaExtSize = stbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt); + } + *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize + schemaExtSize); if (NULL == *pTableMeta) { CTG_ERR_RET(terrno); } TAOS_MEMCPY(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); - (*pTableMeta)->schemaExt = NULL; + if (stbMeta->schemaExt) { + (*pTableMeta)->schemaExt = (SSchemaExt*)((char*)*pTableMeta + metaSize); + TAOS_MEMCPY((*pTableMeta)->schemaExt, stbMeta->schemaExt, schemaExtSize); + } else { + (*pTableMeta)->schemaExt = NULL; + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 4b0cef1118..56cbc9e869 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -264,7 +264,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp } break; } - case TSDB_DATA_TYPE_DECIMAL: + case TSDB_DATA_TYPE_DECIMAL:// TODO wjm what are you doing here? case TSDB_DATA_TYPE_BLOB: case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_MEDIUMBLOB: diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index cfa9b58f90..5a318c12c8 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -4610,7 +4610,7 @@ static int32_t functionNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddIntegerToObject(pJson, jkFunctionTrimType, pNode->trimType); } if (TSDB_CODE_SUCCESS == code) { - code = dataTypeToJson(&pNode->srcFuncInputType, pJson); + code = tjsonAddObject(pJson, jkFunctionSrcFuncInputDT, dataTypeToJson, &pNode->srcFuncInputType); } return code; } @@ -4650,7 +4650,7 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) { tjsonGetNumberValue(pJson, jkFunctionTrimType, pNode->trimType, code); } if (TSDB_CODE_SUCCESS == code) { - code = jsonToDataType(pJson, &pNode->srcFuncInputType); + code = tjsonToObject(pJson, jkFunctionSrcFuncInputDT, jsonToDataType, &pNode->srcFuncInputType); } return code; diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 32edd5b8c3..97b6c10eb4 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -384,7 +384,7 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta* static void destroyColVal(void* p) { SColVal* pVal = p; if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type || - TSDB_DATA_TYPE_VARBINARY == pVal->value.type) { + TSDB_DATA_TYPE_VARBINARY == pVal->value.type || TSDB_DATA_TYPE_DECIMAL == pVal->value.type) { taosMemoryFreeClear(pVal->value.pData); } } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 667956d546..c4804473b3 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1481,7 +1481,7 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p return generateSyntaxErrMsg(&pCxt->msgBuf, code); } SSchemaExt* pSchemaExt = - pMeta->schemaExt ? (i > pMeta->tableInfo.numOfColumns ? NULL : (pMeta->schemaExt + i)) : NULL; + pMeta->schemaExt ? (i >= pMeta->tableInfo.numOfColumns ? NULL : (pMeta->schemaExt + i)) : NULL; setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol, pSchemaExt); setColumnPrimTs(pCxt, pCol, pTable); @@ -1561,7 +1561,7 @@ static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef, if (0 == strcmp(pCol->colName, pMeta->schema[i].name) && !invisibleColumn(pCxt->pParseCxt->enableSysInfo, pMeta->tableType, pMeta->schema[i].flags)) { - SSchemaExt* pSchemaExt = pMeta->schemaExt ? (i > pMeta->tableInfo.numOfColumns ? NULL : (pMeta->schemaExt + i)) : NULL; + SSchemaExt* pSchemaExt = pMeta->schemaExt ? (i >= pMeta->tableInfo.numOfColumns ? NULL : (pMeta->schemaExt + i)) : NULL; setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol, pSchemaExt); setColumnPrimTs(pCxt, pCol, pTable); *pFound = true; @@ -9294,7 +9294,7 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray, bool calB if (pCol->pOptions && ((SColumnOptions*)pCol->pOptions)->bPrimaryKey) { field.flags |= COL_IS_KEY; } - if (field.typeMod > 0) { + if (field.typeMod != 0) { field.flags |= COL_HAS_TYPE_MOD; } if (NULL == taosArrayPush(*pArray, &field)) { @@ -10233,9 +10233,15 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt* if (NULL == pAlterReq->pFields) { return terrno; } + pAlterReq->pTypeMods = taosArrayInit(2, sizeof(STypeMod)); + if (!pAlterReq->pTypeMods) return terrno; + STypeMod typeMod = calcTypeMod(&pStmt->dataType); switch (pStmt->alterType) { case TSDB_ALTER_TABLE_ADD_COLUMN: + if (NULL == taosArrayPush(pAlterReq->pTypeMods, &typeMod)) { + return terrno; + } // fall through case TSDB_ALTER_TABLE_ADD_TAG: case TSDB_ALTER_TABLE_DROP_TAG: case TSDB_ALTER_TABLE_DROP_COLUMN: @@ -10304,6 +10310,7 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt* } } if (NULL == taosArrayPush(pAlterReq->pFields, &field)) return terrno; + if (NULL == taosArrayPush(pAlterReq->pTypeMods, &typeMod)) return terrno; break; } default: @@ -12133,11 +12140,16 @@ static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STable } SSchema* pSchemas = getTableColumnSchema(pMeta); + const SSchemaExt* pExtSchemas = getTableColumnExtSchema(pMeta); int32_t index = 0; SNode* pProj = NULL; FOREACH(pProj, pProjections) { SSchema* pSchema = pSchemas + index++; SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes}; + if (IS_DECIMAL_TYPE(pSchema->type)) { + STypeMod typeMod = (pExtSchemas + (index - 1))->typeMod; + extractTypeFromTypeMod(pSchema->type, typeMod, &dt.precision, &dt.scale, NULL); + } if (!dataTypeEqual(&dt, &((SExprNode*)pProj)->resType)) { SNode* pFunc = NULL; int32_t code = createCastFunc(pCxt, pProj, dt, &pFunc); diff --git a/tests/system-test/2-query/decimal.py b/tests/system-test/2-query/decimal.py index b697bb0450..85654727aa 100644 --- a/tests/system-test/2-query/decimal.py +++ b/tests/system-test/2-query/decimal.py @@ -1,4 +1,5 @@ from random import randrange +from re import A import time import threading import secrets @@ -9,6 +10,10 @@ from util.cases import * from util.dnodes import * from util.common import * +syntax_error = -2147473920 +invalid_column = -2147473918 +invalid_compress_level = -2147483084 +invalid_encode_param = -2147483087 class DecimalType: def __init__(self, precision: int, scale: int): self.precision = precision @@ -240,15 +245,10 @@ class TableInserter: sql += f"({start_ts + i * step}" for column in self.columns_types: sql += f", {column.generate_value()}" - if self.tags_types: - sql += ") tags(" - for tag in self.tags_types: - sql += f"{tag.generate_value()}," - sql = sql[:-2] sql += ")" if i != rows - 1: sql += ", " - local_flush_database = i % 5000 == 0; + local_flush_database = i % 5000 == 0 if len(sql) > 1000: #tdLog.debug(f"insert into with sql{sql}") if flush_database and local_flush_database: @@ -269,13 +269,18 @@ class TDTestCase: self.ctbNum = 10 self.rowsPerTbl = 10000 self.duraion = '1h' - self.columns = [] + self.norm_tb_columns = [] self.tags = [] self.stable_name = "meters" self.norm_table_name = "nt" self.c_table_prefix = "t" self.db_name = "test" self.c_table_num = 10 + self.no_decimal_col_tb_name = 'tt' + self.stb_columns = [] + self.stream_name = 'stream1' + self.stream_out_stb = 'stream_out_stb' + self.tsma_name = 'tsma1' def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -289,7 +294,6 @@ class TDTestCase: tsql.execute("create database if not exists %s vgroups %d replica %d duration %s" % ( dbName, vgroups, replica, duration)) tdLog.debug("complete to create database %s" % (dbName)) - return def create_stable(self, tsql, paraDict): colString = tdCom.gen_column_type_str( @@ -300,7 +304,6 @@ class TDTestCase: paraDict["dbName"], paraDict["stbName"], colString, tagString) tdLog.debug("%s" % (sqlString)) tsql.execute(sqlString) - return def create_ctable(self, tsql=None, dbName='dbx', stbName='stb', ctbPrefix='ctb', ctbNum=1, ctbStartIdx=0): for i in range(ctbNum): @@ -310,7 +313,6 @@ class TDTestCase: tdLog.debug("complete to create %d child tables by %s.%s" % (ctbNum, dbName, stbName)) - return def init_normal_tb(self, tsql, db_name: str, tb_name: str, rows: int, start_ts: int, ts_step: int): sql = 'CREATE TABLE %s.%s (ts timestamp, c1 INT, c2 INT, c3 INT, c4 double, c5 VARCHAR(255))' % ( @@ -392,6 +394,10 @@ class TDTestCase: self.init_normal_tb(tdSql, paraDict['dbName'], 'norm_tb', paraDict['rowsPerTbl'], paraDict['startTs'], paraDict['tsStep']) + def check_desc_for_one_ctb(self, ctbPrefix: str, columns: List[DataType], tags: List[DataType] = []): + ctb_idx = randrange(self.c_table_num) + return self.check_desc(f"{ctbPrefix}{ctb_idx}", columns, tags) + def check_desc(self, tbname: str, column_types: List[DataType], tag_types: List[DataType] = []): sql = f"desc {self.db_name}.{tbname}" tdSql.query(sql, queryTimes=1) @@ -405,6 +411,8 @@ class TDTestCase: tdLog.exit(f"check desc failed for table: {tbname} column {results[i+1][0]} encode is {results[i+1][5]}, expect {DecimalType.default_encode()}") if results[i+1][5] != DecimalType.default_compression(): tdLog.exit(f"check desc failed for table: {tbname} column {results[i+1][0]} compression is {results[i+1][4]}, expect {DecimalType.default_compression()}") + if tbname == self.stable_name: + self.check_desc_for_one_ctb(self.c_table_prefix, column_types, tag_types) def check_show_create_table(self, tbname: str, column_types: List[DataType], tag_types: List[DataType] = []): sql = f"show create table {self.db_name}.{tbname}" @@ -418,11 +426,47 @@ class TDTestCase: if result_type != column_type.get_decimal_type(): tdLog.exit(f"check show create table failed for: {tbname} column {i} type is {result_type}, expect {column_type.get_decimal_type()}") decimal_idx += 1 + + def test_add_drop_columns_with_decimal(self, tbname: str, columns: List[DataType]): + is_stb = tbname == self.stable_name + ## alter table add column + create_c99_sql = f'alter table {self.db_name}.{tbname} add column c99 decimal(37, 19)' + columns.append(DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(37, 19)))) + tdSql.execute(create_c99_sql, queryTimes=1, show=True) + self.check_desc(tbname, columns) + ## alter table add column with compression + create_c100_sql = f'ALTER TABLE {self.db_name}.{tbname} ADD COLUMN c100 decimal(36, 18) COMPRESS "zstd"' + tdSql.execute(create_c100_sql, queryTimes=1, show=True) + columns.append(DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(36, 18)))) + self.check_desc(tbname, columns) - def test_create_decimal_column(self): + ## drop non decimal column + drop_c6_sql = f'alter table {self.db_name}.{tbname} drop column c6' + tdSql.execute(drop_c6_sql, queryTimes=1, show=True) + c6 = columns.pop(5) + self.check_desc(tbname, columns) + ## drop decimal column and not last column + drop_c99_sql = f'alter table {self.db_name}.{tbname} drop column c99' + tdSql.execute(drop_c99_sql, queryTimes=1, show=True) + c99 = columns.pop(len(columns) - 2) + self.check_desc(tbname, columns) + ## drop decimal column and last column + drop_c100_sql = f'alter table {self.db_name}.{tbname} drop column c100' + tdSql.execute(drop_c100_sql, queryTimes=1, show=True) + c100 = columns.pop(len(columns) - 1) + self.check_desc(tbname, columns) + + ## create decimal back + tdSql.execute(create_c99_sql, queryTimes=1, show=True) + tdSql.execute(create_c100_sql, queryTimes=1, show=True) + columns.append(c99) + columns.append(c100) + self.check_desc(tbname, columns) + + def test_decimal_column_ddl(self): ## create decimal type table, normal/super table, decimal64/decimal128 tdLog.printNoPrefix("-------- test create decimal column") - self.columns = [ + self.norm_tb_columns = [ DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(10, 2))), DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(20, 4))), DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(30, 8))), @@ -438,24 +482,23 @@ class TDTestCase: DataType(TypeEnum.INT), DataType(TypeEnum.VARCHAR, 255) ] - DecimalColumnTableCreater(tdSql, self.db_name, self.stable_name, self.columns, self.tags).create() - self.check_desc("meters", self.columns, self.tags) - self.check_show_create_table("meters", self.columns, self.tags) + self.stb_columns = self.norm_tb_columns.copy() + DecimalColumnTableCreater(tdSql, self.db_name, self.stable_name, self.stb_columns, self.tags).create() + self.check_show_create_table("meters", self.stb_columns, self.tags) - DecimalColumnTableCreater(tdSql, self.db_name, self.norm_table_name, self.columns).create() - self.check_desc(self.norm_table_name, self.columns) - self.check_show_create_table(self.norm_table_name, self.columns) + DecimalColumnTableCreater(tdSql, self.db_name, self.norm_table_name, self.norm_tb_columns).create() + self.check_desc(self.norm_table_name, self.norm_tb_columns) + self.check_show_create_table(self.norm_table_name, self.norm_tb_columns) ## TODO add more values for all rows tag_values = [ "1", "t1" ] - DecimalColumnTableCreater(tdSql, self.db_name, self.stable_name, self.columns).create_child_table(self.c_table_prefix, self.c_table_num, self.tags, tag_values) - self.check_desc("t1", self.columns, self.tags) + DecimalColumnTableCreater(tdSql, self.db_name, self.stable_name, self.norm_tb_columns).create_child_table(self.c_table_prefix, self.c_table_num, self.tags, tag_values) + self.check_desc("meters", self.stb_columns, self.tags) + self.check_desc("t1", self.norm_tb_columns, self.tags) ## invalid precision/scale - syntax_error = -2147473920 - invalid_column = -2147473918 invalid_precision_scale = [("decimal(-1, 2)", syntax_error), ("decimal(39, 2)", invalid_column), ("decimal(10, -1)", syntax_error), ("decimal(10, 39)", invalid_column), ("decimal(10, 2.5)", syntax_error), ("decimal(10.5, 2)", syntax_error), ("decimal(10.5, 2.5)", syntax_error), ("decimal(0, 2)", invalid_column), ("decimal(0)", invalid_column), @@ -468,31 +511,28 @@ class TDTestCase: sql = 'create stable %s.invalid_decimal_tag (ts timestamp) tags (t1 decimal(10, 2))' % (self.db_name) tdSql.error(sql, invalid_column) - ## alter table add column - sql = f'alter table {self.db_name}.{self.norm_table_name} add column c99 decimal(37, 19)' - self.columns.append(DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(37, 19)))) - tdSql.execute(sql, queryTimes=1) - self.check_desc(self.norm_table_name, self.columns) - ## alter table add column with compression - - ## Test metaentry compatibility problem for decimal type:w - ## How to test it? - ## Create table with no decimal type, the metaentries should not have extschma, and add decimal column, the metaentries should have extschema for all columns. - ## After drop this decimal column, the metaentries should not have extschema for all columns. - ## Test for normal table and super table + ## alter table add/drop column + self.test_add_drop_columns_with_decimal(self.norm_table_name, self.norm_tb_columns) + self.test_add_drop_columns_with_decimal(self.stable_name, self.stb_columns) ## drop index from stb ### These ops will override the previous stbobjs and meta entries, so test it ## TODO test encode and compress for decimal type + sql = f'ALTER TABLE {self.db_name}.{self.norm_table_name} ADD COLUMN c101 decimal(37, 19) ENCODE "simple8b" COMPRESS "zstd"' + tdSql.error(sql, invalid_encode_param) + sql = f'ALTER TABLE {self.db_name}.{self.norm_table_name} ADD COLUMN c101 decimal(37, 19) ENCODE "delta-i" COMPRESS "zstd"' + tdSql.error(sql, invalid_encode_param) + sql = f'ALTER TABLE {self.db_name}.{self.norm_table_name} ADD COLUMN c101 decimal(37, 19) ENCODE "delta-d" COMPRESS "zstd"' + tdSql.error(sql, invalid_encode_param) + sql = f'ALTER TABLE {self.db_name}.{self.norm_table_name} ADD COLUMN c101 decimal(37, 19) ENCODE "bit-packing" COMPRESS "zstd"' + tdSql.error(sql, invalid_encode_param) def test_insert_decimal_values(self): - for i in range(self.c_table_num): - pass - #TableInserter(tdSql, self.db_name, f"{self.c_table_prefix}{i}", self.columns, self.tags).insert(1, 1537146000000, 500) + TableInserter(tdSql, self.db_name, f"{self.c_table_prefix}{i}", self.stb_columns, self.tags).insert(1000, 1537146000000, 500) - TableInserter(tdSql, self.db_name, self.norm_table_name, self.columns).insert(10000, 1537146000000, 500, flush_database=True) + TableInserter(tdSql, self.db_name, self.norm_table_name, self.norm_tb_columns).insert(10000, 1537146000000, 500, flush_database=True) ## insert null/None for decimal type @@ -508,18 +548,48 @@ class TDTestCase: DataType(TypeEnum.FLOAT), DataType(TypeEnum.VARCHAR, 255), ] - DecimalColumnTableCreater(tdSql, self.db_name, "tt", columns, []).create() - TableInserter(tdSql, self.db_name, 'tt', columns).insert(10000, 1537146000000, 500, flush_database=True) + DecimalColumnTableCreater(tdSql, self.db_name, self.no_decimal_col_tb_name, columns, []).create() + TableInserter(tdSql, self.db_name, self.no_decimal_col_tb_name, columns).insert(10000, 1537146000000, 500, flush_database=True) ## TODO wjm test non support decimal version upgrade to decimal support version, and add decimal column + ## Test metaentry compatibility problem for decimal type + ## How to test it? + ## Create table with no decimal type, the metaentries should not have extschma, and add decimal column, the metaentries should have extschema for all columns. + sql = f'ALTER TABLE {self.db_name}.{self.no_decimal_col_tb_name} ADD COLUMN c200 decimal(37, 19)' + tdSql.execute(sql, queryTimes=1) ## now meta entry has ext schemas + columns.append(DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(37, 19)))) + self.check_desc(self.no_decimal_col_tb_name, columns) + + ## After drop this only decimal column, the metaentries should not have extschema for all columns. + sql = f'ALTER TABLE {self.db_name}.{self.no_decimal_col_tb_name} DROP COLUMN c200' + tdSql.execute(sql, queryTimes=1) ## now meta entry has no ext schemas + columns.pop(len(columns) - 1) + self.check_desc(self.no_decimal_col_tb_name, columns) + sql = f'ALTER TABLE {self.db_name}.{self.no_decimal_col_tb_name} ADD COLUMN c200 int' + tdSql.execute(sql, queryTimes=1) ## meta entry has no ext schemas + columns.append(DataType(TypeEnum.INT)) + self.check_desc(self.no_decimal_col_tb_name, columns) + + self.test_add_drop_columns_with_decimal(self.no_decimal_col_tb_name, columns) + def test_decimal_ddl(self): tdSql.execute("create database test", queryTimes=1) - self.test_create_decimal_column() + self.test_decimal_column_ddl() + ## TODO test decimal column for tmq + + def test_decimal_and_stream(self): + create_stream = f'CREATE STREAM {self.stream_name} FILL_HISTORY 1 INTO {self.db_name}.{self.stream_out_stb} AS SELECT _wstart, count(c1), avg(c2), sum(c3) FROM {self.db_name}.{self.stable_name} INTERVAL(10s)' + tdSql.execute(create_stream, queryTimes=1, show=True) + + def test_decimal_and_tsma(self): + pass def run(self): self.test_decimal_ddl() self.no_decimal_table_test() self.test_insert_decimal_values() + self.test_decimal_and_stream() + self.test_decimal_and_tsma() def stop(self): tdSql.close()