From 353b3ea99b88ee02a1f07d9e36b65f66afddcc74 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 28 Feb 2025 14:13:06 +0800 Subject: [PATCH] feat:add schemaExt to SMqDataRsp --- include/common/tmsg.h | 4 +- include/libs/executor/executor.h | 1 + include/libs/executor/storageapi.h | 7 +++ include/libs/parser/parser.h | 2 +- source/client/src/clientRawBlockWrite.c | 3 +- source/client/src/clientTmq.c | 5 +- source/common/src/msg/tmsg.c | 67 ++++++++++++++++++++-- source/dnode/vnode/src/inc/tq.h | 10 ++++ source/dnode/vnode/src/inc/vnodeInt.h | 8 +-- source/dnode/vnode/src/meta/metaEntry.c | 17 ++++-- source/dnode/vnode/src/meta/metaQuery.c | 10 ++-- source/dnode/vnode/src/meta/metaSnapshot.c | 11 +++- source/dnode/vnode/src/tq/tqRead.c | 40 ++++++++----- source/dnode/vnode/src/tq/tqScan.c | 23 ++++++-- source/dnode/vnode/src/tq/tqUtil.c | 4 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 2 +- source/libs/executor/inc/querytask.h | 1 + source/libs/executor/src/executor.c | 20 +++++-- source/libs/executor/src/querytask.c | 1 + source/libs/executor/src/scanoperator.c | 4 +- 20 files changed, 183 insertions(+), 57 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b3406558f6..5f79d58482 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -901,6 +901,7 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp } static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) { + if (pSW == NULL) {return TSDB_CODE_INVALID_PARA;} TAOS_CHECK_RETURN(tEncodeI32v(pEncoder, pSW->nCols)); TAOS_CHECK_RETURN(tEncodeI32v(pEncoder, pSW->version)); for (int32_t i = 0; i < pSW->nCols; i++) { @@ -910,6 +911,7 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSch } static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) { + if (pSW == NULL) {return TSDB_CODE_INVALID_PARA;} TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pSW->nCols)); TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pSW->version)); @@ -4339,7 +4341,7 @@ typedef struct { }; void* data; //for free in client, only effected if type is data or metadata. raw data not effected bool blockDataElementFree; // if true, free blockDataElement in blockData,(true in server, false in client) - + SArray* blockSchemaExt; // save decimal info } SMqDataRsp; int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pObj); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e2bb6eefbf..821581538e 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -215,6 +215,7 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); +const SExtSchema* qExtractSchemaExtFromTask(qTaskInfo_t tinfo); const char* qExtractTbnameFromTask(qTaskInfo_t tinfo); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 4e5cb2cccb..e3f1781b29 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -133,9 +133,16 @@ typedef struct SMetaTableInfo { int64_t suid; int64_t uid; SSchemaWrapper* schema; + SExtSchema* pExtSchemas; char tbName[TSDB_TABLE_NAME_LEN]; } SMetaTableInfo; +static FORCE_INLINE void destroyMetaTableInfo(SMetaTableInfo* mtInfo){ + if (mtInfo == NULL) return; + tDeleteSchemaWrapper(mtInfo->schema); + taosMemoryFreeClear(mtInfo->pExtSchemas); +} + typedef struct SSnapContext { struct SMeta* pMeta; int64_t snapVersion; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 4bdc2c4740..6c0f0c57cb 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -187,7 +187,7 @@ int32_t smlBuildOutputRaw(SQuery* handle, SHashObj* pVgHash); int rawBlockBindRawData(SHashObj* pVgroupHash, SArray* pVgroupList, STableMeta* pTableMeta, void* data); int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, void* fields, int numFields, bool needChangeLength, char* errstr, int32_t errstrLen, bool raw); -int32_t checkSchema(SSchema* pColSchema, int8_t* fields, char* errstr, int32_t errstrLen); +int32_t checkSchema(SSchema* pColSchema, SSchemaExt* pColExtSchema, int8_t* fields, char* errstr, int32_t errstrLen); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); int32_t serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap, SArray** pOut); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index d7d5a21f24..55f6f85347 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1975,10 +1975,11 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe int j = 0; for (; j < pTableMeta->tableInfo.numOfColumns; j++) { SSchema* pColSchema = &pTableMeta->schema[j]; + SSchemaExt* pColExtSchema = &pTableMeta->schemaExt[j]; char* fieldName = pSW->pSchema[i].name; if (strcmp(pColSchema->name, fieldName) == 0) { - if (checkSchema(pColSchema, fields, NULL, 0) != 0){ + if (checkSchema(pColSchema, pColExtSchema, fields, NULL, 0) != 0){ return true; } break; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 86b111abd4..d2809da1c1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -3011,9 +3011,10 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes if (data->withSchema) { doFreeReqResultInfo(&pRspObj->resInfo); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter); + SExtSchema* pSWExt = (data->blockSchemaExt == NULL) ? NULL : + (SExtSchema*)taosArrayGetP(data->blockSchemaExt, pRspObj->resIter); if (pSW) { - // TODO wjm tmq support ext schema - TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL)); + TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, pSWExt)); } } diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index ee1f4d43ef..8cbd1609c9 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -11674,11 +11674,54 @@ _exit: return code; } -int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { - TAOS_CHECK_RETURN(tEncodeMqDataRspCommon(pEncoder, pRsp)); - TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->sleepTime)); +static int32_t tEncodeMqDataRspSchemaExt(SEncoder *pEncoder, const SMqDataRsp *pRsp){ + int32_t code = 0; + int32_t lino = 0; + if (!pRsp->withSchema){ + return 0; + } + for (int32_t i = 0; i < pRsp->blockNum; i++) { + SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i); + TSDB_CHECK_NULL(pSW, code, lino, _exit, terrno); + SExtSchema *pSWExt = (SExtSchema *)taosArrayGetP(pRsp->blockSchemaExt, i); + if (pSWExt != NULL){ + TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t *)pSWExt, sizeof(SExtSchema) * pSW->nCols)); + } + } + _exit: + return code; +} - return 0; +static int32_t tDecodeMqDataRspSchemaExt(SDecoder *pDecoder, SMqDataRsp *pRsp) { + int32_t code = 0; + int32_t lino = 0; + if (!pRsp->withSchema) { + return 0; + } + if ((pRsp->blockSchemaExt = taosArrayInit(pRsp->blockNum, sizeof(void *))) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + for (int32_t i = 0; i < pRsp->blockNum; i++) { + void * data = NULL; + uint32_t bLen = 0; + TAOS_CHECK_EXIT(tDecodeBinary(pDecoder, (uint8_t **)&data, &bLen)); + if (taosArrayPush(pRsp->blockSchemaExt, &data) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + _exit: + return code; +} + +int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { + int32_t code = 0; + int32_t lino = 0; + TAOS_CHECK_EXIT(tEncodeMqDataRspCommon(pEncoder, pRsp)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->sleepTime)); + TAOS_CHECK_EXIT(tEncodeMqDataRspSchemaExt(pEncoder, pRsp)); + + _exit: + return code; } int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { @@ -11759,6 +11802,9 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { if (!tDecodeIsEnd(pDecoder)) { TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime)); } + if (!tDecodeIsEnd(pDecoder)) { + TAOS_CHECK_RETURN(tDecodeMqDataRspSchemaExt(pDecoder, pRsp)); + } return 0; } @@ -11785,6 +11831,12 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) { pRsp->blockData = NULL; taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper); pRsp->blockSchema = NULL; + if (pRsp->blockDataElementFree){ + taosArrayDestroyP(pRsp->blockSchemaExt, NULL); + } else { + taosArrayDestroy(pRsp->blockSchemaExt); + } + pRsp->blockSchemaExt = NULL; taosArrayDestroyP(pRsp->blockTbName, NULL); pRsp->blockTbName = NULL; tOffsetDestroy(&pRsp->reqOffset); @@ -11796,7 +11848,7 @@ void tDeleteMqDataRsp(SMqDataRsp *rsp) { tDeleteMqDataRspCommon(rsp); } int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { int32_t code = 0; - int32_t lino; + int32_t lino = 0; TAOS_CHECK_EXIT(tEncodeMqDataRspCommon(pEncoder, pRsp)); TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->createTableNum)); @@ -11807,6 +11859,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen)); } } + TAOS_CHECK_EXIT(tEncodeMqDataRspSchemaExt(pEncoder, pRsp)); _exit: return code; @@ -11838,7 +11891,9 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { } } } - + if (!tDecodeIsEnd(pDecoder)) { + TAOS_CHECK_EXIT(tDecodeMqDataRspSchemaExt(pDecoder, pRsp)); + } _exit: return code; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index be0df15447..6e7eb41382 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -108,6 +108,16 @@ struct STQ { SStreamMeta* pStreamMeta; }; +typedef struct { + SSchemaWrapper *pSchemaWrapper; + SExtSchema *extSchema; +}TQSchema; + +static FORCE_INLINE void freeTqSchema(TQSchema* pSchema) { + tDeleteSchemaWrapper(pSchema->pSchemaWrapper); + taosMemoryFreeClear(pSchema->extSchema); +} + int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); void tqDestroyTqHandle(void* data); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 00c08e1664..419b0b3883 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -167,10 +167,10 @@ int32_t metaDropMultipleTables(SMeta* pMeta, int64_t version, SArray* tb int metaTtlFindExpired(SMeta* pMeta, int64_t timePointMs, SArray* tbUids, int32_t ttlDropMaxCount); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs); -int64_t metaGetTableCreateTime(SMeta* pMeta, tb_uid_t uid, int lock); -SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t* createTime, SExtSchema** extSchema); -SExtSchema* metaCloneSExtSchema(const SMetaEntry *pME); -void metaFreeSExtSchema(SExtSchema *p); +SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, SExtSchema** extSchema); +int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock); +SExtSchema* metaGetSExtSchema(const SMetaEntry *pME); +SExtSchema* metaCloneSExtSchema(const SExtSchema *src, int32_t nCols); int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index 1425f3be28..f386f0ef2c 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -70,7 +70,7 @@ static int32_t metaDecodeExtSchemas(SDecoder* pDecoder, SMetaEntry* pME) { return 0; } -SExtSchema* metaCloneSExtSchema(const SMetaEntry *pME) { +SExtSchema* metaGetSExtSchema(const SMetaEntry *pME) { const SSchemaWrapper *pSchWrapper = NULL; bool hasTypeMods = false; if (pME->type == TSDB_SUPER_TABLE) { @@ -84,16 +84,23 @@ SExtSchema* metaCloneSExtSchema(const SMetaEntry *pME) { if (hasTypeMods) { SExtSchema* ret = taosMemoryMalloc(sizeof(SExtSchema) * pSchWrapper->nCols); - memcpy(ret, pME->pExtSchemas, pSchWrapper->nCols * sizeof(SExtSchema)); + if (ret != NULL){ + memcpy(ret, pME->pExtSchemas, pSchWrapper->nCols * sizeof(SExtSchema)); + } return ret; } return NULL; } -void metaFreeSExtSchema(SExtSchema *p) { - if (p) { - taosMemoryFreeClear(p); +SExtSchema* metaCloneSExtSchema(const SExtSchema *src, int32_t nCols) { + if (src == NULL || nCols <= 0) { + return NULL; } + SExtSchema* ret = taosMemoryMalloc(sizeof(SExtSchema) * nCols); + if (ret != NULL){ + memcpy(ret, src, nCols * sizeof(SExtSchema)); + } + return ret; } int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) { diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 950e7e5acc..d813280937 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -378,7 +378,7 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType) { return 0; } -SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, int64_t *createTime, SExtSchema** extSchema) { +SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock, SExtSchema** extSchema) { void *pData = NULL; int nData = 0; int64_t version; @@ -409,7 +409,7 @@ _query: if (me.type == TSDB_SUPER_TABLE) { if (sver == -1 || sver == me.stbEntry.schemaRow.version) { pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow); - if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me); + if (extSchema != NULL) *extSchema = metaGetSExtSchema(&me); tDecoderClear(&dc); goto _exit; } @@ -420,7 +420,7 @@ _query: } else { if (sver == -1 || sver == me.ntbEntry.schemaRow.version) { pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow); - if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me); + if (extSchema != NULL) *extSchema = metaGetSExtSchema(&me); tDecoderClear(&dc); goto _exit; } @@ -437,7 +437,7 @@ _query: goto _err; } pSchema = tCloneSSchemaWrapper(&schema); - if (extSchema != NULL) *extSchema = metaCloneSExtSchema(&me); + if (extSchema != NULL) *extSchema = metaGetSExtSchema(&me); tDecoderClear(&dc); _exit: @@ -667,7 +667,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) { STSchema *pTSchema = NULL; SSchemaWrapper *pSW = NULL; - pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL, NULL); + pSW = metaGetTableSchema(pMeta, uid, sver, lock, NULL); if (!pSW) return NULL; pTSchema = tBuildTSchema(pSW->pSchema, pSW->nCols, pSW->version); diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index e5a45dc3fa..e9f4c5c380 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -256,6 +256,7 @@ typedef struct STableInfoForChildTable { char* tableName; SSchemaWrapper* schemaRow; SSchemaWrapper* tagRow; + SExtSchema* pExtSchemas; } STableInfoForChildTable; static void destroySTableInfoForChildTable(void* data) { @@ -263,6 +264,7 @@ static void destroySTableInfoForChildTable(void* data) { taosMemoryFree(pData->tableName); tDeleteSchemaWrapper(pData->schemaRow); tDeleteSchemaWrapper(pData->tagRow); + taosMemoryFree(pData->pExtSchemas); } static int32_t MoveToSnapShotVersion(SSnapContext* ctx) { @@ -336,6 +338,11 @@ static int32_t saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInf code = TSDB_CODE_OUT_OF_MEMORY; goto END; } + dataTmp.pExtSchemas = metaGetSExtSchema(me); + if (dataTmp.pExtSchemas == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); if (code != 0) { goto END; @@ -593,7 +600,7 @@ int32_t setForSnapShot(SSnapContext* ctx, int64_t uid) { void taosXSetTablePrimaryKey(SSnapContext* ctx, int64_t uid) { bool ret = false; - SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL, NULL); + SSchemaWrapper* schema = metaGetTableSchema(ctx->pMeta, uid, -1, 1, NULL); if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) { ret = true; } @@ -804,9 +811,11 @@ int32_t getMetaTableInfoFromSnapshot(SSnapContext* ctx, SMetaTableInfo* result) } result->suid = me.ctbEntry.suid; result->schema = tCloneSSchemaWrapper(data->schemaRow); + result->pExtSchemas = metaCloneSExtSchema(data->pExtSchemas, data->schemaRow->nCols); } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) { result->suid = 0; result->schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow); + result->pExtSchemas = metaGetSExtSchema(&me); } else { metaDebug("tmqsnap get uid continue"); tDecoderClear(&dc); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 12efe09d15..67e1be8b68 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -283,7 +283,7 @@ void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) { return; } bool ret = false; - SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL, NULL); + SSchemaWrapper* schema = metaGetTableSchema(pReader->pVnodeMeta, uid, -1, 1, NULL); if (schema && schema->nCols >= 2 && schema->pSchema[1].flags & COL_IS_KEY) { ret = true; } @@ -336,7 +336,7 @@ void tqReaderClose(STqReader* pReader) { tDeleteSchemaWrapper(pReader->pSchemaWrapper); } - metaFreeSExtSchema(pReader->extSchema); + taosMemoryFree(pReader->extSchema); if (pReader->pColIdList) { taosArrayDestroy(pReader->pColIdList); } @@ -598,7 +598,7 @@ END: return code; } -int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSchema) { +int32_t tqMaskBlock(SSchemaWrapper* pDst, SExtSchema* extDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSrc) { if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -621,8 +621,9 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap pDst->pSchema[j++] = pSrc->pSchema[i]; SColumnInfoData colInfo = createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId); - if (extSchema != NULL) { - decimalFromTypeMod(extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale); + if (extSrc != NULL) { + extDst[j++] = extSrc[i]; + decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale); } code = blockDataAppendColInfo(pBlock, &colInfo); if (code != 0) { @@ -748,8 +749,8 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* if ((suid != 0 && pReader->cachedSchemaSuid != suid) || (suid == 0 && pReader->cachedSchemaUid != uid) || (pReader->cachedSchemaVer != sversion)) { tDeleteSchemaWrapper(pReader->pSchemaWrapper); - metaFreeSExtSchema(pReader->extSchema); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, NULL, &pReader->extSchema); + taosMemoryFree(pReader->extSchema); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema); if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", uid:%" PRId64 "version %d, possibly dropped table", @@ -890,10 +891,11 @@ END: } static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas, - SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow, + char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) { int32_t code = 0; SSchemaWrapper* pSW = NULL; + SExtSchema* pSWExt = NULL; SSDataBlock* block = NULL; if (taosArrayGetSize(blocks) > 0) { SSDataBlock* pLastBlock = taosArrayGetLast(blocks); @@ -907,8 +909,10 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); TQ_NULL_GO_TO_END(pSW); + pSWExt = taosMemoryCalloc(1, sizeof(SExtSchema)); + TQ_NULL_GO_TO_END(pSWExt); - TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pSchemaWrapper, assigned, pReader->extSchema)); + TQ_ERR_GO_TO_END(tqMaskBlock(pSW, pSWExt, block, pReader->pSchemaWrapper, assigned, pReader->extSchema)); tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, (int32_t)taosArrayGetSize(block->pDataBlock)); @@ -916,8 +920,11 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, block->info.version = pReader->msg.ver; TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow)); TQ_NULL_GO_TO_END(taosArrayPush(blocks, block)); - TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW)); + TQSchema schema = {pSW, pSWExt}; + TQ_NULL_GO_TO_END(taosArrayPush(schemas, &schema)); pSW = NULL; + pSWExt = NULL; + taosMemoryFreeClear(block); END: @@ -927,6 +934,7 @@ END: tDeleteSchemaWrapper(pSW); blockDataFreeRes(block); taosMemoryFree(block); + taosMemoryFree(pSWExt); return code; } static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) { @@ -956,7 +964,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData } if (buildNew) { - TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows, + TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow)); } @@ -1020,7 +1028,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra } if (buildNew) { - TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, pSchemaWrapper, assigned, numOfRows, + TQ_ERR_GO_TO_END(processBuildNew(pReader, pSubmitTbData, blocks, schemas, assigned, numOfRows, curRow, &lastRow)); } @@ -1126,8 +1134,8 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block pReader->lastBlkUid = uid; tDeleteSchemaWrapper(pReader->pSchemaWrapper); - metaFreeSExtSchema(pReader->extSchema); - pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, createTime, &pReader->extSchema); + taosMemoryFree(pReader->extSchema); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1, &pReader->extSchema); if (pReader->pSchemaWrapper == NULL) { tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); @@ -1141,10 +1149,12 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block return code; } } else if (rawList != NULL) { - if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){ + TQSchema schema = {pReader->pSchemaWrapper, pReader->extSchema}; + if (taosArrayPush(schemas, &schema) == NULL){ return terrno; } pReader->pSchemaWrapper = NULL; + pReader->extSchema = NULL; return 0; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 549a47d006..932b4bb83c 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -238,6 +238,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat int32_t lino = 0; char* tbName = NULL; SSchemaWrapper* pSW = NULL; + SExtSchema* pSWExt = NULL; const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); @@ -266,6 +267,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat TSDB_CHECK_NULL(pSW, code, lino, END, terrno); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno); pSW = NULL; + pSWExt = metaCloneSExtSchema(qExtractSchemaExtFromTask(task), pSW->nCols); + TSDB_CHECK_NULL(pSWExt, code, lino, END, terrno); + TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchemaExt, &pSWExt), code, lino, END, terrno); + pSWExt = NULL; } code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock), @@ -313,7 +318,8 @@ END: if (code != 0){ tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code)); } - taosMemoryFree(pSW); + tDeleteSchemaWrapper(pSW); + taosMemoryFree(pSWExt); taosMemoryFree(tbName); return code; } @@ -330,7 +336,7 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno); - pSchemas = taosArrayInit(0, sizeof(void*)); + pSchemas = taosArrayInit(0, sizeof(TQSchema)); TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno); SSubmitTbData* pSubmitTbData = NULL; @@ -375,12 +381,17 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int *totalRows += pBlock->info.rows; } - void** pSW = taosArrayGet(pSchemas, i); - if (taosArrayPush(pRsp->blockSchema, pSW) == NULL){ + TQSchema* schema = (TQSchema*)taosArrayGet(pSchemas, i); + if (taosArrayPush(pRsp->blockSchema, &schema->pSchemaWrapper) == NULL){ tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); continue; } - *pSW = NULL; + if (taosArrayPush(pRsp->blockSchemaExt, &schema->extSchema) == NULL){ + tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); + continue; + } + schema->pSchemaWrapper = NULL; + schema->extSchema = NULL; pRsp->blockNum++; } tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows); @@ -389,7 +400,7 @@ END: tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code)); } taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); - taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper); + taosArrayDestroyP(pSchemas, (FDelete)freeTqSchema); } static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){ diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index de21f6eb0b..f0bb70e666 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -75,7 +75,8 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno); - + pRsp->blockSchemaExt = taosArrayInit(0, sizeof(void*)); + TSDB_CHECK_NULL(pRsp->blockSchemaExt, code, lino, END, terrno); END: if (code != 0){ tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code)); @@ -83,6 +84,7 @@ END: taosArrayDestroy(pRsp->blockDataLen); taosArrayDestroy(pRsp->blockTbName); taosArrayDestroy(pRsp->blockSchema); + taosArrayDestroy(pRsp->blockSchemaExt); } return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 0e4ead5e27..8ab244e448 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -760,7 +760,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) { } int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) { - SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL, NULL); + SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL); if (pSW) { *num = pSW->nCols; tDeleteSchemaWrapper(pSW); diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 7e621e3df5..740090ba18 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -64,6 +64,7 @@ typedef struct { int8_t sourceExcluded; int64_t snapshotVer; SSchemaWrapper* schema; + SExtSchema* pExtSchemas; char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor int8_t recoverStep; int8_t recoverScanFinished; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 7f4335282f..c10ec14974 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1306,6 +1306,11 @@ const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) { return pTaskInfo->streamInfo.schema; } +const SExtSchema* qExtractSchemaExtFromTask(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + return pTaskInfo->streamInfo.pExtSchemas; +} + const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; return pTaskInfo->streamInfo.tbName; @@ -1536,6 +1541,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SMetaTableInfo mtInfo = {0}; code = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext, &mtInfo); if (code != 0) { + destroyMetaTableInfo(&mtInfo); return code; } pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader); @@ -1545,7 +1551,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tableListClear(pTableListInfo); if (mtInfo.uid == 0) { - tDeleteSchemaWrapper(mtInfo.schema); + destroyMetaTableInfo(&mtInfo); goto end; // no data } @@ -1553,7 +1559,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT code = initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - tDeleteSchemaWrapper(mtInfo.schema); + destroyMetaTableInfo(&mtInfo); return code; } if (pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)) { @@ -1564,7 +1570,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT code = tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0); if (code != TSDB_CODE_SUCCESS) { - tDeleteSchemaWrapper(mtInfo.schema); + destroyMetaTableInfo(&mtInfo); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); return code; } @@ -1572,14 +1578,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); if (!pList) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - tDeleteSchemaWrapper(mtInfo.schema); + destroyMetaTableInfo(&mtInfo); return code; } int32_t size = 0; code = tableListGetSize(pTableListInfo, &size); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - tDeleteSchemaWrapper(mtInfo.schema); + destroyMetaTableInfo(&mtInfo); return code; } @@ -1587,7 +1593,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT NULL, (void**)&pInfo->dataReader, NULL, NULL); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - tDeleteSchemaWrapper(mtInfo.schema); + destroyMetaTableInfo(&mtInfo); return code; } @@ -1595,7 +1601,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN); // pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid; tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema); + taosMemoryFreeClear(pTaskInfo->streamInfo.pExtSchemas); pTaskInfo->streamInfo.schema = mtInfo.schema; + pTaskInfo->streamInfo.pExtSchemas = mtInfo.pExtSchemas; qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id); } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 20c80df4fa..c2cf1b414d 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -261,6 +261,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSchemaWrapper(pStreamInfo->schema); + taosMemoryFreeClear(pStreamInfo->pExtSchemas); tOffsetDestroy(&pStreamInfo->currentOffset); tDeleteSchemaWrapper(pStreamInfo->notifyResultSchema); taosMemoryFree(pStreamInfo->stbFullName); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5e81923db1..797f74a188 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4150,7 +4150,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { code = pAPI->snapshotFn.getMetaTableInfoFromSnapshot(pInfo->sContext, &mtInfo); QUERY_CHECK_CODE(code, lino, _end); if (code != 0) { - tDeleteSchemaWrapper(mtInfo.schema); + destroyMetaTableInfo(&mtInfo); QUERY_CHECK_CODE(code, lino, _end); } STqOffsetVal offset = {0}; @@ -4162,7 +4162,7 @@ static int32_t doRawScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { tqOffsetResetToData(&offset, mtInfo.uid, INT64_MIN, val); qDebug("tmqsnap change get data uid:%" PRId64 "", mtInfo.uid); } - tDeleteSchemaWrapper(mtInfo.schema); + destroyMetaTableInfo(&mtInfo); code = qStreamPrepareScan(pTaskInfo, &offset, pInfo->sContext->subType); QUERY_CHECK_CODE(code, lino, _end); (*ppRes) = NULL;