fix(query): reserve correct buffer size before retrieve datablock from vnode.
This commit is contained in:
parent
bbe1deaf00
commit
1568722e8e
|
@ -263,6 +263,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, in
|
|||
pColumnInfoData->varmeta.length = len + oldLen;
|
||||
} else {
|
||||
if (finalNumOfRows > *capacity) {
|
||||
ASSERT(finalNumOfRows*pColumnInfoData->info.bytes);
|
||||
char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
|
@ -1126,6 +1127,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
|
|||
}
|
||||
|
||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, size_t existRows, uint32_t numOfRows) {
|
||||
ASSERT(numOfRows);
|
||||
if (0 == numOfRows || numOfRows <= existRows) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1178,6 +1180,8 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
|||
|
||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
|
||||
int32_t code = 0;
|
||||
ASSERT(numOfRows > 0);
|
||||
|
||||
if (numOfRows == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -234,6 +234,7 @@ typedef struct SColMatchInfo {
|
|||
int32_t colId;
|
||||
int32_t targetSlotId;
|
||||
bool output;
|
||||
bool reserved;
|
||||
int32_t matchType; // determinate the source according to col id or slot id
|
||||
} SColMatchInfo;
|
||||
|
||||
|
@ -253,7 +254,6 @@ typedef struct STableScanInfo {
|
|||
|
||||
SFileBlockLoadRecorder readRecorder;
|
||||
int64_t numOfRows;
|
||||
// int32_t prevGroupId; // previous table group id
|
||||
SScanInfo scanInfo;
|
||||
int32_t scanTimes;
|
||||
SNode* pFilterNode; // filter info, which is push down by optimizer
|
||||
|
|
|
@ -609,7 +609,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
}
|
||||
|
||||
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
|
||||
colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity);
|
||||
ASSERT(pResult->info.capacity > 0);
|
||||
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
|
||||
|
||||
numOfRows = dest.numOfRows;
|
||||
|
@ -649,7 +649,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
}
|
||||
|
||||
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
|
||||
colInfoDataEnsureCapacity(pResColData, startOffset, pResult->info.capacity);
|
||||
ASSERT(pResult->info.capacity > 0);
|
||||
colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows);
|
||||
|
||||
numOfRows = dest.numOfRows;
|
||||
|
@ -1340,9 +1340,9 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
|||
}
|
||||
|
||||
if (rowRes != NULL) {
|
||||
int32_t totalRows = pBlock->info.rows;
|
||||
SSDataBlock* px = createOneDataBlock(pBlock, true);
|
||||
|
||||
int32_t totalRows = pBlock->info.rows;
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
|
||||
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
|
|
@ -224,7 +224,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
|||
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->pColMatchInfo); ++i) {
|
||||
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
|
||||
if (!pColMatchInfo->output) {
|
||||
continue;
|
||||
|
@ -384,7 +384,14 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
continue;
|
||||
}
|
||||
|
||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
|
||||
blockDataCleanup(pBlock);
|
||||
|
||||
SDataBlockInfo binfo = pBlock->info;
|
||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &binfo);
|
||||
|
||||
binfo.capacity = binfo.rows;
|
||||
blockDataEnsureCapacity(pBlock, binfo.rows);
|
||||
pBlock->info = binfo;
|
||||
|
||||
uint32_t status = 0;
|
||||
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
||||
|
@ -530,7 +537,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
// taosSsleep(20);
|
||||
|
||||
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||
|
||||
|
|
Loading…
Reference in New Issue