From 0a567252f5cae6de2100960dad2b113e1aca2ba0 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 17 Jun 2023 18:50:33 +0800 Subject: [PATCH 1/4] fix: oom issue since table merge scan --- source/libs/executor/src/scanoperator.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 90dfbd5599..d53228590c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2785,7 +2785,14 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { // todo the total available buffer should be determined by total capacity of buffer of this task. // the additional one is reserved for merge result // pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); - pInfo->sortBufSize = pInfo->bufPageSize * (256 + 1); + int32_t kWay = (TSDB_MAX_BYTES_PER_ROW * 2) / (pInfo->pResBlock->info.rowSize); + if (kWay >= 256) { + kWay = 256; + } else if (kWay <= 2) { + kWay = 2; + } + + pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str); @@ -2801,7 +2808,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { param.readerIdx = i; param.pOperator = pOperator; param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); - blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity); taosArrayPush(pInfo->sortSourceParams, ¶m); From 577c131360d54de48ea70de10c9e53601a911f61 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 17 Jun 2023 22:13:08 +0800 Subject: [PATCH 2/4] fix: search k-way number --- source/libs/executor/src/scanoperator.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d53228590c..15fc164cf1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2790,6 +2790,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { kWay = 256; } else if (kWay <= 2) { kWay = 2; + } else { + int i = 2; + while (i * 2 <= kWay) i = i * 2; + kWay = i; } pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1); From 9e781ed7a66f372ce0d7f8c53547f4269dd2600b Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 18 Jun 2023 06:50:53 +0800 Subject: [PATCH 3/4] fix: destory data block to fix oom --- source/libs/executor/src/scanoperator.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 15fc164cf1..42c0b88089 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2728,7 +2728,8 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pAPI->tsdReader.tsdReaderClose(source->dataReader); source->dataReader = NULL; pInfo->base.dataReader = NULL; - + blockDataDestroy(source->inputBlock); + source->inputBlock = NULL; return NULL; } From fc523d1265a212c5e1eafb807c779093e00f53a1 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 18 Jun 2023 07:53:35 +0800 Subject: [PATCH 4/4] fix: at most 128 k-way merge to decrease memory usage --- source/libs/executor/src/scanoperator.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 42c0b88089..83aba6729b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2787,8 +2787,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { // the additional one is reserved for merge result // pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); int32_t kWay = (TSDB_MAX_BYTES_PER_ROW * 2) / (pInfo->pResBlock->info.rowSize); - if (kWay >= 256) { - kWay = 256; + if (kWay >= 128) { + kWay = 128; } else if (kWay <= 2) { kWay = 2; } else {