enh: blockencode

This commit is contained in:
xsren 2024-10-20 22:17:16 +08:00
parent d01a3481bc
commit f35d2847b9
7 changed files with 28 additions and 28 deletions

View File

@ -588,8 +588,8 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
return code; return code;
} }
size_t dataEncodeSize = blockGetEncodeSize(pBlock); size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN; size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize); *pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) { if (NULL == *pRsp) {
code = terrno; code = terrno;
@ -604,7 +604,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, SHOW_VARIABLES_RESULT_COLS); int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SHOW_VARIABLES_RESULT_COLS);
if(len < 0) { if(len < 0) {
uError("buildShowVariablesRsp error, len:%d", len); uError("buildShowVariablesRsp error, len:%d", len);
code = terrno; code = terrno;
@ -742,8 +742,8 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
return code; return code;
} }
size_t dataEncodeSize = blockGetEncodeSize(pBlock); size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN; size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize); *pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) { if (NULL == *pRsp) {
code = terrno; code = terrno;
@ -759,7 +759,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS); (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, COMPACT_DB_RESULT_COLS); int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, COMPACT_DB_RESULT_COLS);
if(len < 0) { if(len < 0) {
uError("buildRetriveTableRspForCompactDb error, len:%d", len); uError("buildRetriveTableRspForCompactDb error, len:%d", len);
code = terrno; code = terrno;

View File

@ -548,8 +548,8 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
size_t dataEncodeSize = blockGetEncodeSize(pBlock); size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeSize; size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * numOfCols + dataEncodeBufSize;
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size); SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) { if (pRsp == NULL) {
@ -574,7 +574,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
int32_t len = blockEncode(pBlock, pStart, dataEncodeSize, numOfCols); int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, numOfCols);
if (len < 0) { if (len < 0) {
dError("failed to retrieve data since %s", tstrerror(code)); dError("failed to retrieve data since %s", tstrerror(code));
blockDataDestroy(pBlock); blockDataDestroy(pBlock);

View File

@ -333,8 +333,8 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows); mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
} }
size_t dataEncodeSize = blockGetEncodeSize(pBlock); size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns + dataEncodeSize; size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns + dataEncodeBufSize;
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size); SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) { if (pRsp == NULL) {
@ -361,7 +361,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
int32_t len = blockEncode(pBlock, pStart, dataEncodeSize, pShow->pMeta->numOfColumns); int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, pShow->pMeta->numOfColumns);
if(len < 0){ if(len < 0){
mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, tstrerror(code)); mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, tstrerror(code));
code = terrno; code = terrno;

View File

@ -16,8 +16,8 @@
#include "tq.h" #include "tq.h"
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
size_t dataEncodeSize = blockGetEncodeSize(pBlock); size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeSize; int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
void* buf = taosMemoryCalloc(1, dataStrLen); void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) { if (buf == NULL) {
return terrno; return terrno;
@ -29,7 +29,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeSize, numOfCols); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
if(actualLen < 0){ if(actualLen < 0){
taosMemoryFree(buf); taosMemoryFree(buf);
return terrno; return terrno;

View File

@ -35,8 +35,8 @@
extern SConfig* tsCfg; extern SConfig* tsCfg;
static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRetrieveTableRsp** pRsp) { static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRetrieveTableRsp** pRsp) {
size_t dataEncodeSize = blockGetEncodeSize(pBlock); size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN; size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
*pRsp = taosMemoryCalloc(1, rspSize); *pRsp = taosMemoryCalloc(1, rspSize);
if (NULL == *pRsp) { if (NULL == *pRsp) {
return terrno; return terrno;
@ -50,7 +50,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(numOfCols); (*pRsp)->numOfCols = htonl(numOfCols);
int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, numOfCols); int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, numOfCols);
if(len < 0) { if(len < 0) {
taosMemoryFree(*pRsp); taosMemoryFree(*pRsp);
return terrno; return terrno;

View File

@ -1966,8 +1966,8 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
pBlock->info.rows = rowNum; pBlock->info.rows = rowNum;
size_t dataEncodeSize = blockGetEncodeSize(pBlock); size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
int32_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeSize + PAYLOAD_PREFIX_LEN; int32_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
if (NULL == rsp) { if (NULL == rsp) {
@ -1978,7 +1978,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->completed = 1; rsp->completed = 1;
rsp->numOfRows = htobe64((int64_t)rowNum); rsp->numOfRows = htobe64((int64_t)rowNum);
int32_t len = blockEncode(pBlock, rsp->data + PAYLOAD_PREFIX_LEN, dataEncodeSize, taosArrayGetSize(pBlock->pDataBlock)); int32_t len = blockEncode(pBlock, rsp->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, taosArrayGetSize(pBlock->pDataBlock));
if(len < 0) { if(len < 0) {
qError("qExplainGetRspFromCtx: blockEncode failed"); qError("qExplainGetRspFromCtx: blockEncode failed");
QRY_ERR_JRET(terrno); QRY_ERR_JRET(terrno);

View File

@ -84,18 +84,18 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
pBuf->useSize = sizeof(SDataCacheEntry); pBuf->useSize = sizeof(SDataCacheEntry);
{ {
size_t dataEncodeSize = pBuf->allocSize + 8; size_t dataEncodeBufSize = pBuf->allocSize + 8;
if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) { if ((pBuf->allocSize > tsCompressMsgSize) && (tsCompressMsgSize > 0) && pHandle->pManager->cfg.compress) {
if (pHandle->pCompressBuf == NULL) { if (pHandle->pCompressBuf == NULL) {
// allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size // allocate additional 8 bytes to avoid invalid write if compress failed to reduce the size
pHandle->pCompressBuf = taosMemoryMalloc(dataEncodeSize); pHandle->pCompressBuf = taosMemoryMalloc(dataEncodeBufSize);
if (NULL == pHandle->pCompressBuf) { if (NULL == pHandle->pCompressBuf) {
QRY_RET(terrno); QRY_RET(terrno);
} }
pHandle->bufSize = dataEncodeSize; pHandle->bufSize = dataEncodeBufSize;
} else { } else {
if (pHandle->bufSize < dataEncodeSize) { if (pHandle->bufSize < dataEncodeBufSize) {
pHandle->bufSize = dataEncodeSize; pHandle->bufSize = dataEncodeBufSize;
void* p = taosMemoryRealloc(pHandle->pCompressBuf, pHandle->bufSize); void* p = taosMemoryRealloc(pHandle->pCompressBuf, pHandle->bufSize);
if (p != NULL) { if (p != NULL) {
pHandle->pCompressBuf = p; pHandle->pCompressBuf = p;
@ -106,7 +106,7 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
} }
} }
int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, dataEncodeSize, numOfCols); int32_t dataLen = blockEncode(pInput->pData, pHandle->pCompressBuf, dataEncodeBufSize, numOfCols);
if(dataLen < 0) { if(dataLen < 0) {
qError("failed to encode data block, code: %d", dataLen); qError("failed to encode data block, code: %d", dataLen);
return terrno; return terrno;
@ -124,7 +124,7 @@ static int32_t toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData*
TAOS_MEMCPY(pEntry->data, pHandle->pCompressBuf, dataLen); TAOS_MEMCPY(pEntry->data, pHandle->pCompressBuf, dataLen);
} }
} else { } else {
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, dataEncodeSize, numOfCols); pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, pBuf->allocSize, numOfCols);
if(pEntry->dataLen < 0) { if(pEntry->dataLen < 0) {
qError("failed to encode data block, code: %d", pEntry->dataLen); qError("failed to encode data block, code: %d", pEntry->dataLen);
return terrno; return terrno;