Merge pull request #21766 from taosdata/szhou/2/fix-ts3543
fix: oom issue since table merge scan
This commit is contained in:
commit
c9d3e697c4
|
@ -2728,7 +2728,8 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
pAPI->tsdReader.tsdReaderClose(source->dataReader);
|
pAPI->tsdReader.tsdReaderClose(source->dataReader);
|
||||||
source->dataReader = NULL;
|
source->dataReader = NULL;
|
||||||
pInfo->base.dataReader = NULL;
|
pInfo->base.dataReader = NULL;
|
||||||
|
blockDataDestroy(source->inputBlock);
|
||||||
|
source->inputBlock = NULL;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2785,7 +2786,18 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
||||||
// the additional one is reserved for merge result
|
// the additional one is reserved for merge result
|
||||||
// pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
// pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
||||||
pInfo->sortBufSize = pInfo->bufPageSize * (256 + 1);
|
int32_t kWay = (TSDB_MAX_BYTES_PER_ROW * 2) / (pInfo->pResBlock->info.rowSize);
|
||||||
|
if (kWay >= 128) {
|
||||||
|
kWay = 128;
|
||||||
|
} else if (kWay <= 2) {
|
||||||
|
kWay = 2;
|
||||||
|
} else {
|
||||||
|
int i = 2;
|
||||||
|
while (i * 2 <= kWay) i = i * 2;
|
||||||
|
kWay = i;
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1);
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||||
pInfo->pSortInputBlock, pTaskInfo->id.str);
|
pInfo->pSortInputBlock, pTaskInfo->id.str);
|
||||||
|
@ -2801,7 +2813,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
param.readerIdx = i;
|
param.readerIdx = i;
|
||||||
param.pOperator = pOperator;
|
param.pOperator = pOperator;
|
||||||
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
|
||||||
blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);
|
|
||||||
|
|
||||||
taosArrayPush(pInfo->sortSourceParams, ¶m);
|
taosArrayPush(pInfo->sortSourceParams, ¶m);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue