fix decimal table meta and add tests for decimal col streams

This commit is contained in:
wangjiaming0909 2025-02-15 14:46:10 +08:00
parent cdc6d6eaf4
commit 44b1bbc6d3
21 changed files with 297 additions and 79 deletions

View File

@ -706,9 +706,11 @@ typedef struct {
} SMonitorParas; } SMonitorParas;
typedef struct { typedef struct {
STypeMod typeMod; STypeMod typeMod; // TODO wjm copy it with a struct, not it's internal members
} SExtSchema; } SExtSchema;
bool hasExtSchema(const SExtSchema* pExtSchema);
typedef struct { typedef struct {
int32_t nCols; int32_t nCols;
int32_t version; int32_t version;
@ -1004,6 +1006,7 @@ typedef struct {
char* comment; char* comment;
int32_t sqlLen; int32_t sqlLen;
char* sql; char* sql;
SArray* pTypeMods;
} SMAlterStbReq; } SMAlterStbReq;
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq); int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);

View File

@ -419,6 +419,9 @@ void decimalFromTypeMod(STypeMod typeMod, uint8_t *precision, uint8_t *scale
// pType->type should has been set // pType->type should has been set
void fillTypeFromTypeMod(SDataType *pType, STypeMod mod); void fillTypeFromTypeMod(SDataType *pType, STypeMod mod);
uint8_t getScaleFromTypeMod(int32_t type, STypeMod mod); uint8_t getScaleFromTypeMod(int32_t type, STypeMod mod);
// TODO fix me!! for compatibility issue, save precision in scale in bytes, move it to somewhere else
void fillBytesForDecimalType(int32_t *pBytes, int32_t type, uint8_t precision, uint8_t scale);
void extractDecimalTypeInfoFromBytes(int32_t *pBytes, uint8_t *precision, uint8_t *scale);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -1060,6 +1060,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
SColCmpr* pCmp = &req.colCmpr.pColCmpr[i]; SColCmpr* pCmp = &req.colCmpr.pColCmpr[i];
field.compress = pCmp->alg; field.compress = pCmp->alg;
} }
if (req.pExtSchemas) field.typeMod = req.pExtSchemas[i].typeMod;
RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field)); RAW_NULL_CHECK(taosArrayPush(pReq.pColumns, &field));
} }
pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField)); pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));

View File

@ -905,7 +905,6 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq)
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pField->bytes)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pField->bytes));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pField->name)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pField->name));
TAOS_CHECK_EXIT(tEncodeU32(&encoder, pField->compress)); TAOS_CHECK_EXIT(tEncodeU32(&encoder, pField->compress));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pField->typeMod));
} else { } else {
SField *pField = taosArrayGet(pReq->pFields, i); SField *pField = taosArrayGet(pReq->pFields, i);
@ -920,6 +919,19 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq)
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->comment)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->comment));
} }
ENCODESQL(); ENCODESQL();
if (pReq->alterType == TSDB_ALTER_TABLE_ADD_COLUMN ||
pReq->alterType == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION) {
if (taosArrayGetSize(pReq->pTypeMods) > 0) {
int8_t hasTypeMod = 1;
TAOS_CHECK_EXIT(tEncodeI8(&encoder, hasTypeMod));
for (int32_t i = 0; i < pReq->pTypeMods->size; ++i) {
const STypeMod *pTypeMod = taosArrayGet(pReq->pTypeMods, i);
TAOS_CHECK_ERRNO(tEncodeI32(&encoder, *pTypeMod));
}
} else {
TAOS_CHECK_EXIT(tEncodeI8(&encoder, 0));
}
}
tEndEncode(&encoder); tEndEncode(&encoder);
_exit: _exit:
@ -958,7 +970,6 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &field.bytes)); TAOS_CHECK_EXIT(tDecodeI32(&decoder, &field.bytes));
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, field.name)); TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, field.name));
TAOS_CHECK_EXIT(tDecodeU32(&decoder, &field.compress)); TAOS_CHECK_EXIT(tDecodeU32(&decoder, &field.compress));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &field.typeMod));
if (taosArrayPush(pReq->pFields, &field) == NULL) { if (taosArrayPush(pReq->pFields, &field) == NULL) {
TAOS_CHECK_EXIT(terrno); TAOS_CHECK_EXIT(terrno);
} }
@ -984,7 +995,24 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq
} }
DECODESQL(); DECODESQL();
if (!tDecodeIsEnd(&decoder) && (pReq->alterType == TSDB_ALTER_TABLE_ADD_COLUMN ||
pReq->alterType == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION)) {
int8_t hasTypeMod = 0;
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &hasTypeMod));
if (hasTypeMod == 1) {
pReq->pTypeMods = taosArrayInit(pReq->numOfFields, sizeof(STypeMod));
if (!pReq->pTypeMods) {
TAOS_CHECK_EXIT(terrno);
}
for (int32_t i = 0; i < pReq->numOfFields; ++i) {
STypeMod typeMod = 0;
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &typeMod));
if (taosArrayPush(pReq->pTypeMods, &typeMod) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
}
}
}
tEndDecode(&decoder); tEndDecode(&decoder);
_exit: _exit:
@ -997,6 +1025,7 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) {
pReq->pFields = NULL; pReq->pFields = NULL;
taosMemoryFreeClear(pReq->comment); taosMemoryFreeClear(pReq->comment);
FREESQL(); FREESQL();
taosArrayDestroy(pReq->pTypeMods);
} }
int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) { int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) {
@ -10121,7 +10150,13 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
SFieldWithOptions *pField = taosArrayGet(pReq->pCols, i); SFieldWithOptions *pField = taosArrayGet(pReq->pCols, i);
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pField->type)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pField->type));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pField->flags)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pField->flags));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pField->bytes)); int32_t bytes = pField->bytes;
if (IS_DECIMAL_TYPE(pField->type)) {
uint8_t prec = 0, scale = 0;
extractTypeFromTypeMod(pField->type, pField->typeMod, &prec, &scale, NULL);
fillBytesForDecimalType(&bytes, pField->type, prec, scale);
}
TAOS_CHECK_EXIT(tEncodeI32(&encoder, bytes));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pField->name)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pField->name));
} }
@ -10965,7 +11000,6 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->type)); TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->type));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->flags)); TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->flags));
TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, pReq->bytes)); TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, pReq->bytes));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->typeMod));
break; break;
case TSDB_ALTER_TABLE_DROP_COLUMN: case TSDB_ALTER_TABLE_DROP_COLUMN:
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pReq->colName)); TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pReq->colName));
@ -11022,13 +11056,15 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->flags)); TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->flags));
TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, pReq->bytes)); TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, pReq->bytes));
TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pReq->compress)); TAOS_CHECK_EXIT(tEncodeU32(pEncoder, pReq->compress));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->typeMod));
break; break;
default: default:
break; break;
} }
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ctimeMs)); TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ctimeMs));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->source)); TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->source));
if (pReq->action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION || pReq->action == TSDB_ALTER_TABLE_ADD_COLUMN) {
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->typeMod));
}
tEndEncode(pEncoder); tEndEncode(pEncoder);
_exit: _exit:
@ -11048,9 +11084,6 @@ static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq)
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->type)); TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->type));
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->flags)); TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->flags));
TAOS_CHECK_EXIT(tDecodeI32v(pDecoder, &pReq->bytes)); TAOS_CHECK_EXIT(tDecodeI32v(pDecoder, &pReq->bytes));
if (!tDecodeIsEnd(pDecoder)) {
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->typeMod));
}
break; break;
case TSDB_ALTER_TABLE_DROP_COLUMN: case TSDB_ALTER_TABLE_DROP_COLUMN:
TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &pReq->colName)); TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &pReq->colName));
@ -11114,9 +11147,6 @@ static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq)
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->flags)); TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->flags));
TAOS_CHECK_EXIT(tDecodeI32v(pDecoder, &pReq->bytes)); TAOS_CHECK_EXIT(tDecodeI32v(pDecoder, &pReq->bytes));
TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pReq->compress)); TAOS_CHECK_EXIT(tDecodeU32(pDecoder, &pReq->compress));
if (!tDecodeIsEnd(pDecoder)) {
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->typeMod));
}
default: default:
break; break;
} }
@ -11138,6 +11168,11 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
if (!tDecodeIsEnd(pDecoder)) { if (!tDecodeIsEnd(pDecoder)) {
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->source)); TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->source));
} }
if (pReq->action == TSDB_ALTER_TABLE_ADD_COLUMN || pReq->action == TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION) {
if (!tDecodeIsEnd(pDecoder)) {
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->typeMod));
}
}
tEndDecode(pDecoder); tEndDecode(pDecoder);
_exit: _exit:
@ -13469,3 +13504,7 @@ void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp *pRsp) {
pRsp->batchMetaReq = NULL; pRsp->batchMetaReq = NULL;
pRsp->batchMetaLen = NULL; pRsp->batchMetaLen = NULL;
} }
bool hasExtSchema(const SExtSchema *pExtSchema) {
return pExtSchema->typeMod != 0;
}

View File

@ -3199,10 +3199,8 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, in
int32_t bytes = pColInfoData->info.bytes; int32_t bytes = pColInfoData->info.bytes;
*((int32_t*)data) = bytes; *((int32_t*)data) = bytes;
if (IS_DECIMAL_TYPE(pColInfoData->info.type)) { if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
*(char*)data = bytes; fillBytesForDecimalType((int32_t*)data, pColInfoData->info.type, pColInfoData->info.precision,
*((char*)data + 1) = 0; pColInfoData->info.scale);
*((char*)data + 2) = pColInfoData->info.precision;
*((char*)data + 3) = pColInfoData->info.scale;
} }
data += sizeof(int32_t); data += sizeof(int32_t);
} }
@ -3337,9 +3335,8 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
pColInfoData->info.bytes = *(int32_t*)pStart; pColInfoData->info.bytes = *(int32_t*)pStart;
if (IS_DECIMAL_TYPE(pColInfoData->info.type)) { if (IS_DECIMAL_TYPE(pColInfoData->info.type)) {
pColInfoData->info.scale = *(char*)pStart; extractDecimalTypeInfoFromBytes(&pColInfoData->info.bytes, &pColInfoData->info.precision,
pColInfoData->info.precision = *((char*)pStart + 2); &pColInfoData->info.scale);
pColInfoData->info.bytes &= 0xFF;
} }
pStart += sizeof(int32_t); pStart += sizeof(int32_t);

View File

@ -279,3 +279,16 @@ uint8_t getScaleFromTypeMod(int32_t type, STypeMod mod) {
if (IS_DECIMAL_TYPE(type)) return (uint8_t)(mod & 0xFF); if (IS_DECIMAL_TYPE(type)) return (uint8_t)(mod & 0xFF);
return 0; return 0;
} }
void fillBytesForDecimalType(int32_t *pBytes, int32_t type, uint8_t precision, uint8_t scale) {
*(char *)pBytes = tDataTypes[type].bytes;
*((char *)pBytes + 1) = 0;
*((char *)pBytes + 2) = precision;
*((char *)pBytes + 3) = scale;
}
void extractDecimalTypeInfoFromBytes(int32_t *pBytes, uint8_t *precision, uint8_t *scale) {
*scale = *(uint8_t *)pBytes;
*precision = *((uint8_t *)pBytes + 2);
*pBytes &= 0xFF;
}

View File

@ -664,6 +664,7 @@ static int32_t mndSetUpdateIdxStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStb
pNew->pColumns = NULL; pNew->pColumns = NULL;
pNew->pCmpr = NULL; pNew->pCmpr = NULL;
pNew->pTags = NULL; pNew->pTags = NULL;
pNew->pExtSchemas = NULL;
pNew->updateTime = taosGetTimestampMs(); pNew->updateTime = taosGetTimestampMs();
pNew->lock = 0; pNew->lock = 0;
@ -733,6 +734,7 @@ _OVER:
taosMemoryFree(newStb.pTags); taosMemoryFree(newStb.pTags);
taosMemoryFree(newStb.pColumns); taosMemoryFree(newStb.pColumns);
taosMemoryFree(newStb.pCmpr); taosMemoryFree(newStb.pCmpr);
taosMemoryFreeClear(newStb.pExtSchemas);
} }
mndTransDrop(pTrans); mndTransDrop(pTrans);
TAOS_RETURN(code); TAOS_RETURN(code);
@ -847,6 +849,7 @@ _OVER:
taosMemoryFree(newObj.pTags); taosMemoryFree(newObj.pTags);
taosMemoryFree(newObj.pColumns); taosMemoryFree(newObj.pColumns);
taosMemoryFree(newObj.pCmpr); taosMemoryFree(newObj.pCmpr);
taosMemoryFreeClear(newObj.pExtSchemas);
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);

View File

@ -1630,6 +1630,9 @@ static int32_t mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
f.type = pExprNode->resType.type; f.type = pExprNode->resType.type;
f.flags = COL_SMA_ON; f.flags = COL_SMA_ON;
tstrncpy(f.name, pExprNode->userAlias, TSDB_COL_NAME_LEN); tstrncpy(f.name, pExprNode->userAlias, TSDB_COL_NAME_LEN);
if (IS_DECIMAL_TYPE(f.type)) {
fillBytesForDecimalType(&f.bytes, f.type, pExprNode->resType.precision, pExprNode->resType.scale);
}
if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pCols, &f)) { if (NULL == taosArrayPush(pCxt->pCreateStreamReq->pCols, &f)) {
code = terrno; code = terrno;
break; break;

View File

@ -351,6 +351,7 @@ _OVER:
taosMemoryFreeClear(pStb->pTags); taosMemoryFreeClear(pStb->pTags);
taosMemoryFreeClear(pStb->comment); taosMemoryFreeClear(pStb->comment);
taosMemoryFree(pStb->pCmpr); taosMemoryFree(pStb->pCmpr);
taosMemoryFreeClear(pStb->pExtSchemas);
} }
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
return NULL; return NULL;
@ -368,6 +369,7 @@ void mndFreeStb(SStbObj *pStb) {
taosMemoryFreeClear(pStb->pAst1); taosMemoryFreeClear(pStb->pAst1);
taosMemoryFreeClear(pStb->pAst2); taosMemoryFreeClear(pStb->pAst2);
taosMemoryFreeClear(pStb->pCmpr); taosMemoryFreeClear(pStb->pCmpr);
taosMemoryFreeClear(pStb->pExtSchemas);
} }
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) { static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
@ -481,6 +483,12 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
memcpy(pOld->pCmpr, pNew->pCmpr, pNew->numOfColumns * sizeof(SColCmpr)); memcpy(pOld->pCmpr, pNew->pCmpr, pNew->numOfColumns * sizeof(SColCmpr));
} }
taosMemoryFreeClear(pOld->pExtSchemas);
if (pNew->pExtSchemas) {
pOld->pExtSchemas = taosMemoryCalloc(pNew->numOfColumns, sizeof(SExtSchema));
memcpy(pOld->pExtSchemas, pNew->pExtSchemas, pNew->numOfColumns * sizeof(SExtSchema));
}
taosWUnLockLatch(&pOld->lock); taosWUnLockLatch(&pOld->lock);
return 0; return 0;
} }
@ -954,7 +962,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
pSchema->colId = pDst->nextColId; pSchema->colId = pDst->nextColId;
pDst->nextColId++; pDst->nextColId++;
hasTypeMods = hasTypeMods || pField->typeMod != 0; hasTypeMods = hasTypeMods || HAS_TYPE_MOD(pSchema);
} }
for (int32_t i = 0; i < pDst->numOfTags; ++i) { for (int32_t i = 0; i < pDst->numOfTags; ++i) {
@ -1283,8 +1291,15 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
} else { } else {
p->alg = pField->compress; p->alg = pField->compress;
} }
// TODO wjm test it with tmq
if (pField->flags & COL_HAS_TYPE_MOD) {
if (!pDst->pExtSchemas) {
pDst->pExtSchemas = taosMemoryCalloc(pDst->numOfColumns, sizeof(SExtSchema));
if (!pDst->pExtSchemas) TAOS_RETURN(terrno);
}
pDst->pExtSchemas[i].typeMod = pField->typeMod;
}
} }
// TODO wjm alter table with deicmal table
pDst->tagVer = createReq->tagVer; pDst->tagVer = createReq->tagVer;
pDst->colVer = createReq->colVer; pDst->colVer = createReq->colVer;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1416,6 +1431,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
taosMemoryFreeClear(pDst.pTags); taosMemoryFreeClear(pDst.pTags);
taosMemoryFreeClear(pDst.pColumns); taosMemoryFreeClear(pDst.pColumns);
taosMemoryFreeClear(pDst.pCmpr); taosMemoryFreeClear(pDst.pCmpr);
taosMemoryFreeClear(pDst.pExtSchemas);
goto _OVER; goto _OVER;
} }
@ -1423,6 +1439,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
taosMemoryFreeClear(pDst.pTags); taosMemoryFreeClear(pDst.pTags);
taosMemoryFreeClear(pDst.pColumns); taosMemoryFreeClear(pDst.pColumns);
taosMemoryFreeClear(pDst.pCmpr); taosMemoryFreeClear(pDst.pCmpr);
taosMemoryFreeClear(pDst.pExtSchemas);
} else { } else {
code = mndCreateStb(pMnode, pReq, &createReq, pDb); code = mndCreateStb(pMnode, pReq, &createReq, pDb);
} }
@ -1488,6 +1505,13 @@ int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
memcpy(pNew->pColumns, pOld->pColumns, sizeof(SSchema) * pOld->numOfColumns); memcpy(pNew->pColumns, pOld->pColumns, sizeof(SSchema) * pOld->numOfColumns);
memcpy(pNew->pTags, pOld->pTags, sizeof(SSchema) * pOld->numOfTags); memcpy(pNew->pTags, pOld->pTags, sizeof(SSchema) * pOld->numOfTags);
memcpy(pNew->pCmpr, pOld->pCmpr, sizeof(SColCmpr) * pOld->numOfColumns); memcpy(pNew->pCmpr, pOld->pCmpr, sizeof(SColCmpr) * pOld->numOfColumns);
if (pOld->pExtSchemas) {
pNew->pExtSchemas = taosMemoryCalloc(pNew->numOfColumns, sizeof(SExtSchema));
if (pNew->pExtSchemas == NULL) {
TAOS_RETURN(terrno);
}
memcpy(pNew->pExtSchemas, pOld->pExtSchemas, sizeof(SExtSchema) * pOld->numOfColumns);
}
TAOS_RETURN(0); TAOS_RETURN(0);
} }
@ -1906,7 +1930,7 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *
TAOS_RETURN(code); TAOS_RETURN(code);
} }
static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ncols, static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const SMAlterStbReq* pReq, int32_t ncols,
int8_t withCompress) { int8_t withCompress) {
int32_t code = 0; int32_t code = 0;
if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) { if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) {
@ -1918,7 +1942,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
TAOS_RETURN(code); TAOS_RETURN(code);
} }
if (!mndValidateSchema(pOld->pColumns, pOld->numOfColumns, pFields, TSDB_MAX_BYTES_PER_ROW)) { if (!mndValidateSchema(pOld->pColumns, pOld->numOfColumns, pReq->pFields, TSDB_MAX_BYTES_PER_ROW)) {
code = TSDB_CODE_PAR_INVALID_ROW_LENGTH; code = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
TAOS_RETURN(code); TAOS_RETURN(code);
} }
@ -1934,7 +1958,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
for (int32_t i = 0; i < ncols; i++) { for (int32_t i = 0; i < ncols; i++) {
if (withCompress) { if (withCompress) {
SFieldWithOptions *pField = taosArrayGet(pFields, i); SFieldWithOptions *pField = taosArrayGet(pReq->pFields, i);
if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) { if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST; code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
TAOS_RETURN(code); TAOS_RETURN(code);
@ -1957,7 +1981,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
pCmpr->alg = pField->compress; pCmpr->alg = pField->compress;
mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name); mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name);
} else { } else {
SField *pField = taosArrayGet(pFields, i); SField *pField = taosArrayGet(pReq->pFields, i);
if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) { if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) {
code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST; code = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
TAOS_RETURN(code); TAOS_RETURN(code);
@ -1981,6 +2005,25 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name); mInfo("stb:%s, start to add column %s", pNew->name, pSchema->name);
} }
} }
// 1. old schema already has extschemas
// 2. new schema has extschemas
if (pReq->pTypeMods || pOld->pExtSchemas) {
if (!pNew->pExtSchemas) {
// all ext schemas reset to zero
pNew->pExtSchemas = taosMemoryCalloc(pNew->numOfColumns, sizeof(SExtSchema));
if (!pNew->pExtSchemas) TAOS_RETURN(terrno);
}
if (pOld->pExtSchemas) {
memcpy(pNew->pExtSchemas, pOld->pExtSchemas, pOld->numOfColumns * sizeof(SExtSchema));
}
if (taosArrayGetSize(pReq->pTypeMods) > 0) {
// copy added column ext schema
for (int32_t i = 0; i < ncols; ++i) {
pNew->pColumns[pOld->numOfColumns + i].flags |= COL_HAS_TYPE_MOD;
pNew->pExtSchemas[pOld->numOfColumns + i].typeMod = *(STypeMod *)taosArrayGet(pReq->pTypeMods, i);
}
}
}
pNew->colVer++; pNew->colVer++;
TAOS_RETURN(code); TAOS_RETURN(code);
@ -2012,6 +2055,9 @@ static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStb
int32_t sz = pNew->numOfColumns - col - 1; int32_t sz = pNew->numOfColumns - col - 1;
memmove(pNew->pColumns + col, pNew->pColumns + col + 1, sizeof(SSchema) * sz); memmove(pNew->pColumns + col, pNew->pColumns + col + 1, sizeof(SSchema) * sz);
memmove(pNew->pCmpr + col, pNew->pCmpr + col + 1, sizeof(SColCmpr) * sz); memmove(pNew->pCmpr + col, pNew->pCmpr + col + 1, sizeof(SColCmpr) * sz);
if (pOld->pExtSchemas) {
memmove(pNew->pExtSchemas + col, pNew->pExtSchemas + col + 1, sizeof(SExtSchema) * sz);
}
pNew->numOfColumns--; pNew->numOfColumns--;
pNew->colVer++; pNew->colVer++;
@ -2636,6 +2682,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
stbObj.pTags = NULL; stbObj.pTags = NULL;
stbObj.pFuncs = NULL; stbObj.pFuncs = NULL;
stbObj.pCmpr = NULL; stbObj.pCmpr = NULL;
stbObj.pExtSchemas = NULL;
stbObj.updateTime = taosGetTimestampMs(); stbObj.updateTime = taosGetTimestampMs();
stbObj.lock = 0; stbObj.lock = 0;
bool updateTagIndex = false; bool updateTagIndex = false;
@ -2657,7 +2704,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0); code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0);
break; break;
case TSDB_ALTER_TABLE_ADD_COLUMN: case TSDB_ALTER_TABLE_ADD_COLUMN:
code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 0); code = mndAddSuperTableColumn(pOld, &stbObj, pAlter, pAlter->numOfFields, 0);
break; break;
case TSDB_ALTER_TABLE_DROP_COLUMN: case TSDB_ALTER_TABLE_DROP_COLUMN:
pField0 = taosArrayGet(pAlter->pFields, 0); pField0 = taosArrayGet(pAlter->pFields, 0);
@ -2675,7 +2722,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, pAlter->pFields, pAlter->numOfFields); code = mndUpdateSuperTableColumnCompress(pMnode, pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
break; break;
case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION:
code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields, 1); code = mndAddSuperTableColumn(pOld, &stbObj, pAlter, pAlter->numOfFields, 1);
break; break;
default: default:
needRsp = false; needRsp = false;
@ -2697,6 +2744,7 @@ _OVER:
if (pAlter->commentLen > 0) { if (pAlter->commentLen > 0) {
taosMemoryFreeClear(stbObj.comment); taosMemoryFreeClear(stbObj.comment);
} }
taosMemoryFreeClear(stbObj.pExtSchemas);
TAOS_RETURN(code); TAOS_RETURN(code);
} }

View File

@ -631,6 +631,12 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
pField->type = pStream->outputSchema.pSchema[i].type; pField->type = pStream->outputSchema.pSchema[i].type;
pField->bytes = pStream->outputSchema.pSchema[i].bytes; pField->bytes = pStream->outputSchema.pSchema[i].bytes;
pField->compress = createDefaultColCmprByType(pField->type); pField->compress = createDefaultColCmprByType(pField->type);
if (IS_DECIMAL_TYPE(pField->type)) {
uint8_t prec = 0, scale = 0;
extractDecimalTypeInfoFromBytes(&pField->bytes, &prec, &scale);
pField->typeMod = decimalCalcTypeMod(prec, scale);
ASSERT(pField->flags & COL_HAS_TYPE_MOD);// TODO wjm
}
} }
if (pStream->tagSchema.nCols == 0) { if (pStream->tagSchema.nCols == 0) {

View File

@ -322,6 +322,7 @@ void metaCloneEntryFree(SMetaEntry **ppEntry) {
return; return;
} }
metaCloneColCmprFree(&(*ppEntry)->colCmpr); metaCloneColCmprFree(&(*ppEntry)->colCmpr);
taosMemoryFreeClear((*ppEntry)->pExtSchemas);
taosMemoryFreeClear(*ppEntry); taosMemoryFreeClear(*ppEntry);
return; return;

View File

@ -1080,7 +1080,7 @@ static int32_t metaHandleSuperTableCreateImpl(SMeta *pMeta, const SMetaEntry *pE
const SMetaHandleParam param = { const SMetaHandleParam param = {
.pEntry = pEntry, .pEntry = pEntry,
}; };
// TODO wjm debug create/alter stable/ctable logic
code = metaTableOpFn[op->table][op->op](pMeta, &param); code = metaTableOpFn[op->table][op->op](pMeta, &param);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
metaErr(TD_VID(pMeta->pVnode), code); metaErr(TD_VID(pMeta->pVnode), code);

View File

@ -1465,6 +1465,7 @@ END:
if (pCursor->pMeta) metaULock(pCursor->pMeta); if (pCursor->pMeta) metaULock(pCursor->pMeta);
if (pCursor->pCur) tdbTbcClose(pCursor->pCur); if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf); if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf);
taosMemoryFreeClear(oStbEntry.pExtSchemas);
tDecoderClear(&dc); tDecoderClear(&dc);
tdbFree(pData); tdbFree(pData);

View File

@ -96,6 +96,14 @@ int32_t dropTableExtSchema(SMetaEntry *pEntry, int32_t dropColId, int32_t newCol
memmove(pEntry->pExtSchemas + dropColId, pEntry->pExtSchemas + dropColId + 1, memmove(pEntry->pExtSchemas + dropColId, pEntry->pExtSchemas + dropColId + 1,
(newColNum - dropColId) * sizeof(SExtSchema)); (newColNum - dropColId) * sizeof(SExtSchema));
} }
for (int32_t i = 0; i < newColNum; i++) { // TODO wjm test it..
if (hasExtSchema(pEntry->pExtSchemas + i)) return 0;
}
// if no column has ext schemas, free the memory.
// TODO wjm looks like we can remove it
// Actually it's not necessary, if there's no ext schemas, it will not encode extschemas when encoding meta
// entry
taosMemoryFreeClear(pEntry->pExtSchemas);
return 0; return 0;
} }

View File

@ -1789,6 +1789,7 @@ int32_t metaAlterSuperTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq)
.stbEntry.schemaRow = pReq->schemaRow, .stbEntry.schemaRow = pReq->schemaRow,
.stbEntry.schemaTag = pReq->schemaTag, .stbEntry.schemaTag = pReq->schemaTag,
.colCmpr = pReq->colCmpr, .colCmpr = pReq->colCmpr,
.pExtSchemas = pReq->pExtSchemas,
}; };
TABLE_SET_COL_COMPRESSED(entry.flags); TABLE_SET_COL_COMPRESSED(entry.flags);

View File

@ -598,13 +598,22 @@ int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCt
} }
metaSize = CTG_META_SIZE(stbMeta); metaSize = CTG_META_SIZE(stbMeta);
*pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize); int32_t schemaExtSize = 0;
if (stbMeta->schemaExt) {
schemaExtSize = stbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt);
}
*pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize + schemaExtSize);
if (NULL == *pTableMeta) { if (NULL == *pTableMeta) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
TAOS_MEMCPY(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); TAOS_MEMCPY(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
(*pTableMeta)->schemaExt = NULL; if (stbMeta->schemaExt) {
(*pTableMeta)->schemaExt = (SSchemaExt*)((char*)*pTableMeta + metaSize);
TAOS_MEMCPY((*pTableMeta)->schemaExt, stbMeta->schemaExt, schemaExtSize);
} else {
(*pTableMeta)->schemaExt = NULL;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -264,7 +264,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
} }
break; break;
} }
case TSDB_DATA_TYPE_DECIMAL: case TSDB_DATA_TYPE_DECIMAL:// TODO wjm what are you doing here?
case TSDB_DATA_TYPE_BLOB: case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_MEDIUMBLOB: case TSDB_DATA_TYPE_MEDIUMBLOB:

View File

@ -4610,7 +4610,7 @@ static int32_t functionNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddIntegerToObject(pJson, jkFunctionTrimType, pNode->trimType); code = tjsonAddIntegerToObject(pJson, jkFunctionTrimType, pNode->trimType);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = dataTypeToJson(&pNode->srcFuncInputType, pJson); code = tjsonAddObject(pJson, jkFunctionSrcFuncInputDT, dataTypeToJson, &pNode->srcFuncInputType);
} }
return code; return code;
} }
@ -4650,7 +4650,7 @@ static int32_t jsonToFunctionNode(const SJson* pJson, void* pObj) {
tjsonGetNumberValue(pJson, jkFunctionTrimType, pNode->trimType, code); tjsonGetNumberValue(pJson, jkFunctionTrimType, pNode->trimType, code);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToDataType(pJson, &pNode->srcFuncInputType); code = tjsonToObject(pJson, jkFunctionSrcFuncInputDT, jsonToDataType, &pNode->srcFuncInputType);
} }
return code; return code;

View File

@ -384,7 +384,7 @@ int32_t insGetTableDataCxt(SHashObj* pHash, void* id, int32_t idLen, STableMeta*
static void destroyColVal(void* p) { static void destroyColVal(void* p) {
SColVal* pVal = p; SColVal* pVal = p;
if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type || if (TSDB_DATA_TYPE_NCHAR == pVal->value.type || TSDB_DATA_TYPE_GEOMETRY == pVal->value.type ||
TSDB_DATA_TYPE_VARBINARY == pVal->value.type) { TSDB_DATA_TYPE_VARBINARY == pVal->value.type || TSDB_DATA_TYPE_DECIMAL == pVal->value.type) {
taosMemoryFreeClear(pVal->value.pData); taosMemoryFreeClear(pVal->value.pData);
} }
} }

View File

@ -1481,7 +1481,7 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p
return generateSyntaxErrMsg(&pCxt->msgBuf, code); return generateSyntaxErrMsg(&pCxt->msgBuf, code);
} }
SSchemaExt* pSchemaExt = SSchemaExt* pSchemaExt =
pMeta->schemaExt ? (i > pMeta->tableInfo.numOfColumns ? NULL : (pMeta->schemaExt + i)) : NULL; pMeta->schemaExt ? (i >= pMeta->tableInfo.numOfColumns ? NULL : (pMeta->schemaExt + i)) : NULL;
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol, setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol,
pSchemaExt); pSchemaExt);
setColumnPrimTs(pCxt, pCol, pTable); setColumnPrimTs(pCxt, pCol, pTable);
@ -1561,7 +1561,7 @@ static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef,
if (0 == strcmp(pCol->colName, pMeta->schema[i].name) && if (0 == strcmp(pCol->colName, pMeta->schema[i].name) &&
!invisibleColumn(pCxt->pParseCxt->enableSysInfo, pMeta->tableType, pMeta->schema[i].flags)) { !invisibleColumn(pCxt->pParseCxt->enableSysInfo, pMeta->tableType, pMeta->schema[i].flags)) {
SSchemaExt* pSchemaExt = pMeta->schemaExt ? (i > pMeta->tableInfo.numOfColumns ? NULL : (pMeta->schemaExt + i)) : NULL; SSchemaExt* pSchemaExt = pMeta->schemaExt ? (i >= pMeta->tableInfo.numOfColumns ? NULL : (pMeta->schemaExt + i)) : NULL;
setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol, pSchemaExt); setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema + i, (i - pMeta->tableInfo.numOfColumns), pCol, pSchemaExt);
setColumnPrimTs(pCxt, pCol, pTable); setColumnPrimTs(pCxt, pCol, pTable);
*pFound = true; *pFound = true;
@ -9294,7 +9294,7 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray, bool calB
if (pCol->pOptions && ((SColumnOptions*)pCol->pOptions)->bPrimaryKey) { if (pCol->pOptions && ((SColumnOptions*)pCol->pOptions)->bPrimaryKey) {
field.flags |= COL_IS_KEY; field.flags |= COL_IS_KEY;
} }
if (field.typeMod > 0) { if (field.typeMod != 0) {
field.flags |= COL_HAS_TYPE_MOD; field.flags |= COL_HAS_TYPE_MOD;
} }
if (NULL == taosArrayPush(*pArray, &field)) { if (NULL == taosArrayPush(*pArray, &field)) {
@ -10233,9 +10233,15 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
if (NULL == pAlterReq->pFields) { if (NULL == pAlterReq->pFields) {
return terrno; return terrno;
} }
pAlterReq->pTypeMods = taosArrayInit(2, sizeof(STypeMod));
if (!pAlterReq->pTypeMods) return terrno;
STypeMod typeMod = calcTypeMod(&pStmt->dataType);
switch (pStmt->alterType) { switch (pStmt->alterType) {
case TSDB_ALTER_TABLE_ADD_COLUMN: case TSDB_ALTER_TABLE_ADD_COLUMN:
if (NULL == taosArrayPush(pAlterReq->pTypeMods, &typeMod)) {
return terrno;
} // fall through
case TSDB_ALTER_TABLE_ADD_TAG: case TSDB_ALTER_TABLE_ADD_TAG:
case TSDB_ALTER_TABLE_DROP_TAG: case TSDB_ALTER_TABLE_DROP_TAG:
case TSDB_ALTER_TABLE_DROP_COLUMN: case TSDB_ALTER_TABLE_DROP_COLUMN:
@ -10304,6 +10310,7 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
} }
} }
if (NULL == taosArrayPush(pAlterReq->pFields, &field)) return terrno; if (NULL == taosArrayPush(pAlterReq->pFields, &field)) return terrno;
if (NULL == taosArrayPush(pAlterReq->pTypeMods, &typeMod)) return terrno;
break; break;
} }
default: default:
@ -12133,11 +12140,16 @@ static int32_t adjustDataTypeOfProjections(STranslateContext* pCxt, const STable
} }
SSchema* pSchemas = getTableColumnSchema(pMeta); SSchema* pSchemas = getTableColumnSchema(pMeta);
const SSchemaExt* pExtSchemas = getTableColumnExtSchema(pMeta);
int32_t index = 0; int32_t index = 0;
SNode* pProj = NULL; SNode* pProj = NULL;
FOREACH(pProj, pProjections) { FOREACH(pProj, pProjections) {
SSchema* pSchema = pSchemas + index++; SSchema* pSchema = pSchemas + index++;
SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes}; SDataType dt = {.type = pSchema->type, .bytes = pSchema->bytes};
if (IS_DECIMAL_TYPE(pSchema->type)) {
STypeMod typeMod = (pExtSchemas + (index - 1))->typeMod;
extractTypeFromTypeMod(pSchema->type, typeMod, &dt.precision, &dt.scale, NULL);
}
if (!dataTypeEqual(&dt, &((SExprNode*)pProj)->resType)) { if (!dataTypeEqual(&dt, &((SExprNode*)pProj)->resType)) {
SNode* pFunc = NULL; SNode* pFunc = NULL;
int32_t code = createCastFunc(pCxt, pProj, dt, &pFunc); int32_t code = createCastFunc(pCxt, pProj, dt, &pFunc);

View File

@ -1,4 +1,5 @@
from random import randrange from random import randrange
from re import A
import time import time
import threading import threading
import secrets import secrets
@ -9,6 +10,10 @@ from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.common import * from util.common import *
syntax_error = -2147473920
invalid_column = -2147473918
invalid_compress_level = -2147483084
invalid_encode_param = -2147483087
class DecimalType: class DecimalType:
def __init__(self, precision: int, scale: int): def __init__(self, precision: int, scale: int):
self.precision = precision self.precision = precision
@ -240,15 +245,10 @@ class TableInserter:
sql += f"({start_ts + i * step}" sql += f"({start_ts + i * step}"
for column in self.columns_types: for column in self.columns_types:
sql += f", {column.generate_value()}" sql += f", {column.generate_value()}"
if self.tags_types:
sql += ") tags("
for tag in self.tags_types:
sql += f"{tag.generate_value()},"
sql = sql[:-2]
sql += ")" sql += ")"
if i != rows - 1: if i != rows - 1:
sql += ", " sql += ", "
local_flush_database = i % 5000 == 0; local_flush_database = i % 5000 == 0
if len(sql) > 1000: if len(sql) > 1000:
#tdLog.debug(f"insert into with sql{sql}") #tdLog.debug(f"insert into with sql{sql}")
if flush_database and local_flush_database: if flush_database and local_flush_database:
@ -269,13 +269,18 @@ class TDTestCase:
self.ctbNum = 10 self.ctbNum = 10
self.rowsPerTbl = 10000 self.rowsPerTbl = 10000
self.duraion = '1h' self.duraion = '1h'
self.columns = [] self.norm_tb_columns = []
self.tags = [] self.tags = []
self.stable_name = "meters" self.stable_name = "meters"
self.norm_table_name = "nt" self.norm_table_name = "nt"
self.c_table_prefix = "t" self.c_table_prefix = "t"
self.db_name = "test" self.db_name = "test"
self.c_table_num = 10 self.c_table_num = 10
self.no_decimal_col_tb_name = 'tt'
self.stb_columns = []
self.stream_name = 'stream1'
self.stream_out_stb = 'stream_out_stb'
self.tsma_name = 'tsma1'
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
@ -289,7 +294,6 @@ class TDTestCase:
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s" % ( tsql.execute("create database if not exists %s vgroups %d replica %d duration %s" % (
dbName, vgroups, replica, duration)) dbName, vgroups, replica, duration))
tdLog.debug("complete to create database %s" % (dbName)) tdLog.debug("complete to create database %s" % (dbName))
return
def create_stable(self, tsql, paraDict): def create_stable(self, tsql, paraDict):
colString = tdCom.gen_column_type_str( colString = tdCom.gen_column_type_str(
@ -300,7 +304,6 @@ class TDTestCase:
paraDict["dbName"], paraDict["stbName"], colString, tagString) paraDict["dbName"], paraDict["stbName"], colString, tagString)
tdLog.debug("%s" % (sqlString)) tdLog.debug("%s" % (sqlString))
tsql.execute(sqlString) tsql.execute(sqlString)
return
def create_ctable(self, tsql=None, dbName='dbx', stbName='stb', ctbPrefix='ctb', ctbNum=1, ctbStartIdx=0): def create_ctable(self, tsql=None, dbName='dbx', stbName='stb', ctbPrefix='ctb', ctbNum=1, ctbStartIdx=0):
for i in range(ctbNum): for i in range(ctbNum):
@ -310,7 +313,6 @@ class TDTestCase:
tdLog.debug("complete to create %d child tables by %s.%s" % tdLog.debug("complete to create %d child tables by %s.%s" %
(ctbNum, dbName, stbName)) (ctbNum, dbName, stbName))
return
def init_normal_tb(self, tsql, db_name: str, tb_name: str, rows: int, start_ts: int, ts_step: int): def init_normal_tb(self, tsql, db_name: str, tb_name: str, rows: int, start_ts: int, ts_step: int):
sql = 'CREATE TABLE %s.%s (ts timestamp, c1 INT, c2 INT, c3 INT, c4 double, c5 VARCHAR(255))' % ( sql = 'CREATE TABLE %s.%s (ts timestamp, c1 INT, c2 INT, c3 INT, c4 double, c5 VARCHAR(255))' % (
@ -392,6 +394,10 @@ class TDTestCase:
self.init_normal_tb(tdSql, paraDict['dbName'], 'norm_tb', self.init_normal_tb(tdSql, paraDict['dbName'], 'norm_tb',
paraDict['rowsPerTbl'], paraDict['startTs'], paraDict['tsStep']) paraDict['rowsPerTbl'], paraDict['startTs'], paraDict['tsStep'])
def check_desc_for_one_ctb(self, ctbPrefix: str, columns: List[DataType], tags: List[DataType] = []):
ctb_idx = randrange(self.c_table_num)
return self.check_desc(f"{ctbPrefix}{ctb_idx}", columns, tags)
def check_desc(self, tbname: str, column_types: List[DataType], tag_types: List[DataType] = []): def check_desc(self, tbname: str, column_types: List[DataType], tag_types: List[DataType] = []):
sql = f"desc {self.db_name}.{tbname}" sql = f"desc {self.db_name}.{tbname}"
tdSql.query(sql, queryTimes=1) tdSql.query(sql, queryTimes=1)
@ -405,6 +411,8 @@ class TDTestCase:
tdLog.exit(f"check desc failed for table: {tbname} column {results[i+1][0]} encode is {results[i+1][5]}, expect {DecimalType.default_encode()}") tdLog.exit(f"check desc failed for table: {tbname} column {results[i+1][0]} encode is {results[i+1][5]}, expect {DecimalType.default_encode()}")
if results[i+1][5] != DecimalType.default_compression(): if results[i+1][5] != DecimalType.default_compression():
tdLog.exit(f"check desc failed for table: {tbname} column {results[i+1][0]} compression is {results[i+1][4]}, expect {DecimalType.default_compression()}") tdLog.exit(f"check desc failed for table: {tbname} column {results[i+1][0]} compression is {results[i+1][4]}, expect {DecimalType.default_compression()}")
if tbname == self.stable_name:
self.check_desc_for_one_ctb(self.c_table_prefix, column_types, tag_types)
def check_show_create_table(self, tbname: str, column_types: List[DataType], tag_types: List[DataType] = []): def check_show_create_table(self, tbname: str, column_types: List[DataType], tag_types: List[DataType] = []):
sql = f"show create table {self.db_name}.{tbname}" sql = f"show create table {self.db_name}.{tbname}"
@ -418,11 +426,47 @@ class TDTestCase:
if result_type != column_type.get_decimal_type(): if result_type != column_type.get_decimal_type():
tdLog.exit(f"check show create table failed for: {tbname} column {i} type is {result_type}, expect {column_type.get_decimal_type()}") tdLog.exit(f"check show create table failed for: {tbname} column {i} type is {result_type}, expect {column_type.get_decimal_type()}")
decimal_idx += 1 decimal_idx += 1
def test_add_drop_columns_with_decimal(self, tbname: str, columns: List[DataType]):
is_stb = tbname == self.stable_name
## alter table add column
create_c99_sql = f'alter table {self.db_name}.{tbname} add column c99 decimal(37, 19)'
columns.append(DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(37, 19))))
tdSql.execute(create_c99_sql, queryTimes=1, show=True)
self.check_desc(tbname, columns)
## alter table add column with compression
create_c100_sql = f'ALTER TABLE {self.db_name}.{tbname} ADD COLUMN c100 decimal(36, 18) COMPRESS "zstd"'
tdSql.execute(create_c100_sql, queryTimes=1, show=True)
columns.append(DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(36, 18))))
self.check_desc(tbname, columns)
def test_create_decimal_column(self): ## drop non decimal column
drop_c6_sql = f'alter table {self.db_name}.{tbname} drop column c6'
tdSql.execute(drop_c6_sql, queryTimes=1, show=True)
c6 = columns.pop(5)
self.check_desc(tbname, columns)
## drop decimal column and not last column
drop_c99_sql = f'alter table {self.db_name}.{tbname} drop column c99'
tdSql.execute(drop_c99_sql, queryTimes=1, show=True)
c99 = columns.pop(len(columns) - 2)
self.check_desc(tbname, columns)
## drop decimal column and last column
drop_c100_sql = f'alter table {self.db_name}.{tbname} drop column c100'
tdSql.execute(drop_c100_sql, queryTimes=1, show=True)
c100 = columns.pop(len(columns) - 1)
self.check_desc(tbname, columns)
## create decimal back
tdSql.execute(create_c99_sql, queryTimes=1, show=True)
tdSql.execute(create_c100_sql, queryTimes=1, show=True)
columns.append(c99)
columns.append(c100)
self.check_desc(tbname, columns)
def test_decimal_column_ddl(self):
## create decimal type table, normal/super table, decimal64/decimal128 ## create decimal type table, normal/super table, decimal64/decimal128
tdLog.printNoPrefix("-------- test create decimal column") tdLog.printNoPrefix("-------- test create decimal column")
self.columns = [ self.norm_tb_columns = [
DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(10, 2))), DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(10, 2))),
DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(20, 4))), DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(20, 4))),
DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(30, 8))), DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(30, 8))),
@ -438,24 +482,23 @@ class TDTestCase:
DataType(TypeEnum.INT), DataType(TypeEnum.INT),
DataType(TypeEnum.VARCHAR, 255) DataType(TypeEnum.VARCHAR, 255)
] ]
DecimalColumnTableCreater(tdSql, self.db_name, self.stable_name, self.columns, self.tags).create() self.stb_columns = self.norm_tb_columns.copy()
self.check_desc("meters", self.columns, self.tags) DecimalColumnTableCreater(tdSql, self.db_name, self.stable_name, self.stb_columns, self.tags).create()
self.check_show_create_table("meters", self.columns, self.tags) self.check_show_create_table("meters", self.stb_columns, self.tags)
DecimalColumnTableCreater(tdSql, self.db_name, self.norm_table_name, self.columns).create() DecimalColumnTableCreater(tdSql, self.db_name, self.norm_table_name, self.norm_tb_columns).create()
self.check_desc(self.norm_table_name, self.columns) self.check_desc(self.norm_table_name, self.norm_tb_columns)
self.check_show_create_table(self.norm_table_name, self.columns) self.check_show_create_table(self.norm_table_name, self.norm_tb_columns)
## TODO add more values for all rows ## TODO add more values for all rows
tag_values = [ tag_values = [
"1", "t1" "1", "t1"
] ]
DecimalColumnTableCreater(tdSql, self.db_name, self.stable_name, self.columns).create_child_table(self.c_table_prefix, self.c_table_num, self.tags, tag_values) DecimalColumnTableCreater(tdSql, self.db_name, self.stable_name, self.norm_tb_columns).create_child_table(self.c_table_prefix, self.c_table_num, self.tags, tag_values)
self.check_desc("t1", self.columns, self.tags) self.check_desc("meters", self.stb_columns, self.tags)
self.check_desc("t1", self.norm_tb_columns, self.tags)
## invalid precision/scale ## invalid precision/scale
syntax_error = -2147473920
invalid_column = -2147473918
invalid_precision_scale = [("decimal(-1, 2)", syntax_error), ("decimal(39, 2)", invalid_column), ("decimal(10, -1)", syntax_error), invalid_precision_scale = [("decimal(-1, 2)", syntax_error), ("decimal(39, 2)", invalid_column), ("decimal(10, -1)", syntax_error),
("decimal(10, 39)", invalid_column), ("decimal(10, 2.5)", syntax_error), ("decimal(10.5, 2)", syntax_error), ("decimal(10, 39)", invalid_column), ("decimal(10, 2.5)", syntax_error), ("decimal(10.5, 2)", syntax_error),
("decimal(10.5, 2.5)", syntax_error), ("decimal(0, 2)", invalid_column), ("decimal(0)", invalid_column), ("decimal(10.5, 2.5)", syntax_error), ("decimal(0, 2)", invalid_column), ("decimal(0)", invalid_column),
@ -468,31 +511,28 @@ class TDTestCase:
sql = 'create stable %s.invalid_decimal_tag (ts timestamp) tags (t1 decimal(10, 2))' % (self.db_name) sql = 'create stable %s.invalid_decimal_tag (ts timestamp) tags (t1 decimal(10, 2))' % (self.db_name)
tdSql.error(sql, invalid_column) tdSql.error(sql, invalid_column)
## alter table add column ## alter table add/drop column
sql = f'alter table {self.db_name}.{self.norm_table_name} add column c99 decimal(37, 19)' self.test_add_drop_columns_with_decimal(self.norm_table_name, self.norm_tb_columns)
self.columns.append(DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(37, 19)))) self.test_add_drop_columns_with_decimal(self.stable_name, self.stb_columns)
tdSql.execute(sql, queryTimes=1)
self.check_desc(self.norm_table_name, self.columns)
## alter table add column with compression
## Test metaentry compatibility problem for decimal type:w
## How to test it?
## Create table with no decimal type, the metaentries should not have extschma, and add decimal column, the metaentries should have extschema for all columns.
## After drop this decimal column, the metaentries should not have extschema for all columns.
## Test for normal table and super table
## drop index from stb ## drop index from stb
### These ops will override the previous stbobjs and meta entries, so test it ### These ops will override the previous stbobjs and meta entries, so test it
## TODO test encode and compress for decimal type ## TODO test encode and compress for decimal type
sql = f'ALTER TABLE {self.db_name}.{self.norm_table_name} ADD COLUMN c101 decimal(37, 19) ENCODE "simple8b" COMPRESS "zstd"'
tdSql.error(sql, invalid_encode_param)
sql = f'ALTER TABLE {self.db_name}.{self.norm_table_name} ADD COLUMN c101 decimal(37, 19) ENCODE "delta-i" COMPRESS "zstd"'
tdSql.error(sql, invalid_encode_param)
sql = f'ALTER TABLE {self.db_name}.{self.norm_table_name} ADD COLUMN c101 decimal(37, 19) ENCODE "delta-d" COMPRESS "zstd"'
tdSql.error(sql, invalid_encode_param)
sql = f'ALTER TABLE {self.db_name}.{self.norm_table_name} ADD COLUMN c101 decimal(37, 19) ENCODE "bit-packing" COMPRESS "zstd"'
tdSql.error(sql, invalid_encode_param)
def test_insert_decimal_values(self): def test_insert_decimal_values(self):
for i in range(self.c_table_num): for i in range(self.c_table_num):
pass TableInserter(tdSql, self.db_name, f"{self.c_table_prefix}{i}", self.stb_columns, self.tags).insert(1000, 1537146000000, 500)
#TableInserter(tdSql, self.db_name, f"{self.c_table_prefix}{i}", self.columns, self.tags).insert(1, 1537146000000, 500)
TableInserter(tdSql, self.db_name, self.norm_table_name, self.columns).insert(10000, 1537146000000, 500, flush_database=True) TableInserter(tdSql, self.db_name, self.norm_table_name, self.norm_tb_columns).insert(10000, 1537146000000, 500, flush_database=True)
## insert null/None for decimal type ## insert null/None for decimal type
@ -508,18 +548,48 @@ class TDTestCase:
DataType(TypeEnum.FLOAT), DataType(TypeEnum.FLOAT),
DataType(TypeEnum.VARCHAR, 255), DataType(TypeEnum.VARCHAR, 255),
] ]
DecimalColumnTableCreater(tdSql, self.db_name, "tt", columns, []).create() DecimalColumnTableCreater(tdSql, self.db_name, self.no_decimal_col_tb_name, columns, []).create()
TableInserter(tdSql, self.db_name, 'tt', columns).insert(10000, 1537146000000, 500, flush_database=True) TableInserter(tdSql, self.db_name, self.no_decimal_col_tb_name, columns).insert(10000, 1537146000000, 500, flush_database=True)
## TODO wjm test non support decimal version upgrade to decimal support version, and add decimal column ## TODO wjm test non support decimal version upgrade to decimal support version, and add decimal column
## Test metaentry compatibility problem for decimal type
## How to test it?
## Create table with no decimal type, the metaentries should not have extschma, and add decimal column, the metaentries should have extschema for all columns.
sql = f'ALTER TABLE {self.db_name}.{self.no_decimal_col_tb_name} ADD COLUMN c200 decimal(37, 19)'
tdSql.execute(sql, queryTimes=1) ## now meta entry has ext schemas
columns.append(DataType(TypeEnum.DECIMAL, type_mod=DataType.get_decimal_type_mod(DecimalType(37, 19))))
self.check_desc(self.no_decimal_col_tb_name, columns)
## After drop this only decimal column, the metaentries should not have extschema for all columns.
sql = f'ALTER TABLE {self.db_name}.{self.no_decimal_col_tb_name} DROP COLUMN c200'
tdSql.execute(sql, queryTimes=1) ## now meta entry has no ext schemas
columns.pop(len(columns) - 1)
self.check_desc(self.no_decimal_col_tb_name, columns)
sql = f'ALTER TABLE {self.db_name}.{self.no_decimal_col_tb_name} ADD COLUMN c200 int'
tdSql.execute(sql, queryTimes=1) ## meta entry has no ext schemas
columns.append(DataType(TypeEnum.INT))
self.check_desc(self.no_decimal_col_tb_name, columns)
self.test_add_drop_columns_with_decimal(self.no_decimal_col_tb_name, columns)
def test_decimal_ddl(self): def test_decimal_ddl(self):
tdSql.execute("create database test", queryTimes=1) tdSql.execute("create database test", queryTimes=1)
self.test_create_decimal_column() self.test_decimal_column_ddl()
## TODO test decimal column for tmq
def test_decimal_and_stream(self):
create_stream = f'CREATE STREAM {self.stream_name} FILL_HISTORY 1 INTO {self.db_name}.{self.stream_out_stb} AS SELECT _wstart, count(c1), avg(c2), sum(c3) FROM {self.db_name}.{self.stable_name} INTERVAL(10s)'
tdSql.execute(create_stream, queryTimes=1, show=True)
def test_decimal_and_tsma(self):
pass
def run(self): def run(self):
self.test_decimal_ddl() self.test_decimal_ddl()
self.no_decimal_table_test() self.no_decimal_table_test()
self.test_insert_decimal_values() self.test_insert_decimal_values()
self.test_decimal_and_stream()
self.test_decimal_and_tsma()
def stop(self): def stop(self):
tdSql.close() tdSql.close()