From 4037a7a1a5070666b5bb312e995a10ce90b3b6bb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 22 Feb 2024 15:44:50 +0800 Subject: [PATCH 1/2] fix:change datablock to old version for compatibility --- include/common/tdatablock.h | 5 ++++- source/client/src/clientImpl.c | 10 +++++----- source/client/src/clientMsgHandler.c | 4 ++-- source/common/src/tdatablock.c | 10 ++++++---- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- source/dnode/mnode/impl/src/mndShow.c | 2 +- source/dnode/mnode/impl/src/mndStb.c | 2 +- source/dnode/vnode/src/tq/tqScan.c | 2 +- source/libs/command/src/command.c | 2 +- source/libs/command/src/explain.c | 2 +- source/libs/executor/src/dataDispatcher.c | 2 +- source/libs/parser/src/parInsertUtil.c | 4 ++-- source/libs/stream/src/streamDispatch.c | 4 ++-- 13 files changed, 28 insertions(+), 23 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index bcb40f4175..e305f4c2ec 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -32,6 +32,9 @@ typedef struct SBlockOrderInfo { SColumnInfoData* pColData; } SBlockOrderInfo; +#define BLOCK_VERSION_1 1 +#define BLOCK_VERSION_2 2 + #define NBIT (3u) #define BitPos(_n) ((_n) & ((1 << NBIT) - 1)) #define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT]) @@ -253,7 +256,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); int32_t blockGetEncodeSize(const SSDataBlock* pBlock); -int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); +int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, int32_t bVersion); const char* blockDecode(SSDataBlock* pBlock, const char* pData); // for debug diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 9800d233e9..a8ebef763a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1811,7 +1811,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i char* pStart = p + len; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; + int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i]; if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) { int32_t* offset = (int32_t*)pStart; @@ -1910,8 +1910,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int char* pStart = p; char* pStart1 = p1; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; - int32_t colLen1 = (blockVersion == 1) ? htonl(colLength1[i]) : colLength1[i]; + int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i]; + int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i]; if (ASSERT(colLen < dataLen)) { tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -1970,7 +1970,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int } colLen1 = len; totalLen += colLen1; - colLength1[i] = (blockVersion == 1) ? htonl(len) : len; + colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len; } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { len = numOfRows * sizeof(int32_t); memcpy(pStart1, pStart, len); @@ -2059,7 +2059,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 char* pStart = p; for (int32_t i = 0; i < numOfCols; ++i) { - if(blockVersion == 1){ + if(blockVersion == BLOCK_VERSION_1){ colLength[i] = htonl(colLength[i]); } if (colLength[i] >= dataLen) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index e0cedb9924..3aa46d135e 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -499,7 +499,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); - int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS); + int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS, BLOCK_VERSION_1); blockDataDestroy(pBlock); if (len != rspSize - sizeof(SRetrieveTableRsp)) { @@ -611,7 +611,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS); - int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS); + int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS, BLOCK_VERSION_1); blockDataDestroy(pBlock); if (len != rspSize - sizeof(SRetrieveTableRsp)) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5382259899..c6f004f83b 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2191,12 +2191,12 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha return TSDB_CODE_SUCCESS; } -int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { +int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, int32_t bVersion) { int32_t dataLen = 0; // todo extract method int32_t* version = (int32_t*)data; - *version = 2; + *version = bVersion; data += sizeof(int32_t); int32_t* actualLen = (int32_t*)data; @@ -2277,7 +2277,9 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } -// colSizes[col] = htonl(colSizes[col]); + if(bVersion == BLOCK_VERSION_1){ + colSizes[col] = htonl(colSizes[col]); + } // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, // htonl(colSizes[col]), colSizes[col]); } @@ -2338,7 +2340,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { - if(version == 1){ + if(version == BLOCK_VERSION_1){ colLen[i] = htonl(colLen[i]); } ASSERT(colLen[i] >= 0); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 14853009e0..b36dc71322 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -401,7 +401,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { pStart += sizeof(SSysTableSchema); } - int32_t len = blockEncode(pBlock, pStart, numOfCols); + int32_t len = blockEncode(pBlock, pStart, numOfCols, BLOCK_VERSION_1); pRsp->numOfRows = htonl(pBlock->info.rows); pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 8e7e72aa0e..e46c057129 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -328,7 +328,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { pStart += sizeof(SSysTableSchema); } - int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns); + int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns, BLOCK_VERSION_1); } pRsp->numOfRows = htonl(rowsRead); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 1ddd2f34e6..ed1192c490 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -3150,7 +3150,7 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t // pStart += sizeof(SSysTableSchema); // } // -// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns); +// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns, BLOCK_VERSION_1); // } // // pRsp->numOfRows = htonl(rowsRead); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 5432637482..4e4e1481f9 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -28,7 +28,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t pRetrieve->compressed = 0; pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); + int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_2); actualLen += sizeof(SRetrieveTableRspForTmq); taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockData, &buf); diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 6bae0e1022..e74c735497 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -40,7 +40,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(numOfCols); - int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols); + int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols, BLOCK_VERSION_1); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 66b50bcb47..d2d14bb9f3 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -1817,7 +1817,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { rsp->completed = 1; rsp->numOfRows = htobe64((int64_t)rowNum); - int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock)); + int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock), BLOCK_VERSION_1); rsp->compLen = htonl(len); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index abe566473f..fb57d6b531 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -76,7 +76,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn pEntry->dataLen = 0; pBuf->useSize = sizeof(SDataCacheEntry); - pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols); + pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols, BLOCK_VERSION_1); // ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); // ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 6b655bfae6..514f2ca64e 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -718,7 +718,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength && version == 1) { + if (needChangeLength && version == BLOCK_VERSION_1) { pStart += htonl(colLength[j]); } else { pStart += colLength[j]; @@ -749,7 +749,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength && version == 1) { + if (needChangeLength && version == BLOCK_VERSION_1) { pStart += htonl(colLength[i]); } else { pStart += colLength[i]; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b51845d152..080c2c9556 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -188,7 +188,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->version = htobe64(pBlock->info.version); - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); + int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_1); SStreamRetrieveReq req = { .streamId = pTask->id.streamId, @@ -772,7 +772,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); pRetrieve->numOfCols = htonl(numOfCols); - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); + int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_1); actualLen += sizeof(SRetrieveTableRsp); ASSERT(actualLen <= dataStrLen); taosArrayPush(pReq->dataLen, &actualLen); From 604ae220f306d5a794d4082560aa3e2367f1aaf8 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 22 Feb 2024 17:48:33 +0800 Subject: [PATCH 2/2] fix:change datablock to old version for compatibility --- include/common/tdatablock.h | 2 +- source/client/src/clientMsgHandler.c | 4 +- source/client/src/clientTmq.c | 67 ++++++++++++++------- source/common/src/tdatablock.c | 12 ++-- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- source/dnode/mnode/impl/src/mndShow.c | 2 +- source/dnode/mnode/impl/src/mndStb.c | 2 +- source/dnode/vnode/src/tq/tqScan.c | 2 +- source/libs/command/src/command.c | 2 +- source/libs/command/src/explain.c | 2 +- source/libs/executor/src/dataDispatcher.c | 2 +- source/libs/stream/src/streamDispatch.c | 4 +- 12 files changed, 62 insertions(+), 41 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index e305f4c2ec..292e9a3181 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -256,7 +256,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); int32_t blockGetEncodeSize(const SSDataBlock* pBlock); -int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, int32_t bVersion); +int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); const char* blockDecode(SSDataBlock* pBlock, const char* pData); // for debug diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 3aa46d135e..e0cedb9924 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -499,7 +499,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); - int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS, BLOCK_VERSION_1); + int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS); blockDataDestroy(pBlock); if (len != rspSize - sizeof(SRetrieveTableRsp)) { @@ -611,7 +611,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS); - int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS, BLOCK_VERSION_1); + int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS); blockDataDestroy(pBlock); if (len != rspSize - sizeof(SRetrieveTableRsp)) { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 69681b9ae0..ebd4e1c7b6 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1562,17 +1562,39 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { return pRspObj; } -SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { - SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); - pRspObj->resType = RES_TYPE__TMQ; +void changeByteEndian(char* pData){ + char* p = pData; + + // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | + // version: + int32_t blockVersion = *(int32_t*)p; + ASSERT(blockVersion == BLOCK_VERSION_1); + *(int32_t*)p = BLOCK_VERSION_2; + + p += sizeof(int32_t); + p += sizeof(int32_t); + p += sizeof(int32_t); + int32_t cols = *(int32_t*)p; + p += sizeof(int32_t); + p += sizeof(int32_t); + p += sizeof(uint64_t); + // check fields + p += cols * (sizeof(int8_t) + sizeof(int32_t)); + + int32_t* colLength = (int32_t*)p; + for (int32_t i = 0; i < cols; ++i) { + colLength[i] = htonl(colLength[i]); + } +} + +static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) { (*numOfRows) = 0; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->resIter = -1; - memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; @@ -1584,11 +1606,21 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, } // extract the rows in this data packet for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { - SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); - int64_t rows = htobe64(pRetrieve->numOfRows); + void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, i); + void* rawData = NULL; + int64_t rows = 0; + // deal with compatibility + if(*(int64_t*)pRetrieve == 0){ + rawData = ((SRetrieveTableRsp*)pRetrieve)->data; + rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows); + }else if(*(int64_t*)pRetrieve == 1){ + rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; + rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows); + } + pVg->numOfRows += rows; (*numOfRows) += rows; - + changeByteEndian(rawData); if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); if(schema){ @@ -1596,29 +1628,22 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, } } } +} +SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { + SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); + pRspObj->resType = RES_TYPE__TMQ; + memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); + tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, pRspObj); return pRspObj; } SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); pRspObj->resType = RES_TYPE__TMQ_METADATA; - tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); - tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); - pRspObj->vgId = pWrapper->vgHandle->vgId; - pRspObj->resIter = -1; memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp)); - pRspObj->resInfo.totalRows = 0; - pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - - // extract the rows in this data packet - for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { - SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i); - int64_t rows = htobe64(pRetrieve->numOfRows); - pVg->numOfRows += rows; - (*numOfRows) += rows; - } + tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, (SMqRspObj*)pRspObj); return pRspObj; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index c6f004f83b..49f1f1eb0e 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2191,12 +2191,12 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha return TSDB_CODE_SUCCESS; } -int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, int32_t bVersion) { +int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { int32_t dataLen = 0; // todo extract method int32_t* version = (int32_t*)data; - *version = bVersion; + *version = BLOCK_VERSION_1; data += sizeof(int32_t); int32_t* actualLen = (int32_t*)data; @@ -2277,9 +2277,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, in data += colSizes[col]; } - if(bVersion == BLOCK_VERSION_1){ - colSizes[col] = htonl(colSizes[col]); - } + colSizes[col] = htonl(colSizes[col]); // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, // htonl(colSizes[col]), colSizes[col]); } @@ -2340,9 +2338,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { - if(version == BLOCK_VERSION_1){ - colLen[i] = htonl(colLen[i]); - } + colLen[i] = htonl(colLen[i]); ASSERT(colLen[i] >= 0); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index b36dc71322..14853009e0 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -401,7 +401,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { pStart += sizeof(SSysTableSchema); } - int32_t len = blockEncode(pBlock, pStart, numOfCols, BLOCK_VERSION_1); + int32_t len = blockEncode(pBlock, pStart, numOfCols); pRsp->numOfRows = htonl(pBlock->info.rows); pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index e46c057129..8e7e72aa0e 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -328,7 +328,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { pStart += sizeof(SSysTableSchema); } - int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns, BLOCK_VERSION_1); + int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns); } pRsp->numOfRows = htonl(rowsRead); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index ed1192c490..1ddd2f34e6 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -3150,7 +3150,7 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t // pStart += sizeof(SSysTableSchema); // } // -// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns, BLOCK_VERSION_1); +// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns); // } // // pRsp->numOfRows = htonl(rowsRead); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 4e4e1481f9..5432637482 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -28,7 +28,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t pRetrieve->compressed = 0; pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_2); + int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); actualLen += sizeof(SRetrieveTableRspForTmq); taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockData, &buf); diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index e74c735497..6bae0e1022 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -40,7 +40,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(numOfCols); - int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols, BLOCK_VERSION_1); + int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index d2d14bb9f3..66b50bcb47 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -1817,7 +1817,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { rsp->completed = 1; rsp->numOfRows = htobe64((int64_t)rowNum); - int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock), BLOCK_VERSION_1); + int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock)); rsp->compLen = htonl(len); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index fb57d6b531..abe566473f 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -76,7 +76,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn pEntry->dataLen = 0; pBuf->useSize = sizeof(SDataCacheEntry); - pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols, BLOCK_VERSION_1); + pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols); // ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); // ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 080c2c9556..b51845d152 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -188,7 +188,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->version = htobe64(pBlock->info.version); - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_1); + int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); SStreamRetrieveReq req = { .streamId = pTask->id.streamId, @@ -772,7 +772,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); pRetrieve->numOfCols = htonl(numOfCols); - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_1); + int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); actualLen += sizeof(SRetrieveTableRsp); ASSERT(actualLen <= dataStrLen); taosArrayPush(pReq->dataLen, &actualLen);