From d5d1dd9127e04dc08dbe66666eb718b789fa2a62 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 17 Jan 2024 20:08:26 +0800 Subject: [PATCH 1/6] opti:[TD-28118] raw block data for tmq --- include/common/tmsg.h | 8 ++++++ source/client/inc/clientInt.h | 2 ++ source/client/src/clientImpl.c | 14 +++++++---- source/client/src/clientRawBlockWrite.c | 22 ++++++++++++++--- source/client/src/clientTmq.c | 33 ++++++++++++++++--------- source/common/src/tdatablock.c | 4 +-- source/dnode/vnode/src/tq/tqScan.c | 9 +++---- 7 files changed, 65 insertions(+), 27 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c314d82036..2ee48a18e0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1975,6 +1975,14 @@ typedef struct { char data[]; } SRetrieveTableRsp; +typedef struct { + int64_t version; + int64_t numOfRows; + int8_t compressed; + int8_t precision; + char data[]; +} SRetrieveTableRspForTmq; + typedef struct { int64_t handle; int64_t useconds; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index a6d5039be7..5fb789f0db 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -298,6 +298,8 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo); void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4, bool freeAfterUse); +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, + bool convertUcs4); void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); void doFreeReqResultInfo(SReqResultInfo* pResInfo); int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index d4f89d6b54..9800d233e9 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1795,6 +1795,7 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) { static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) { char* p = (char*)pResultInfo->pData; + int32_t blockVersion = *(int32_t*)p; // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column // length | @@ -1810,7 +1811,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i char* pStart = p + len; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = htonl(colLength[i]); + int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) { int32_t* offset = (int32_t*)pStart; @@ -1873,6 +1874,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int tscDebug("start to convert form json format string"); char* p = (char*)pResultInfo->pData; + int32_t blockVersion = *(int32_t*)p; int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows); if (dataLen <= 0) { return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -1908,8 +1910,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int char* pStart = p; char* pStart1 = p1; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = htonl(colLength[i]); - int32_t colLen1 = htonl(colLength1[i]); + int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; + int32_t colLen1 = (blockVersion == 1) ? htonl(colLength1[i]) : colLength1[i]; if (ASSERT(colLen < dataLen)) { tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -1968,7 +1970,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int } colLen1 = len; totalLen += colLen1; - colLength1[i] = htonl(len); + colLength1[i] = (blockVersion == 1) ? htonl(len) : len; } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { len = numOfRows * sizeof(int32_t); memcpy(pStart1, pStart, len); @@ -2057,7 +2059,9 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 char* pStart = p; for (int32_t i = 0; i < numOfCols; ++i) { - colLength[i] = htonl(colLength[i]); + if(blockVersion == 1){ + colLength[i] = htonl(colLength[i]); + } if (colLength[i] >= dataLen) { tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 1f9d3c6d8c..12ee67abbb 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1553,6 +1553,18 @@ end: return code; } +static void* getRawDataFromRes(void *pRetrieve){ + void* rawData = NULL; + // deal with compatibility + if(*(int64_t*)pRetrieve == 0){ + rawData = ((SRetrieveTableRsp*)pRetrieve)->data; + }else if(*(int64_t*)pRetrieve == 1){ + rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; + } + ASSERT(rawData != NULL); + return rawData; +} + static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { if(taos == NULL || data == NULL){ terrno = TSDB_CODE_INVALID_PARA; @@ -1607,7 +1619,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { } pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); while (++rspObj.resIter < rspObj.rsp.blockNum) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); + void* pRetrieve = taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); if (!rspObj.rsp.withSchema) { goto end; } @@ -1653,7 +1665,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { fields[i].bytes = pSW->pSchema[i].bytes; tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols, true); + void* rawData = getRawDataFromRes(pRetrieve); + code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true); taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { goto end; @@ -1737,7 +1750,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) uDebug(LOG_ID_TAG" write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.blockNum); while (++rspObj.resIter < rspObj.rsp.blockNum) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); + void* pRetrieve = taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); if (!rspObj.rsp.withSchema) { goto end; } @@ -1824,7 +1837,8 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) fields[i].bytes = pSW->pSchema[i].bytes; tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols, true); + void* rawData = getRawDataFromRes(pRetrieve); + code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, fields, pSW->nCols, true); taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { goto end; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 28e269ee10..d98c3340c1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1577,16 +1577,21 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - if (!pWrapper->dataRsp.withSchema) { - setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); - } - // extract the rows in this data packet for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i); + SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); int64_t rows = htobe64(pRetrieve->numOfRows); pVg->numOfRows += rows; (*numOfRows) += rows; + + if (!pRspObj->rsp.withSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable + pRspObj->rsp.withSchema = true; + pRspObj->rsp.blockSchema = taosArrayInit(pRspObj->rsp.blockNum, sizeof(void *)); + SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); + if(schema){ + taosArrayPush(pRspObj->rsp.blockSchema, &schema); + } + } } return pRspObj; @@ -1603,13 +1608,10 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClie pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - if (!pWrapper->taosxRsp.withSchema) { - setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); - } // extract the rows in this data packet for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i); + SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); int64_t rows = htobe64(pRetrieve->numOfRows); pVg->numOfRows += rows; (*numOfRows) += rows; @@ -2548,7 +2550,7 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { pRspObj->resIter++; if (pRspObj->resIter < pRspObj->rsp.blockNum) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); + SRetrieveTableRspForTmq* pRetrieveTmq = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); if (pRspObj->rsp.withSchema) { SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter); setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols); @@ -2559,7 +2561,16 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { taosMemoryFreeClear(pRspObj->resInfo.convertJson); } - setQueryResultFromRsp(&pRspObj->resInfo, pRetrieve, convertUcs4, false); + pRspObj->resInfo.pData = (void*)pRetrieveTmq->data; + pRspObj->resInfo.numOfRows = htobe64(pRetrieveTmq->numOfRows); + pRspObj->resInfo.current = 0; + pRspObj->resInfo.precision = pRetrieveTmq->precision; + + // TODO handle the compressed case + pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows; + setResultDataPtr(&pRspObj->resInfo, pRspObj->resInfo.fields, pRspObj->resInfo.numOfCols, pRspObj->resInfo.numOfRows, + convertUcs4); + return &pRspObj->resInfo; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f0d1630480..0fb066d521 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2196,7 +2196,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { // todo extract method int32_t* version = (int32_t*)data; - *version = 1; + *version = 2; data += sizeof(int32_t); int32_t* actualLen = (int32_t*)data; @@ -2277,7 +2277,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } - colSizes[col] = htonl(colSizes[col]); +// colSizes[col] = htonl(colSizes[col]); // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, // htonl(colSizes[col]), colSizes[col]); } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 01866ef893..5432637482 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -16,21 +16,20 @@ #include "tq.h" int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { - int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; - pRetrieve->useconds = 0; + SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf; + pRetrieve->version = 1; pRetrieve->precision = precision; pRetrieve->compressed = 0; - pRetrieve->completed = 1; pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); - actualLen += sizeof(SRetrieveTableRsp); + actualLen += sizeof(SRetrieveTableRspForTmq); taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockData, &buf); From 4227340d9e69d0a34b3249d93af2324d5166189d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 18 Jan 2024 09:11:58 +0800 Subject: [PATCH 2/6] opti:[TD-28118] raw block data for tmq --- source/common/src/tdatablock.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0fb066d521..5382259899 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2293,7 +2293,6 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { int32_t version = *(int32_t*)pStart; pStart += sizeof(int32_t); - ASSERT(version == 1); // total length sizeof(int32_t) int32_t dataLen = *(int32_t*)pStart; @@ -2339,7 +2338,9 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { - colLen[i] = htonl(colLen[i]); + if(version == 1){ + colLen[i] = htonl(colLen[i]); + } ASSERT(colLen[i] >= 0); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); From 366de880a85331d3c3028ab4eaeed8e9b8b4fe08 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 18 Jan 2024 10:48:42 +0800 Subject: [PATCH 3/6] opti:[TD-28118] raw block data for tmq --- source/libs/parser/src/parInsertUtil.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index a924ed68b0..9a9e73876d 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -662,6 +662,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate char* p = (char*)data; // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column // length | + int32_t version = *(int32_t*)data; p += sizeof(int32_t); p += sizeof(int32_t); @@ -717,7 +718,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength) { + if (needChangeLength && version == 1) { pStart += htonl(colLength[j]); } else { pStart += colLength[j]; @@ -748,7 +749,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength) { + if (needChangeLength && version == 1) { pStart += htonl(colLength[i]); } else { pStart += colLength[i]; From 1b85adaa3ae3d7889c3704d3407970f6038d27b7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 18 Jan 2024 14:17:08 +0800 Subject: [PATCH 4/6] opti:[TD-28118] raw block data for tmq --- source/client/src/clientTmq.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index d98c3340c1..69681b9ae0 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1577,16 +1577,19 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - // extract the rows in this data packet + bool needTransformSchema = !pRspObj->rsp.withSchema; + if (!pRspObj->rsp.withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable + pRspObj->rsp.withSchema = true; + pRspObj->rsp.blockSchema = taosArrayInit(pRspObj->rsp.blockNum, sizeof(void*)); + } + // extract the rows in this data packet for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); int64_t rows = htobe64(pRetrieve->numOfRows); pVg->numOfRows += rows; (*numOfRows) += rows; - if (!pRspObj->rsp.withSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable - pRspObj->rsp.withSchema = true; - pRspObj->rsp.blockSchema = taosArrayInit(pRspObj->rsp.blockNum, sizeof(void *)); + if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); if(schema){ taosArrayPush(pRspObj->rsp.blockSchema, &schema); From f95d5e4f62037e3ea2489cf87103e13d2fb99d87 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 19 Jan 2024 15:28:23 +0800 Subject: [PATCH 5/6] opti:[TD-28118] raw block data for tmq --- include/libs/parser/parser.h | 2 +- source/client/src/clientRawBlockWrite.c | 3 +-- source/libs/parser/src/parInsertUtil.c | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 177607b178..bb206b5a02 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -150,7 +150,7 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int32_t msgBufLen); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); -int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD *fields, int numFields, bool needChangeLength); +int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD *fields, int numFields, bool needChangeLength); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 12ee67abbb..db8de44f1c 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1838,12 +1838,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); } void* rawData = getRawDataFromRes(pRetrieve); - code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, fields, pSW->nCols, true); + code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true); taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { goto end; } - pCreateReqDst = NULL; taosMemoryFreeClear(pTableMeta); } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 9a9e73876d..6b389250bf 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -640,12 +640,12 @@ static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) { return false; } -int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields, +int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq** pCreateTb, TAOS_FIELD* tFields, int numFields, bool needChangeLength) { void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid)); STableDataCxt* pTableCxt = NULL; int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, - sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true, false); + sizeof(pTableMeta->uid), pTableMeta, pCreateTb, &pTableCxt, true, false); if (ret != TSDB_CODE_SUCCESS) { uError("insGetTableDataCxt error"); goto end; From 0e1ef540c4a633393ecd3ff1de6493a81c8ca4dc Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 19 Jan 2024 17:09:02 +0800 Subject: [PATCH 6/6] opti:[TD-28118] raw block data for tmq --- source/libs/parser/src/parInsertUtil.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 6b389250bf..6b655bfae6 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -247,13 +247,13 @@ static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreat if (NULL == pTableCxt->pData) { code = TSDB_CODE_OUT_OF_MEMORY; } else { - pTableCxt->pData->flags = NULL != *pCreateTbReq ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0; + pTableCxt->pData->flags = (pCreateTbReq != NULL && NULL != *pCreateTbReq) ? SUBMIT_REQ_AUTO_CREATE_TABLE : 0; pTableCxt->pData->flags |= colMode ? SUBMIT_REQ_COLUMN_DATA_FORMAT : 0; pTableCxt->pData->suid = pTableMeta->suid; pTableCxt->pData->uid = pTableMeta->uid; pTableCxt->pData->sver = pTableMeta->sversion; - pTableCxt->pData->pCreateTbReq = *pCreateTbReq; - *pCreateTbReq = NULL; + pTableCxt->pData->pCreateTbReq = pCreateTbReq != NULL ? *pCreateTbReq : NULL; + if(pCreateTbReq != NULL) *pCreateTbReq = NULL; if (pTableCxt->pData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { pTableCxt->pData->aCol = taosArrayInit(128, sizeof(SColData)); if (NULL == pTableCxt->pData->aCol) {