From d5d1dd9127e04dc08dbe66666eb718b789fa2a62 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 17 Jan 2024 20:08:26 +0800 Subject: [PATCH] opti:[TD-28118] raw block data for tmq --- include/common/tmsg.h | 8 ++++++ source/client/inc/clientInt.h | 2 ++ source/client/src/clientImpl.c | 14 +++++++---- source/client/src/clientRawBlockWrite.c | 22 ++++++++++++++--- source/client/src/clientTmq.c | 33 ++++++++++++++++--------- source/common/src/tdatablock.c | 4 +-- source/dnode/vnode/src/tq/tqScan.c | 9 +++---- 7 files changed, 65 insertions(+), 27 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c314d82036..2ee48a18e0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1975,6 +1975,14 @@ typedef struct { char data[]; } SRetrieveTableRsp; +typedef struct { + int64_t version; + int64_t numOfRows; + int8_t compressed; + int8_t precision; + char data[]; +} SRetrieveTableRspForTmq; + typedef struct { int64_t handle; int64_t useconds; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index a6d5039be7..5fb789f0db 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -298,6 +298,8 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo); void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4, bool freeAfterUse); +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, + bool convertUcs4); void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); void doFreeReqResultInfo(SReqResultInfo* pResInfo); int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index d4f89d6b54..9800d233e9 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1795,6 +1795,7 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) { static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) { char* p = (char*)pResultInfo->pData; + int32_t blockVersion = *(int32_t*)p; // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column // length | @@ -1810,7 +1811,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i char* pStart = p + len; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = htonl(colLength[i]); + int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) { int32_t* offset = (int32_t*)pStart; @@ -1873,6 +1874,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int tscDebug("start to convert form json format string"); char* p = (char*)pResultInfo->pData; + int32_t blockVersion = *(int32_t*)p; int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows); if (dataLen <= 0) { return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -1908,8 +1910,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int char* pStart = p; char* pStart1 = p1; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = htonl(colLength[i]); - int32_t colLen1 = htonl(colLength1[i]); + int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; + int32_t colLen1 = (blockVersion == 1) ? htonl(colLength1[i]) : colLength1[i]; if (ASSERT(colLen < dataLen)) { tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -1968,7 +1970,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int } colLen1 = len; totalLen += colLen1; - colLength1[i] = htonl(len); + colLength1[i] = (blockVersion == 1) ? htonl(len) : len; } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { len = numOfRows * sizeof(int32_t); memcpy(pStart1, pStart, len); @@ -2057,7 +2059,9 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 char* pStart = p; for (int32_t i = 0; i < numOfCols; ++i) { - colLength[i] = htonl(colLength[i]); + if(blockVersion == 1){ + colLength[i] = htonl(colLength[i]); + } if (colLength[i] >= dataLen) { tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 1f9d3c6d8c..12ee67abbb 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1553,6 +1553,18 @@ end: return code; } +static void* getRawDataFromRes(void *pRetrieve){ + void* rawData = NULL; + // deal with compatibility + if(*(int64_t*)pRetrieve == 0){ + rawData = ((SRetrieveTableRsp*)pRetrieve)->data; + }else if(*(int64_t*)pRetrieve == 1){ + rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; + } + ASSERT(rawData != NULL); + return rawData; +} + static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { if(taos == NULL || data == NULL){ terrno = TSDB_CODE_INVALID_PARA; @@ -1607,7 +1619,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { } pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); while (++rspObj.resIter < rspObj.rsp.blockNum) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); + void* pRetrieve = taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); if (!rspObj.rsp.withSchema) { goto end; } @@ -1653,7 +1665,8 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { fields[i].bytes = pSW->pSchema[i].bytes; tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols, true); + void* rawData = getRawDataFromRes(pRetrieve); + code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true); taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { goto end; @@ -1737,7 +1750,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) uDebug(LOG_ID_TAG" write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.blockNum); while (++rspObj.resIter < rspObj.rsp.blockNum) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); + void* pRetrieve = taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); if (!rspObj.rsp.withSchema) { goto end; } @@ -1824,7 +1837,8 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) fields[i].bytes = pSW->pSchema[i].bytes; tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols, true); + void* rawData = getRawDataFromRes(pRetrieve); + code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, fields, pSW->nCols, true); taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { goto end; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 28e269ee10..d98c3340c1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1577,16 +1577,21 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - if (!pWrapper->dataRsp.withSchema) { - setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); - } - // extract the rows in this data packet for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i); + SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); int64_t rows = htobe64(pRetrieve->numOfRows); pVg->numOfRows += rows; (*numOfRows) += rows; + + if (!pRspObj->rsp.withSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable + pRspObj->rsp.withSchema = true; + pRspObj->rsp.blockSchema = taosArrayInit(pRspObj->rsp.blockNum, sizeof(void *)); + SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); + if(schema){ + taosArrayPush(pRspObj->rsp.blockSchema, &schema); + } + } } return pRspObj; @@ -1603,13 +1608,10 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClie pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - if (!pWrapper->taosxRsp.withSchema) { - setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); - } // extract the rows in this data packet for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i); + SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); int64_t rows = htobe64(pRetrieve->numOfRows); pVg->numOfRows += rows; (*numOfRows) += rows; @@ -2548,7 +2550,7 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { pRspObj->resIter++; if (pRspObj->resIter < pRspObj->rsp.blockNum) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); + SRetrieveTableRspForTmq* pRetrieveTmq = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); if (pRspObj->rsp.withSchema) { SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter); setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols); @@ -2559,7 +2561,16 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { taosMemoryFreeClear(pRspObj->resInfo.convertJson); } - setQueryResultFromRsp(&pRspObj->resInfo, pRetrieve, convertUcs4, false); + pRspObj->resInfo.pData = (void*)pRetrieveTmq->data; + pRspObj->resInfo.numOfRows = htobe64(pRetrieveTmq->numOfRows); + pRspObj->resInfo.current = 0; + pRspObj->resInfo.precision = pRetrieveTmq->precision; + + // TODO handle the compressed case + pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows; + setResultDataPtr(&pRspObj->resInfo, pRspObj->resInfo.fields, pRspObj->resInfo.numOfCols, pRspObj->resInfo.numOfRows, + convertUcs4); + return &pRspObj->resInfo; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f0d1630480..0fb066d521 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2196,7 +2196,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { // todo extract method int32_t* version = (int32_t*)data; - *version = 1; + *version = 2; data += sizeof(int32_t); int32_t* actualLen = (int32_t*)data; @@ -2277,7 +2277,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } - colSizes[col] = htonl(colSizes[col]); +// colSizes[col] = htonl(colSizes[col]); // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, // htonl(colSizes[col]), colSizes[col]); } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 01866ef893..5432637482 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -16,21 +16,20 @@ #include "tq.h" int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { - int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; - pRetrieve->useconds = 0; + SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf; + pRetrieve->version = 1; pRetrieve->precision = precision; pRetrieve->compressed = 0; - pRetrieve->completed = 1; pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); - actualLen += sizeof(SRetrieveTableRsp); + actualLen += sizeof(SRetrieveTableRspForTmq); taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockData, &buf);