From 6dc98650d5871bd3c55b7c504b9c22d35a0390ff Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 28 Feb 2025 15:39:12 +0800 Subject: [PATCH] feat:remove add schemaExt to SMqDataRsp --- include/common/tmsg.h | 1 - include/libs/executor/executor.h | 1 - include/libs/executor/storageapi.h | 2 - source/client/src/clientImpl.c | 14 +++++- source/client/src/clientTmq.c | 4 +- source/common/src/msg/tmsg.c | 53 ---------------------- source/dnode/vnode/src/inc/tq.h | 10 ---- source/dnode/vnode/src/inc/vnodeInt.h | 1 - source/dnode/vnode/src/meta/metaEntry.c | 11 ----- source/dnode/vnode/src/meta/metaSnapshot.c | 9 ---- source/dnode/vnode/src/tq/tqRead.c | 22 ++++----- source/dnode/vnode/src/tq/tqScan.c | 21 ++------- source/dnode/vnode/src/tq/tqUtil.c | 3 -- source/libs/executor/inc/querytask.h | 1 - source/libs/executor/src/executor.c | 7 --- source/libs/executor/src/querytask.c | 1 - 16 files changed, 26 insertions(+), 135 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5f79d58482..bdcbe4262b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4341,7 +4341,6 @@ typedef struct { }; void* data; //for free in client, only effected if type is data or metadata. raw data not effected bool blockDataElementFree; // if true, free blockDataElement in blockData,(true in server, false in client) - SArray* blockSchemaExt; // save decimal info } SMqDataRsp; int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pObj); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 821581538e..e2bb6eefbf 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -215,7 +215,6 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); -const SExtSchema* qExtractSchemaExtFromTask(qTaskInfo_t tinfo); const char* qExtractTbnameFromTask(qTaskInfo_t tinfo); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index e3f1781b29..8d207c9866 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -133,14 +133,12 @@ typedef struct SMetaTableInfo { int64_t suid; int64_t uid; SSchemaWrapper* schema; - SExtSchema* pExtSchemas; char tbName[TSDB_TABLE_NAME_LEN]; } SMetaTableInfo; static FORCE_INLINE void destroyMetaTableInfo(SMetaTableInfo* mtInfo){ if (mtInfo == NULL) return; tDeleteSchemaWrapper(mtInfo->schema); - taosMemoryFreeClear(mtInfo->pExtSchemas); } typedef struct SSnapContext { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index abf6d28959..0a33f495d8 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2452,8 +2452,18 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) { uint64_t groupId = *(uint64_t*)p; p += sizeof(uint64_t); - // type+bytes - p += (sizeof(int32_t) + sizeof(int8_t)) * pResultInfo->numOfCols; + // check fields + for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) { + int8_t type = *(int8_t*)p; + p += sizeof(int8_t); + + int32_t bytes = *(int32_t*)p; + p += sizeof(int32_t); + + if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) { + extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale); + } + } int32_t* colLength = (int32_t*)p; p += sizeof(int32_t) * pResultInfo->numOfCols; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index d2809da1c1..6881775dc5 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -3011,10 +3011,8 @@ int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pRes if (data->withSchema) { doFreeReqResultInfo(&pRspObj->resInfo); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(data->blockSchema, pRspObj->resIter); - SExtSchema* pSWExt = (data->blockSchemaExt == NULL) ? NULL : - (SExtSchema*)taosArrayGetP(data->blockSchemaExt, pRspObj->resIter); if (pSW) { - TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, pSWExt)); + TAOS_CHECK_RETURN(setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols, NULL)); } } diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 8cbd1609c9..d7c11f3421 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -11674,51 +11674,11 @@ _exit: return code; } -static int32_t tEncodeMqDataRspSchemaExt(SEncoder *pEncoder, const SMqDataRsp *pRsp){ - int32_t code = 0; - int32_t lino = 0; - if (!pRsp->withSchema){ - return 0; - } - for (int32_t i = 0; i < pRsp->blockNum; i++) { - SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i); - TSDB_CHECK_NULL(pSW, code, lino, _exit, terrno); - SExtSchema *pSWExt = (SExtSchema *)taosArrayGetP(pRsp->blockSchemaExt, i); - if (pSWExt != NULL){ - TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t *)pSWExt, sizeof(SExtSchema) * pSW->nCols)); - } - } - _exit: - return code; -} - -static int32_t tDecodeMqDataRspSchemaExt(SDecoder *pDecoder, SMqDataRsp *pRsp) { - int32_t code = 0; - int32_t lino = 0; - if (!pRsp->withSchema) { - return 0; - } - if ((pRsp->blockSchemaExt = taosArrayInit(pRsp->blockNum, sizeof(void *))) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - for (int32_t i = 0; i < pRsp->blockNum; i++) { - void * data = NULL; - uint32_t bLen = 0; - TAOS_CHECK_EXIT(tDecodeBinary(pDecoder, (uint8_t **)&data, &bLen)); - if (taosArrayPush(pRsp->blockSchemaExt, &data) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - } - _exit: - return code; -} - int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { int32_t code = 0; int32_t lino = 0; TAOS_CHECK_EXIT(tEncodeMqDataRspCommon(pEncoder, pRsp)); TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->sleepTime)); - TAOS_CHECK_EXIT(tEncodeMqDataRspSchemaExt(pEncoder, pRsp)); _exit: return code; @@ -11802,9 +11762,6 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { if (!tDecodeIsEnd(pDecoder)) { TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime)); } - if (!tDecodeIsEnd(pDecoder)) { - TAOS_CHECK_RETURN(tDecodeMqDataRspSchemaExt(pDecoder, pRsp)); - } return 0; } @@ -11831,12 +11788,6 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) { pRsp->blockData = NULL; taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper); pRsp->blockSchema = NULL; - if (pRsp->blockDataElementFree){ - taosArrayDestroyP(pRsp->blockSchemaExt, NULL); - } else { - taosArrayDestroy(pRsp->blockSchemaExt); - } - pRsp->blockSchemaExt = NULL; taosArrayDestroyP(pRsp->blockTbName, NULL); pRsp->blockTbName = NULL; tOffsetDestroy(&pRsp->reqOffset); @@ -11859,7 +11810,6 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen)); } } - TAOS_CHECK_EXIT(tEncodeMqDataRspSchemaExt(pEncoder, pRsp)); _exit: return code; @@ -11891,9 +11841,6 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { } } } - if (!tDecodeIsEnd(pDecoder)) { - TAOS_CHECK_EXIT(tDecodeMqDataRspSchemaExt(pDecoder, pRsp)); - } _exit: return code; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 6e7eb41382..be0df15447 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -108,16 +108,6 @@ struct STQ { SStreamMeta* pStreamMeta; }; -typedef struct { - SSchemaWrapper *pSchemaWrapper; - SExtSchema *extSchema; -}TQSchema; - -static FORCE_INLINE void freeTqSchema(TQSchema* pSchema) { - tDeleteSchemaWrapper(pSchema->pSchemaWrapper); - taosMemoryFreeClear(pSchema->extSchema); -} - int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); void tqDestroyTqHandle(void* data); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 419b0b3883..3d856eeb60 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -170,7 +170,6 @@ int metaUpdateChangeTimeWithLock(SMeta* pMeta, tb_uid_t uid, int64_t SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, SExtSchema** extSchema); int64_t metaGetTableCreateTime(SMeta *pMeta, tb_uid_t uid, int lock); SExtSchema* metaGetSExtSchema(const SMetaEntry *pME); -SExtSchema* metaCloneSExtSchema(const SExtSchema *src, int32_t nCols); int32_t metaGetTbTSchemaNotNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); int32_t metaGetTbTSchemaMaybeNull(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock, STSchema** ppTSchema); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index f386f0ef2c..da0b354ff7 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -92,17 +92,6 @@ SExtSchema* metaGetSExtSchema(const SMetaEntry *pME) { return NULL; } -SExtSchema* metaCloneSExtSchema(const SExtSchema *src, int32_t nCols) { - if (src == NULL || nCols <= 0) { - return NULL; - } - SExtSchema* ret = taosMemoryMalloc(sizeof(SExtSchema) * nCols); - if (ret != NULL){ - memcpy(ret, src, nCols * sizeof(SExtSchema)); - } - return ret; -} - int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) { const SColCmprWrapper *pw = &pME->colCmpr; TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pw->nCols)); diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index e9f4c5c380..ce934694e0 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -256,7 +256,6 @@ typedef struct STableInfoForChildTable { char* tableName; SSchemaWrapper* schemaRow; SSchemaWrapper* tagRow; - SExtSchema* pExtSchemas; } STableInfoForChildTable; static void destroySTableInfoForChildTable(void* data) { @@ -264,7 +263,6 @@ static void destroySTableInfoForChildTable(void* data) { taosMemoryFree(pData->tableName); tDeleteSchemaWrapper(pData->schemaRow); tDeleteSchemaWrapper(pData->tagRow); - taosMemoryFree(pData->pExtSchemas); } static int32_t MoveToSnapShotVersion(SSnapContext* ctx) { @@ -338,11 +336,6 @@ static int32_t saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInf code = TSDB_CODE_OUT_OF_MEMORY; goto END; } - dataTmp.pExtSchemas = metaGetSExtSchema(me); - if (dataTmp.pExtSchemas == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto END; - } code = taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable)); if (code != 0) { goto END; @@ -811,11 +804,9 @@ int32_t getMetaTableInfoFromSnapshot(SSnapContext* ctx, SMetaTableInfo* result) } result->suid = me.ctbEntry.suid; result->schema = tCloneSSchemaWrapper(data->schemaRow); - result->pExtSchemas = metaCloneSExtSchema(data->pExtSchemas, data->schemaRow->nCols); } else if (ctx->subType == TOPIC_SUB_TYPE__DB && me.type == TSDB_NORMAL_TABLE) { result->suid = 0; result->schema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow); - result->pExtSchemas = metaGetSExtSchema(&me); } else { metaDebug("tmqsnap get uid continue"); tDecoderClear(&dc); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 67e1be8b68..8dccd2af94 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -598,7 +598,7 @@ END: return code; } -int32_t tqMaskBlock(SSchemaWrapper* pDst, SExtSchema* extDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSrc) { +int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask, SExtSchema* extSrc) { if (pDst == NULL || pBlock == NULL || pSrc == NULL || mask == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -622,7 +622,6 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SExtSchema* extDst, SSDataBlock* pBloc SColumnInfoData colInfo = createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId); if (extSrc != NULL) { - extDst[j++] = extSrc[i]; decimalFromTypeMod(extSrc[i].typeMod, &colInfo.info.precision, &colInfo.info.scale); } code = blockDataAppendColInfo(pBlock, &colInfo); @@ -658,7 +657,7 @@ static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, c SSchema* pColSchema = &pSchema->pSchema[i]; SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); - if (pReader->extSchema != NULL) { + if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) { decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale); } int32_t code = blockDataAppendColInfo(pBlock, &colInfo); @@ -688,6 +687,9 @@ static int32_t buildResSDataBlock(STqReader* pReader, SSchemaWrapper* pSchema, c j++; } else { SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); + if (IS_DECIMAL_TYPE(pColSchema->type) && pReader->extSchema != NULL) { + decimalFromTypeMod(pReader->extSchema[i].typeMod, &colInfo.info.precision, &colInfo.info.scale); + } int32_t code = blockDataAppendColInfo(pBlock, &colInfo); if (code != TSDB_CODE_SUCCESS) { return -1; @@ -895,7 +897,6 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, int32_t* lastRow) { int32_t code = 0; SSchemaWrapper* pSW = NULL; - SExtSchema* pSWExt = NULL; SSDataBlock* block = NULL; if (taosArrayGetSize(blocks) > 0) { SSDataBlock* pLastBlock = taosArrayGetLast(blocks); @@ -909,10 +910,8 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); TQ_NULL_GO_TO_END(pSW); - pSWExt = taosMemoryCalloc(1, sizeof(SExtSchema)); - TQ_NULL_GO_TO_END(pSWExt); - TQ_ERR_GO_TO_END(tqMaskBlock(pSW, pSWExt, block, pReader->pSchemaWrapper, assigned, pReader->extSchema)); + TQ_ERR_GO_TO_END(tqMaskBlock(pSW, block, pReader->pSchemaWrapper, assigned, pReader->extSchema)); tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, (int32_t)taosArrayGetSize(block->pDataBlock)); @@ -920,10 +919,8 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, block->info.version = pReader->msg.ver; TQ_ERR_GO_TO_END(blockDataEnsureCapacity(block, numOfRows - curRow)); TQ_NULL_GO_TO_END(taosArrayPush(blocks, block)); - TQSchema schema = {pSW, pSWExt}; - TQ_NULL_GO_TO_END(taosArrayPush(schemas, &schema)); + TQ_NULL_GO_TO_END(taosArrayPush(schemas, &pSW)); pSW = NULL; - pSWExt = NULL; taosMemoryFreeClear(block); @@ -934,7 +931,6 @@ END: tDeleteSchemaWrapper(pSW); blockDataFreeRes(block); taosMemoryFree(block); - taosMemoryFree(pSWExt); return code; } static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) { @@ -1149,12 +1145,10 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* block return code; } } else if (rawList != NULL) { - TQSchema schema = {pReader->pSchemaWrapper, pReader->extSchema}; - if (taosArrayPush(schemas, &schema) == NULL){ + if (taosArrayPush(schemas, &pReader->pSchemaWrapper) == NULL){ return terrno; } pReader->pSchemaWrapper = NULL; - pReader->extSchema = NULL; return 0; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 932b4bb83c..9b242e4c84 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -238,7 +238,6 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat int32_t lino = 0; char* tbName = NULL; SSchemaWrapper* pSW = NULL; - SExtSchema* pSWExt = NULL; const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); @@ -267,10 +266,6 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat TSDB_CHECK_NULL(pSW, code, lino, END, terrno); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno); pSW = NULL; - pSWExt = metaCloneSExtSchema(qExtractSchemaExtFromTask(task), pSW->nCols); - TSDB_CHECK_NULL(pSWExt, code, lino, END, terrno); - TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchemaExt, &pSWExt), code, lino, END, terrno); - pSWExt = NULL; } code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock), @@ -319,7 +314,6 @@ END: tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code)); } tDeleteSchemaWrapper(pSW); - taosMemoryFree(pSWExt); taosMemoryFree(tbName); return code; } @@ -336,7 +330,7 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno); - pSchemas = taosArrayInit(0, sizeof(TQSchema)); + pSchemas = taosArrayInit(0, sizeof(void*)); TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno); SSubmitTbData* pSubmitTbData = NULL; @@ -381,17 +375,12 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int *totalRows += pBlock->info.rows; } - TQSchema* schema = (TQSchema*)taosArrayGet(pSchemas, i); - if (taosArrayPush(pRsp->blockSchema, &schema->pSchemaWrapper) == NULL){ + void** pSW = taosArrayGet(pSchemas, i); + if (taosArrayPush(pRsp->blockSchema, pSW) == NULL){ tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); continue; } - if (taosArrayPush(pRsp->blockSchemaExt, &schema->extSchema) == NULL){ - tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); - continue; - } - schema->pSchemaWrapper = NULL; - schema->extSchema = NULL; + *pSW = NULL; pRsp->blockNum++; } tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows); @@ -400,7 +389,7 @@ END: tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code)); } taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); - taosArrayDestroyP(pSchemas, (FDelete)freeTqSchema); + taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper); } static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest, SArray** rawList){ diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index f0bb70e666..4811836a6c 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -75,8 +75,6 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno); - pRsp->blockSchemaExt = taosArrayInit(0, sizeof(void*)); - TSDB_CHECK_NULL(pRsp->blockSchemaExt, code, lino, END, terrno); END: if (code != 0){ tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code)); @@ -84,7 +82,6 @@ END: taosArrayDestroy(pRsp->blockDataLen); taosArrayDestroy(pRsp->blockTbName); taosArrayDestroy(pRsp->blockSchema); - taosArrayDestroy(pRsp->blockSchemaExt); } return code; } diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 740090ba18..7e621e3df5 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -64,7 +64,6 @@ typedef struct { int8_t sourceExcluded; int64_t snapshotVer; SSchemaWrapper* schema; - SExtSchema* pExtSchemas; char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor int8_t recoverStep; int8_t recoverScanFinished; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c10ec14974..1c492142f3 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1306,11 +1306,6 @@ const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) { return pTaskInfo->streamInfo.schema; } -const SExtSchema* qExtractSchemaExtFromTask(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return pTaskInfo->streamInfo.pExtSchemas; -} - const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; return pTaskInfo->streamInfo.tbName; @@ -1601,9 +1596,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN); // pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid; tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema); - taosMemoryFreeClear(pTaskInfo->streamInfo.pExtSchemas); pTaskInfo->streamInfo.schema = mtInfo.schema; - pTaskInfo->streamInfo.pExtSchemas = mtInfo.pExtSchemas; qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id); } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index c2cf1b414d..20c80df4fa 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -261,7 +261,6 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSchemaWrapper(pStreamInfo->schema); - taosMemoryFreeClear(pStreamInfo->pExtSchemas); tOffsetDestroy(&pStreamInfo->currentOffset); tDeleteSchemaWrapper(pStreamInfo->notifyResultSchema); taosMemoryFree(pStreamInfo->stbFullName);