diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index bcb40f4175..e305f4c2ec 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -32,6 +32,9 @@ typedef struct SBlockOrderInfo { SColumnInfoData* pColData; } SBlockOrderInfo; +#define BLOCK_VERSION_1 1 +#define BLOCK_VERSION_2 2 + #define NBIT (3u) #define BitPos(_n) ((_n) & ((1 << NBIT) - 1)) #define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT]) @@ -253,7 +256,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); int32_t blockGetEncodeSize(const SSDataBlock* pBlock); -int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); +int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, int32_t bVersion); const char* blockDecode(SSDataBlock* pBlock, const char* pData); // for debug diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 9800d233e9..a8ebef763a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1811,7 +1811,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i char* pStart = p + len; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; + int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i]; if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) { int32_t* offset = (int32_t*)pStart; @@ -1910,8 +1910,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int char* pStart = p; char* pStart1 = p1; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; - int32_t colLen1 = (blockVersion == 1) ? htonl(colLength1[i]) : colLength1[i]; + int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i]; + int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i]; if (ASSERT(colLen < dataLen)) { tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -1970,7 +1970,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int } colLen1 = len; totalLen += colLen1; - colLength1[i] = (blockVersion == 1) ? htonl(len) : len; + colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len; } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { len = numOfRows * sizeof(int32_t); memcpy(pStart1, pStart, len); @@ -2059,7 +2059,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 char* pStart = p; for (int32_t i = 0; i < numOfCols; ++i) { - if(blockVersion == 1){ + if(blockVersion == BLOCK_VERSION_1){ colLength[i] = htonl(colLength[i]); } if (colLength[i] >= dataLen) { diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index e0cedb9924..3aa46d135e 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -499,7 +499,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) { (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); - int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS); + int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS, BLOCK_VERSION_1); blockDataDestroy(pBlock); if (len != rspSize - sizeof(SRetrieveTableRsp)) { @@ -611,7 +611,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS); - int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS); + int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS, BLOCK_VERSION_1); blockDataDestroy(pBlock); if (len != rspSize - sizeof(SRetrieveTableRsp)) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5382259899..c6f004f83b 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2191,12 +2191,12 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha return TSDB_CODE_SUCCESS; } -int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { +int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, int32_t bVersion) { int32_t dataLen = 0; // todo extract method int32_t* version = (int32_t*)data; - *version = 2; + *version = bVersion; data += sizeof(int32_t); int32_t* actualLen = (int32_t*)data; @@ -2277,7 +2277,9 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; } -// colSizes[col] = htonl(colSizes[col]); + if(bVersion == BLOCK_VERSION_1){ + colSizes[col] = htonl(colSizes[col]); + } // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, // htonl(colSizes[col]), colSizes[col]); } @@ -2338,7 +2340,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { - if(version == 1){ + if(version == BLOCK_VERSION_1){ colLen[i] = htonl(colLen[i]); } ASSERT(colLen[i] >= 0); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 14853009e0..b36dc71322 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -401,7 +401,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { pStart += sizeof(SSysTableSchema); } - int32_t len = blockEncode(pBlock, pStart, numOfCols); + int32_t len = blockEncode(pBlock, pStart, numOfCols, BLOCK_VERSION_1); pRsp->numOfRows = htonl(pBlock->info.rows); pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 8e7e72aa0e..e46c057129 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -328,7 +328,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { pStart += sizeof(SSysTableSchema); } - int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns); + int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns, BLOCK_VERSION_1); } pRsp->numOfRows = htonl(rowsRead); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 1ddd2f34e6..ed1192c490 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -3150,7 +3150,7 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t // pStart += sizeof(SSysTableSchema); // } // -// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns); +// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns, BLOCK_VERSION_1); // } // // pRsp->numOfRows = htonl(rowsRead); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 5432637482..4e4e1481f9 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -28,7 +28,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t pRetrieve->compressed = 0; pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); + int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_2); actualLen += sizeof(SRetrieveTableRspForTmq); taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockData, &buf); diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 6bae0e1022..e74c735497 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -40,7 +40,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfCols = htonl(numOfCols); - int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols); + int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols, BLOCK_VERSION_1); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 66b50bcb47..d2d14bb9f3 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -1817,7 +1817,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { rsp->completed = 1; rsp->numOfRows = htobe64((int64_t)rowNum); - int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock)); + int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock), BLOCK_VERSION_1); rsp->compLen = htonl(len); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index abe566473f..fb57d6b531 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -76,7 +76,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn pEntry->dataLen = 0; pBuf->useSize = sizeof(SDataCacheEntry); - pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols); + pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols, BLOCK_VERSION_1); // ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); // ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 6b655bfae6..514f2ca64e 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -718,7 +718,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength && version == 1) { + if (needChangeLength && version == BLOCK_VERSION_1) { pStart += htonl(colLength[j]); } else { pStart += colLength[j]; @@ -749,7 +749,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate goto end; } fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength && version == 1) { + if (needChangeLength && version == BLOCK_VERSION_1) { pStart += htonl(colLength[i]); } else { pStart += colLength[i]; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b51845d152..080c2c9556 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -188,7 +188,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->version = htobe64(pBlock->info.version); - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); + int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_1); SStreamRetrieveReq req = { .streamId = pTask->id.streamId, @@ -772,7 +772,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); pRetrieve->numOfCols = htonl(numOfCols); - int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); + int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_1); actualLen += sizeof(SRetrieveTableRsp); ASSERT(actualLen <= dataStrLen); taosArrayPush(pReq->dataLen, &actualLen);