fix(query): prepare the buffer before serialize the retrieved data.

This commit is contained in:
Haojun Liao 2022-06-22 19:34:56 +08:00
parent aa4d863598
commit c79eda9f70
3 changed files with 16 additions and 61 deletions

View File

@ -302,50 +302,20 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
ASSERT(pBlockInfo->capacity >= numOfRows); ASSERT(pBlockInfo->capacity >= numOfRows);
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
// if (pColumnInfoData->capacity < numOfRows) {
// char* p = taosMemoryRealloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * numOfRows);
// if (p == NULL) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
//
// memset(p, 0, sizeof(int32_t));
// pColumnInfoData->capacity = numOfRows;
// pColumnInfoData->varmeta.offset = (int32_t*)p;
// }
memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows); memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows);
// if (pColumnInfoData->varmeta.allocLen < pSource->varmeta.length) { if (pColumnInfoData->varmeta.allocLen < pSource->varmeta.length) {
// char* tmp = taosMemoryRealloc(pColumnInfoData->pData, pSource->varmeta.length); char* tmp = taosMemoryRealloc(pColumnInfoData->pData, pSource->varmeta.length);
// if (tmp == NULL) { if (tmp == NULL) {
// return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
// } }
//
// pColumnInfoData->pData = tmp; pColumnInfoData->pData = tmp;
// pColumnInfoData->varmeta.allocLen = pSource->varmeta.length; pColumnInfoData->varmeta.allocLen = pSource->varmeta.length;
// } }
memcpy(pColumnInfoData->pData, pSource->pData, pSource->varmeta.length);
pColumnInfoData->varmeta.length = pSource->varmeta.length; pColumnInfoData->varmeta.length = pSource->varmeta.length;
memcpy(pColumnInfoData->pData, pSource->pData, pSource->varmeta.length);
} else { } else {
// if (pColumnInfoData->capacity < numOfRows) {
// char* tmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(numOfRows));
// if (tmp == NULL) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
//
// memset(tmp, 0, BitmapLen(numOfRows));
// pColumnInfoData->nullbitmap = tmp;
//
// int32_t newSize = numOfRows * pColumnInfoData->info.bytes;
// tmp = taosMemoryRealloc(pColumnInfoData->pData, newSize);
// if (tmp == NULL) {
// return TSDB_CODE_OUT_OF_MEMORY;
// }
//
// pColumnInfoData->pData = tmp;
// pColumnInfoData->capacity = numOfRows;
// }
memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows)); memcpy(pColumnInfoData->nullbitmap, pSource->nullbitmap, BitmapLen(numOfRows));
memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows); memcpy(pColumnInfoData->pData, pSource->pData, pSource->info.bytes * numOfRows);
} }
@ -1874,8 +1844,6 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen
} }
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) { const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
blockDataEnsureCapacity(pBlock, numOfRows);
const char* pStart = pData; const char* pStart = pData;
int32_t dataLen = *(int32_t*)pStart; int32_t dataLen = *(int32_t*)pStart;
@ -1884,11 +1852,6 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
pBlock->info.groupId = *(uint64_t*)pStart; pBlock->info.groupId = *(uint64_t*)pStart;
pStart += sizeof(uint64_t); pStart += sizeof(uint64_t);
if (pBlock->pDataBlock == NULL) {
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
taosArraySetSize(pBlock->pDataBlock, numOfCols);
}
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
pColInfoData->info.type = *(int16_t*)pStart; pColInfoData->info.type = *(int16_t*)pStart;
@ -1919,22 +1882,16 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows); memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
pStart += sizeof(int32_t) * numOfRows; pStart += sizeof(int32_t) * numOfRows;
if (colLen[i] > 0) { if (colLen[i] > 0 && pColInfoData->pData == NULL) {
taosMemoryFreeClear(pColInfoData->pData); taosMemoryFreeClear(pColInfoData->pData);
pColInfoData->pData = taosMemoryMalloc(colLen[i]); pColInfoData->pData = taosMemoryMalloc(colLen[i]);
} }
} else { } else {
if (pColInfoData->nullbitmap == NULL) {
pColInfoData->nullbitmap = taosMemoryCalloc(1, BitmapLen(numOfRows));
}
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows)); memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
pStart += BitmapLen(numOfRows); pStart += BitmapLen(numOfRows);
} }
if (colLen[i] > 0) { if (colLen[i] > 0) {
if (pColInfoData->pData == NULL) {
pColInfoData->pData = taosMemoryCalloc(1, colLen[i]);
}
memcpy(pColInfoData->pData, pStart, colLen[i]); memcpy(pColInfoData->pData, pStart, colLen[i]);
} }

View File

@ -2075,14 +2075,13 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo
blockCompressDecode(pBlock, numOfCols, numOfRows, pStart); blockCompressDecode(pBlock, numOfCols, numOfRows, pStart);
blockDataEnsureCapacity(pRes, numOfRows); blockDataEnsureCapacity(pRes, numOfRows);
relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
// data from mnode // data from mnode
pRes->info.rows = numOfRows; pRes->info.rows = numOfRows;
relocateColumnData(pRes, pColList, pBlock->pDataBlock, false);
taosArrayDestroy(pBlock->pDataBlock); // taosArrayDestroy(pBlock->pDataBlock);
taosMemoryFree(pBlock); // taosMemoryFree(pBlock);
// blockDataDestroy(pBlock); blockDataDestroy(pBlock);
} }
// todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator. // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.

View File

@ -1366,7 +1366,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
} }
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
int32_t numOfRows = 0; int32_t numOfRows = 0;
const char* db = NULL; const char* db = NULL;
@ -1558,7 +1557,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
if (pRsp->numOfRows == 0 || pRsp->completed) { if (pRsp->numOfRows == 0 || pRsp->completed) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64 " ", GET_TASKID(pTaskInfo), qDebug("%s load meta data from mnode completed, rowsOfSource:%d, totalRows:%" PRIu64, GET_TASKID(pTaskInfo),
pRsp->numOfRows, pInfo->loadInfo.totalRows); pRsp->numOfRows, pInfo->loadInfo.totalRows);
if (pRsp->numOfRows == 0) { if (pRsp->numOfRows == 0) {