fix(query): set correct buffer page size.

This commit is contained in:
Haojun Liao 2022-06-17 09:18:53 +08:00
parent cc638a2236
commit feebe90a1f
1 changed files with 21 additions and 29 deletions

View File

@ -2170,25 +2170,17 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
blockDataEnsureCapacity(p, capacity); blockDataEnsureCapacity(p, capacity);
while (1) { while (1) {
STupleHandle* pTupleHandle = NULL; STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
}
if (pTupleHandle == NULL) { if (pTupleHandle == NULL) {
break; break;
} }
appendOneRowToDataBlock(p, pTupleHandle); appendOneRowToDataBlock(p, pTupleHandle);
if (p->info.rows >= capacity) { if (p->info.rows >= capacity) {
break; break;
} }
} }
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows); qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows);
return (p->info.rows > 0) ? p : NULL; return (p->info.rows > 0) ? p : NULL;
} }
@ -2261,7 +2253,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
goto _error; goto _error;
} }
@ -2284,17 +2275,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec(); pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; pInfo->dataReaders = dataReaders;
pInfo->dataReaders = dataReaders; pInfo->scanFlag = MAIN_SCAN;
pInfo->scanFlag = MAIN_SCAN; pInfo->pColMatchInfo = pColList;
pInfo->pColMatchInfo = pColList; pInfo->curTWinIdx = 0;
pInfo->curTWinIdx = 0;
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createResDataBlock(pDescNode);
@ -2307,22 +2297,24 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
taosArrayPush(pInfo->sortSourceParams, param); taosArrayPush(pInfo->sortSourceParams, param);
taosMemoryFree(param); taosMemoryFree(param);
} }
pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize; int32_t rowSize = pInfo->pResBlock->info.rowSize;
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; pInfo->bufPageSize = getProperSortPageSize(rowSize);
pInfo->sortBufSize = pInfo->bufPageSize * 16; pInfo->sortBufSize = pInfo->bufPageSize * 16;
pInfo->hasGroupId = false; pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL; pInfo->prefetchedTuple = NULL;
pOperator->name = "TableMergeScanOperator"; pOperator->name = "TableMergeScanOperator";
// TODO : change it // TODO : change it
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = numOfCols; pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pOperator->fpSet = pOperator->fpSet =