diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index d224f9a411..6e83c27a42 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -219,6 +219,7 @@ typedef struct STqReader { SSDataBlock *pResBlock; int64_t lastTs; bool hasPrimaryKey; + SExtSchema *extSchema; } STqReader; STqReader *tqReaderOpen(SVnode *pVnode); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 635f9946d5..00c08e1664 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -167,8 +167,10 @@ int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tb int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); -SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); int64_t metaGetTableCreateTime(SMeta* pMeta, tb_uid_t uid, int lock); +SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t* createTime, SExtSchema** extSchema); +SExtSchema* metaCloneSExtSchema(const SMetaEntry *pME); +void metaFreeSExtSchema(SExtSchema *p); int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index 608d36721c..1425f3be28 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -70,6 +70,32 @@ static int32_t metaDecodeExtSchemas(SDecoder* pDecoder, SMetaEntry* pME) { return 0; } +SExtSchema* metaCloneSExtSchema(const SMetaEntry *pME) { + const SSchemaWrapper *pSchWrapper = NULL; + bool hasTypeMods = false; + if (pME->type == TSDB_SUPER_TABLE) { + pSchWrapper = &pME->stbEntry.schemaRow; + } else if (pME->type == TSDB_NORMAL_TABLE) { + pSchWrapper = &pME->ntbEntry.schemaRow; + } else { + return NULL; + } + hasTypeMods = schemasHasTypeMod(pSchWrapper->pSchema, pSchWrapper->nCols); + + if (hasTypeMods) { + SExtSchema* ret = taosMemoryMalloc(sizeof(SExtSchema) * pSchWrapper->nCols); + memcpy(ret, pME->pExtSchemas, pSchWrapper->nCols * sizeof(SExtSchema)); + return ret; + } + return NULL; +} + +void metaFreeSExtSchema(SExtSchema *p) { + if (p) { + taosMemoryFreeClear(p); + } +} + int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) { const SColCmprWrapper *pw = &pME->colCmpr; TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pw->nCols)); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index bf576f1e8b..950e7e5acc 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -378,7 +378,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) { return 0; } -SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) { +SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime, SExtSchema** extSchema) { void *pData = NULL; int nData = 0; int64_t version; @@ -409,6 +409,7 @@ _query: if (me.type == TSDB_SUPER_TABLE) { if (sver == -1 || sver == me.stbEntry.schemaRow.version) { pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow); + if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me); tDecoderClear(&dc); goto _exit; } @@ -419,6 +420,7 @@ _query: } else { if (sver == -1 || sver == me.ntbEntry.schemaRow.version) { pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow); + if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me); tDecoderClear(&dc); goto _exit; } @@ -435,6 +437,7 @@ _query: goto _err; } pSchema = tCloneSSchemaWrapper(&schema); + if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me); tDecoderClear(&dc); _exit: @@ -664,7 +667,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) { STSchema *pTSchema = NULL; SSchemaWrapper *pSW = NULL; - pSW = metaGetTableSchema(pMeta, uid, sver, lock); + pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL, NULL); if (!pSW) return NULL; pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version); diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index 64693274f4..e5a45dc3fa 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -593,7 +593,7 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) { void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) { bool ret = false; - SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1); + SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL, NULL); if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) { ret = true; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 3495aa20f5..12efe09d15 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -283,7 +283,7 @@ void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) { return; } bool ret = false; - SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1); + SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL, NULL); if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) { ret = true; } @@ -336,6 +336,7 @@ void tqReaderClose(STqReader* pReader) { tDeleteSchemaWrapper(pReader->pSchemaWrapper); } + metaFreeSExtSchema(pReader->extSchema); if (pReader->pColIdList) { taosArrayDestroy(pReader->pColIdList); } @@ -597,7 +598,7 @@ END: return code; } -int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) { +int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSchema) { if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -620,6 +621,9 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap pDst->pSchema[j++] = pSrc->pSchema[i]; SColumnInfoData colInfo = createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId); + if (extSchema != NULL) { + decimalFromTypeMod(extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale); + } code = blockDataAppendColInfo(pBlock, &colInfo); if (code != 0) { return code; @@ -653,6 +657,9 @@ static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, c SSchema* pColSchema = &pSchema->pSchema[i]; SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); + if (pReader->extSchema != NULL) { + decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale); + } int32_t code = blockDataAppendColInfo(pBlock, &colInfo); if (code != TSDB_CODE_SUCCESS) { blockDataFreeRes(pBlock); @@ -741,8 +748,8 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) || (pReader->cachedSchemaVer != sversion)) { tDeleteSchemaWrapper(pReader->pSchemaWrapper); - - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); + metaFreeSExtSchema(pReader->extSchema); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL, &pReader->extSchema); if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 "version %d, possibly dropped table", @@ -901,7 +908,7 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); TQ_NULL_GO_TO_END(pSW); - TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned)); + TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned, pReader->extSchema)); tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, (int32_t)taosArrayGetSize(block->pDataBlock)); @@ -1119,7 +1126,8 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block pReader->lastBlkUid = uid; tDeleteSchemaWrapper(pReader->pSchemaWrapper); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); + metaFreeSExtSchema(pReader->extSchema); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime, &pReader->extSchema); if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 01253bd7c0..0e4ead5e27 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -760,7 +760,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { } int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) { - SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0); + SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL, NULL); if (pSW) { *num = pSW->nCols; tDeleteSchemaWrapper(pSW); diff --git a/tests/system-test/7-tmq/raw_block_interface_test.py b/tests/system-test/7-tmq/raw_block_interface_test.py index 1c9798421d..65d6747894 100644 --- a/tests/system-test/7-tmq/raw_block_interface_test.py +++ b/tests/system-test/7-tmq/raw_block_interface_test.py @@ -25,6 +25,7 @@ class TDTestCase: tdSql.query("select * from d1") tdSql.checkRows(1) tdSql.checkData(0, 1, 120) + tdSql.checkData(0, 4, 2.32) tdSql.query("select * from d2") tdSql.checkRows(1) diff --git a/tests/system-test/7-tmq/tmq_c_test.py b/tests/system-test/7-tmq/tmq_c_test.py index 0552a700dd..c04af3e3e7 100644 --- a/tests/system-test/7-tmq/tmq_c_test.py +++ b/tests/system-test/7-tmq/tmq_c_test.py @@ -23,6 +23,13 @@ class TDTestCase: tdSql.init(conn.cursor()) #tdSql.init(conn.cursor(), logSql) # output sql.txt file + def checkData(self): + tdSql.execute('use db_taosx') + tdSql.query("select * from ct0") + tdSql.checkRows(2) + tdSql.checkData(0, 4, 23.23) + + return def run(self): buildPath = tdCom.getBuildPath() cmdStr = '%s/build/bin/tmq_write_raw_test'%(buildPath) @@ -41,6 +48,7 @@ class TDTestCase: tdLog.info(cmdStr) os.system(cmdStr) + self.checkData() return def stop(self): diff --git a/utils/test/c/tmq_write_raw_test.c b/utils/test/c/tmq_write_raw_test.c index e8d4c6abae..ab2e3db796 100644 --- a/utils/test/c/tmq_write_raw_test.c +++ b/utils/test/c/tmq_write_raw_test.c @@ -42,6 +42,23 @@ static void msg_process(TAOS_RES* msg) { printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg)); printf("db: %s\n", tmq_get_db_name(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg)); + if (strcmp(tmq_get_db_name(msg), "db_query") == 0){ + TAOS_ROW row = NULL; + int32_t cnt = 0; + while ((row = taos_fetch_row(msg))) { + + int numFields = taos_num_fields(msg); + TAOS_FIELD *fields = taos_fetch_fields(msg); + + for (int i = 0; i < numFields; ++i) { + if (IS_DECIMAL_TYPE(fields[i].type)) { + ASSERT(strcmp(row[i], "3.122") == 0); + } + } + } + return; + } + TAOS* pConn = use_db(); if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META || tmq_get_res_type(msg) == TMQ_RES_METADATA) { char* result = tmq_get_json_meta(msg); @@ -62,7 +79,7 @@ static void msg_process(TAOS_RES* msg) { int buildDatabase(TAOS* pConn, TAOS_RES* pRes) { pRes = taos_query(pConn, - "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " + "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16), c4 decimal(10,3)) tags(t1 int, t3 " "nchar(8), t4 bool)"); if (taos_errno(pRes) != 0) { printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); @@ -77,7 +94,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct0 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a')"); + pRes = taos_query(pConn, "insert into ct0 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a', 3.2)"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); return -1; @@ -98,7 +115,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct1 using st1(t1) tags(2000) values(1626006833600, 3, 4, 'b')"); + pRes = taos_query(pConn, "insert into ct1 using st1(t1) tags(2000) values(1626006833600, 3, 4, 'b', 4.32)"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); return -1; @@ -112,14 +129,14 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) { } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct0 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a')"); + pRes = taos_query(pConn, "insert into ct0 using st1 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a', 23.23)"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into ct1 using st1(t1) tags(2000) values(1626006833600, 3, 4, 'b')"); + pRes = taos_query(pConn, "insert into ct1 using st1(t1) tags(2000) values(1626006833600, 3, 4, 'b', 43.53)"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); return -1; @@ -128,8 +145,8 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) { pRes = taos_query( pConn, - "insert into ct3 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c') ct1 using st1(t1) tags(2000) values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, " - "'ddd') ct0 using st1 tags(1000, \"ttt\", true) values(1626006833603, 4, 3, 'hwj') ct1 using st1(t1) tags(2000) values(now+5s, 23, 32, 's21ds')"); + "insert into ct3 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c', 43.53) ct1 using st1(t1) tags(2000) values(1626006833601, 2, 3, 'sds', 43.53) (1626006833602, 4, 5, " + "'ddd', 43.53) ct0 using st1 tags(1000, \"ttt\", true) values(1626006833603, 4, 3, 'hwj', 43.53) ct1 using st1(t1) tags(2000) values(now+5s, 23, 32, 's21ds', 43.53)"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); return -1; @@ -196,6 +213,36 @@ int32_t init_env() { buildDatabase(pConn, pRes); + pRes = taos_query(pConn, "drop database if exists db_query"); + if (taos_errno(pRes) != 0) { + printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists db_query vgroups 1 wal_retention_period 3600"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + // create and insert another stable + pRes = taos_query(pConn, + "create stable if not exists db_query.st11 (ts timestamp, c1 int, c2 float, c3 binary(16), c4 decimal(9,3)) tags(t1 int, t3 " + "nchar(8), t4 bool)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into db_query.ct10 using db_query.st11 tags(1000, \"ttt\", true) values(1626006833400, 1, 2, 'a', 3.122)"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); taos_close(pConn); return 0; } @@ -215,6 +262,13 @@ int32_t create_topic() { } taos_free_result(pRes); + pRes = taos_query(pConn, "create topic topic_query as select * from db_query.st11"); + if (taos_errno(pRes) != 0) { + printf("failed to create topic topic_query, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + taos_close(pConn); return 0; } @@ -254,6 +308,7 @@ tmq_t* build_consumer() { tmq_list_t* build_topic_list() { tmq_list_t* topic_list = tmq_list_new(); tmq_list_append(topic_list, "topic_db"); + tmq_list_append(topic_list, "topic_query"); return topic_list; }