fix(query): set correct sort buffer page size.
This commit is contained in:
parent
6b2d4a6aca
commit
63884b554b
|
@ -1226,6 +1226,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
||||||
SColumnInfoData colInfo = {0};
|
SColumnInfoData colInfo = {0};
|
||||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||||
colInfo.info = p->info;
|
colInfo.info = p->info;
|
||||||
|
colInfo.hasNull = true;
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfo);
|
taosArrayPush(pBlock->pDataBlock, &colInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -151,6 +151,13 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
|
||||||
*/
|
*/
|
||||||
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle);
|
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get proper sort buffer pages according to the row size
|
||||||
|
* @param rowSize
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int32_t getProperSortPageSize(size_t rowSize);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -420,10 +420,10 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->binfo.pRes = pResBlock;
|
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, 1024);
|
initResultSizeInfo(pOperator, 1024);
|
||||||
|
|
||||||
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pInfo->pSortInfo = pSortInfo;
|
pInfo->pSortInfo = pSortInfo;
|
||||||
pInfo->pColMatchInfo = pColMatchColInfo;
|
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||||
pInfo->pInputBlock = pInputBlock;
|
pInfo->pInputBlock = pInputBlock;
|
||||||
|
@ -432,12 +432,17 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2;
|
|
||||||
pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
|
||||||
pInfo->hasGroupId = false;
|
pInfo->hasGroupId = false;
|
||||||
pInfo->prefetchedTuple = NULL;
|
pInfo->prefetchedTuple = NULL;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
||||||
|
|
||||||
|
uint32_t numOfSources = taosArrayGetSize(pSortInfo);
|
||||||
|
numOfSources = MAX(2, numOfSources);
|
||||||
|
|
||||||
|
pInfo->sortBufSize = numOfSources * pInfo->bufPageSize;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
||||||
destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo);
|
destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo);
|
||||||
|
|
|
@ -532,6 +532,19 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getProperSortPageSize(size_t rowSize) {
|
||||||
|
uint32_t defaultPageSize = 4096;
|
||||||
|
|
||||||
|
uint32_t pgSize = 0;
|
||||||
|
if (rowSize * 4 > defaultPageSize) {
|
||||||
|
pgSize = rowSize * 4;
|
||||||
|
} else {
|
||||||
|
pgSize = defaultPageSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pgSize;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t createInitialSources(SSortHandle* pHandle) {
|
static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
|
|
||||||
|
@ -557,14 +570,9 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
|
|
||||||
if (!hasGroupId) {
|
if (!hasGroupId) {
|
||||||
// calculate the buffer pages according to the total available buffers.
|
// calculate the buffer pages according to the total available buffers.
|
||||||
int32_t rowSize = blockDataGetRowSize(pBlock);
|
pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock));
|
||||||
if (rowSize * 4 > 4096) {
|
|
||||||
pHandle->pageSize = rowSize * 4;
|
|
||||||
} else {
|
|
||||||
pHandle->pageSize = 4096;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo!!
|
// todo, number of pages are set according to the total available sort buffer
|
||||||
pHandle->numOfPages = 1024;
|
pHandle->numOfPages = 1024;
|
||||||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
|
|
||||||
|
@ -577,7 +585,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
if (pHandle->beforeFp != NULL) {
|
if (pHandle->beforeFp != NULL) {
|
||||||
pHandle->beforeFp(pBlock, pHandle->param);
|
pHandle->beforeFp(pBlock, pHandle->param);
|
||||||
}
|
}
|
||||||
// todo relocate the columns
|
|
||||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue