From 8b6d7db7adc727f69d380a7eae108ba82c7fcb9a Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sun, 16 Jul 2023 17:54:39 +0800 Subject: [PATCH 01/14] enhance: use one tsdb reader to read table sequentially --- source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/inc/tsort.h | 1 + source/libs/executor/src/scanoperator.c | 84 +++++-------------------- source/libs/executor/src/tsort.c | 84 +++++++++++++++++++++++++ 4 files changed, 102 insertions(+), 69 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index b3d0ff8225..3b3f2cbb8a 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -232,7 +232,6 @@ typedef struct STableMergeScanInfo { int32_t tableEndIndex; bool hasGroupId; uint64_t groupId; - SArray* queryConds; // array of queryTableDataCond STableScanBase base; int32_t bufPageSize; uint32_t sortBufSize; // max buffer size for in-memory sort @@ -245,6 +244,7 @@ typedef struct STableMergeScanInfo { int64_t numOfRows; SScanInfo scanInfo; int32_t scanTimes; + int32_t readIdx; SSDataBlock* pResBlock; SSampleExecInfo sample; // sample execution info SSortExecInfo sortExecInfo; diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 627aa825c6..a2cb1234ff 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -26,6 +26,7 @@ extern "C" { enum { SORT_MULTISOURCE_MERGE = 0x1, SORT_SINGLESOURCE_SORT = 0x2, + SORT_TABLE_MERGE_SCAN = 0x3 }; typedef struct SMultiMergeSource { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9abe4ffef6..cf1e93d3f2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,8 +13,6 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. 
*/ -// clang-format off - #include "executorInt.h" #include "filter.h" #include "function.h" @@ -56,7 +54,6 @@ typedef struct STableMergeScanSortSourceParam { int32_t readerIdx; uint64_t uid; SSDataBlock* inputBlock; - STsdbReader* dataReader; } STableMergeScanSortSourceParam; typedef struct STableCountScanOperatorInfo { @@ -2741,28 +2738,28 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SSDataBlock* pBlock = source->inputBlock; int32_t code = 0; - SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); - int64_t st = taosGetTimestampUs(); void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; - if (NULL == source->dataReader) { - code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL); + + bool hasNext = false; + + if (NULL == pInfo->base.dataReader) { + code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, p, 1, pBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL); if (code != 0) { T_LONG_JMP(pTaskInfo->env, code); } + pInfo->readIdx = readIdx + pInfo->tableStartIndex ; + } else if (pInfo->readIdx != readIdx + pInfo->tableStartIndex) { + pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, p, 1); + pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); } - pInfo->base.dataReader = source->dataReader; STsdbReader* reader = pInfo->base.dataReader; - bool hasNext = false; - qTrace("tsdb/read-table-data: %p, enter next reader", reader); - while (true) { code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); if (code != 0) { pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, code); } @@ -2772,7 +2769,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { if (isTaskKilled(pTaskInfo)) { 
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader); - pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } @@ -2782,12 +2778,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { continue; } - if (pQueryCond->order == TSDB_ORDER_ASC) { - pQueryCond->twindows.skey = pBlock->info.window.ekey + 1; - } else { - pQueryCond->twindows.ekey = pBlock->info.window.skey - 1; - } - uint32_t status = 0; code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status); // code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status); @@ -2809,14 +2799,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - qTrace("tsdb/read-table-data: %p, close reader", reader); - pInfo->base.dataReader = NULL; return pBlock; } - pAPI->tsdReader.tsdReaderClose(source->dataReader); - source->dataReader = NULL; - pInfo->base.dataReader = NULL; blockDataDestroy(source->inputBlock); source->inputBlock = NULL; return NULL; @@ -2870,32 +2855,18 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; - pInfo->base.dataReader = NULL; - // todo the total available buffer should be determined by total capacity of buffer of this task. 
// the additional one is reserved for merge result - // pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); - int32_t kWay = (TSDB_MAX_BYTES_PER_ROW * 2) / (pInfo->pResBlock->info.rowSize); - if (kWay >= 128) { - kWay = 128; - } else if (kWay <= 2) { - kWay = 2; - } else { - int i = 2; - while (i * 2 <= kWay) i = i * 2; - kWay = i; - } + pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); - pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_TABLE_MERGE_SCAN, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL); // one table has one data block int32_t numOfTable = tableEndIdx - tableStartIdx + 1; - pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond)); for (int32_t i = 0; i < numOfTable; ++i) { STableMergeScanSortSourceParam param = {0}; @@ -2904,10 +2875,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); taosArrayPush(pInfo->sortSourceParams, &param); - - SQueryTableDataCond cond; - dumpQueryTableCond(&pInfo->base.cond, &cond); - taosArrayPush(pInfo->queryConds, &cond); } for (int32_t i = 0; i < numOfTable; ++i) { @@ -2932,8 +2899,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - int32_t numOfTable = taosArrayGetSize(pInfo->queryConds); - SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer; @@ -2941,24 +2906,15 @@ 
int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; - for (int32_t i = 0; i < numOfTable; ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->sortSourceParams); ++i) { STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); blockDataDestroy(param->inputBlock); - pAPI->tsdReader.tsdReaderClose(param->dataReader); - param->dataReader = NULL; } taosArrayClear(pInfo->sortSourceParams); tsortDestroySortHandle(pInfo->pSortHandle); pInfo->pSortHandle = NULL; - for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) { - SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i); - taosMemoryFree(cond->colList); - } - taosArrayDestroy(pInfo->queryConds); - pInfo->queryConds = NULL; - resetLimitInfoForNextGroup(&pInfo->limitInfo); return TSDB_CODE_SUCCESS; } @@ -3056,13 +3012,11 @@ void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->base.cond); - int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds); + int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams); for (int32_t i = 0; i < numOfTable; i++) { STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i); blockDataDestroy(p->inputBlock); - pTableScanInfo->base.readerAPI.tsdReaderClose(p->dataReader); - p->dataReader = NULL; } pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); @@ -3072,12 +3026,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; - for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) { - SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); - taosMemoryFree(pCond->colList); - } - - 
taosArrayDestroy(pTableScanInfo->queryConds); destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); @@ -3143,6 +3091,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->base.scanFlag = MAIN_SCAN; pInfo->base.readHandle = *readHandle; + pInfo->readIdx = -1; + pInfo->base.limitInfo.limit.limit = -1; pInfo->base.limitInfo.slimit.limit = -1; pInfo->base.pTableListInfo = pTableListInfo; @@ -3573,6 +3523,4 @@ static void destoryTableCountScanOperator(void* param) { taosArrayDestroy(pTableCountScanInfo->stbUidList); taosMemoryFreeClear(param); -} - -// clang-format on +} \ No newline at end of file diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index d26db6536f..c8266ea0d9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -769,6 +769,61 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) { return pgSize; } +static int32_t createPageBuf(SSortHandle* pHandle) { + if (pHandle->pBuf == NULL) { + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_DISKSPACE; + qError("create page buf failed since %s, tempDir:%s", terrstr(), tsTempDir); + return terrno; + } + + int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, + "tableBlocksBuf", tsTempDir); + dBufSetPrintInfo(pHandle->pBuf); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + return 0; +} + +static int32_t addDataBlockToPageBuf(SSortHandle * pHandle, SSDataBlock* pDataBlock, SArray* aPgId) { + int32_t start = 0; + while (start < pDataBlock->info.rows) { + int32_t stop = 0; + blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize); + SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1); + if (p == NULL) { + taosArrayDestroy(aPgId); + return terrno; + } + + int32_t pageId = -1; 
+ void* pPage = getNewBufPage(pHandle->pBuf, &pageId); + if (pPage == NULL) { + taosArrayDestroy(aPgId); + blockDataDestroy(p); + return terrno; + } + + taosArrayPush(aPgId, &pageId); + + int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t); + ASSERT(size <= getBufPageSize(pHandle->pBuf)); + + blockDataToBuf(pPage, p); + + setBufPageDirty(pPage, true); + releaseBufPage(pHandle->pBuf, pPage); + + blockDataDestroy(p); + start = stop + 1; + } + + blockDataCleanup(pDataBlock); + return 0; +} + static int32_t createInitialSources(SSortHandle* pHandle) { size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; int32_t code = 0; @@ -875,6 +930,35 @@ static int32_t createInitialSources(SSortHandle* pHandle) { code = doAddToBuf(pHandle->pDataBlock, pHandle); } } + } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { + size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); + SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); + + // pHandle->numOfPages = 1024; //todo check sortbufsize + createPageBuf(pHandle); + + for (int i = 0; i < nSrc; ++i) { + SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); + + SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i); + SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + while (pBlk != NULL) { + addDataBlockToPageBuf(pHandle, pBlk, aPgId); + pBlk = pHandle->fetchfp(pSrc->param); + } + SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); + code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pBlock, &pHandle->sourceId, aPgId); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(aExtSrc); + return code; + } + } + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); + taosArrayDestroy(aExtSrc); + + pHandle->type = SORT_SINGLESOURCE_SORT; + } return code; From f93af4d2e0f9d39d45ad196e526575c3879d29d0 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 16 Jul 2023 20:28:54 +0800 Subject: 
[PATCH 02/14] enhance: pass simple test --- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/src/scanoperator.c | 19 +++++-------------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 3b3f2cbb8a..6aa93a9c5e 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -238,6 +238,7 @@ typedef struct STableMergeScanInfo { SArray* pSortInfo; SSortHandle* pSortHandle; SSDataBlock* pSortInputBlock; + SSDataBlock* pReaderBlock; int64_t startTs; // sort start time SArray* sortSourceParams; SLimitInfo limitInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index cf1e93d3f2..93addef91d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -53,7 +53,6 @@ typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; uint64_t uid; - SSDataBlock* inputBlock; } STableMergeScanSortSourceParam; typedef struct STableCountScanOperatorInfo { @@ -2735,7 +2734,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SStorageAPI* pAPI = &pTaskInfo->storageAPI; int32_t readIdx = source->readerIdx; - SSDataBlock* pBlock = source->inputBlock; + SSDataBlock* pBlock = pInfo->pReaderBlock; int32_t code = 0; int64_t st = taosGetTimestampUs(); @@ -2753,6 +2752,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { } else if (pInfo->readIdx != readIdx + pInfo->tableStartIndex) { pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, p, 1); pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); + pInfo->readIdx = readIdx + pInfo->tableStartIndex ; } STsdbReader* reader = pInfo->base.dataReader; @@ -2802,8 +2802,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { return pBlock; } - blockDataDestroy(source->inputBlock); - source->inputBlock = NULL; return NULL; } 
@@ -2872,7 +2870,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanSortSourceParam param = {0}; param.readerIdx = i; param.pOperator = pOperator; - param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); taosArrayPush(pInfo->sortSourceParams, &param); } @@ -2906,10 +2903,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; - for (int32_t i = 0; i < taosArrayGetSize(pInfo->sortSourceParams); ++i) { - STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); - blockDataDestroy(param->inputBlock); - } taosArrayClear(pInfo->sortSourceParams); tsortDestroySortHandle(pInfo->pSortHandle); @@ -3014,11 +3007,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams); - for (int32_t i = 0; i < numOfTable; i++) { - STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i); - blockDataDestroy(p->inputBlock); - } - pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.dataReader = NULL; @@ -3030,6 +3018,7 @@ void destroyTableMergeScanOperatorInfo(void* param) { pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); + pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock); taosArrayDestroy(pTableScanInfo->pSortInfo); taosMemoryFreeClear(param); @@ -3115,6 +3104,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); + pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); + int32_t rowSize = 
pInfo->pResBlock->info.rowSize; uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock); pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols); From 9a0e9df5678774ec586e6c7a5e367a331f55aad0 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sun, 16 Jul 2023 23:39:41 +0800 Subject: [PATCH 03/14] enhance: block is ts sorted and each book is a source --- source/libs/executor/src/scanoperator.c | 46 ++++++++----------------- source/libs/executor/src/tsort.c | 22 ++++++------ 2 files changed, 25 insertions(+), 43 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 93addef91d..7babf68e56 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -53,6 +53,7 @@ typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; uint64_t uid; + STsdbReader* reader; } STableMergeScanSortSourceParam; typedef struct STableCountScanOperatorInfo { @@ -2733,28 +2734,12 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - int32_t readIdx = source->readerIdx; SSDataBlock* pBlock = pInfo->pReaderBlock; int32_t code = 0; int64_t st = taosGetTimestampUs(); - void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex); - SReadHandle* pHandle = &pInfo->base.readHandle; - bool hasNext = false; - if (NULL == pInfo->base.dataReader) { - code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, p, 1, pBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL); - if (code != 0) { - T_LONG_JMP(pTaskInfo->env, code); - } - pInfo->readIdx = readIdx + pInfo->tableStartIndex ; - } else if (pInfo->readIdx != readIdx + pInfo->tableStartIndex) { - pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, p, 1); - pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); - 
pInfo->readIdx = readIdx + pInfo->tableStartIndex ; - } - STsdbReader* reader = pInfo->base.dataReader; while (true) { code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext); @@ -2837,6 +2822,8 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SReadHandle* pHandle = &pInfo->base.readHandle; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; { size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); @@ -2866,21 +2853,15 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { // one table has one data block int32_t numOfTable = tableEndIdx - tableStartIdx + 1; - for (int32_t i = 0; i < numOfTable; ++i) { - STableMergeScanSortSourceParam param = {0}; - param.readerIdx = i; - param.pOperator = pOperator; + STableMergeScanSortSourceParam param = {0}; + param.pOperator = pOperator; + STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); + pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL); - taosArrayPush(pInfo->sortSourceParams, &param); - } - - for (int32_t i = 0; i < numOfTable; ++i) { - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); - ps->param = param; - ps->onlyRef = true; - tsortAddSource(pInfo->pSortHandle, ps); - } + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + ps->param = &param; + ps->onlyRef = true; + tsortAddSource(pInfo->pSortHandle, ps); int32_t code = tsortOpen(pInfo->pSortHandle); @@ -2903,7 +2884,10 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; - 
taosArrayClear(pInfo->sortSourceParams); + if (pInfo->base.dataReader != NULL) { + pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader); + pInfo->base.dataReader = NULL; + } tsortDestroySortHandle(pInfo->pSortHandle); pInfo->pSortHandle = NULL; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index c8266ea0d9..c262153464 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -937,22 +937,20 @@ static int32_t createInitialSources(SSortHandle* pHandle) { // pHandle->numOfPages = 1024; //todo check sortbufsize createPageBuf(pHandle); - for (int i = 0; i < nSrc; ++i) { + SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); + SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + while (pBlk != NULL) { SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); - - SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i); - SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); - while (pBlk != NULL) { - addDataBlockToPageBuf(pHandle, pBlk, aPgId); - pBlk = pHandle->fetchfp(pSrc->param); - } - SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); - code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pBlock, &pHandle->sourceId, aPgId); + addDataBlockToPageBuf(pHandle, pBlk, aPgId); + SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); + code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(aExtSrc); - return code; + taosArrayDestroy(aExtSrc); + return code; } + pBlk = pHandle->fetchfp(pSrc->param); } + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); taosArrayDestroy(aExtSrc); From 655233fd4fe8541a1af30c4ea24634307f7124c9 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 17 Jul 2023 15:05:30 +0800 Subject: [PATCH 04/14] fix: create initial source with blocks --- source/libs/executor/src/tsort.c | 172 
+++++++++++++++++++++++-------- 1 file changed, 129 insertions(+), 43 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index c262153464..e710b619b8 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -787,40 +787,106 @@ static int32_t createPageBuf(SSortHandle* pHandle) { return 0; } -static int32_t addDataBlockToPageBuf(SSortHandle * pHandle, SSDataBlock* pDataBlock, SArray* aPgId) { - int32_t start = 0; - while (start < pDataBlock->info.rows) { - int32_t stop = 0; - blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize); - SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1); - if (p == NULL) { - taosArrayDestroy(aPgId); - return terrno; - } +typedef struct SBlkMergeSupport { + int64_t** aTs; + int32_t* aRowIdx; + int32_t order; +} SBlkMergeSupport; - int32_t pageId = -1; - void* pPage = getNewBufPage(pHandle->pBuf, &pageId); - if (pPage == NULL) { - taosArrayDestroy(aPgId); - blockDataDestroy(p); - return terrno; - } +static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* param) { + int32_t left = *(int32_t*)pLeft; + int32_t right = *(int32_t*)pRight; - taosArrayPush(aPgId, &pageId); - - int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t); - ASSERT(size <= getBufPageSize(pHandle->pBuf)); - - blockDataToBuf(pPage, p); - - setBufPageDirty(pPage, true); - releaseBufPage(pHandle->pBuf, pPage); - - blockDataDestroy(p); - start = stop + 1; + SBlkMergeSupport* pSup = (SBlkMergeSupport*)param; + if (pSup->aRowIdx[left] == -1) { + return 1; + } else if (pSup->aRowIdx[right] == -1) { + return -1; } - blockDataCleanup(pDataBlock); + int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]]; + int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]]; + + int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? 
-1 : 0); + if (pSup->order == TSDB_ORDER_DESC) { + ret = -1 * ret; + } + return ret; +} + +static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) { + int32_t pageId = -1; + void* pPage = getNewBufPage(pHandle->pBuf, &pageId); + taosArrayPush(aPgId, &pageId); + blockDataToBuf(pPage, blk); + + setBufPageDirty(pPage, true); + releaseBufPage(pHandle->pBuf, pPage); + blockDataCleanup(blk); + + return 0; +} + + +static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) { + int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, + blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock))); + blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); + blockDataCleanup(pHandle->pDataBlock); + + int32_t numBlks = taosArrayGetSize(aBlk); + + SBlkMergeSupport sup; + sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t)); + sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*)); + sup.order = order->order; + for (int i = 0; i < numBlks; ++i) { + SSDataBlock* blk = taosArrayGetP(aBlk, i); + SColumnInfoData* col = taosArrayGet(blk->pDataBlock, order->slotId); + sup.aTs[i] = (int64_t*)col->pData; + sup.aRowIdx[i] = 0; + } + + int32_t totalRows = 0; + for (int i = 0; i < numBlks; ++i) { + SSDataBlock* blk = taosArrayGetP(aBlk, i); + totalRows += blk->info.rows; + } + + SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); + + SMultiwayMergeTreeInfo* pTree = NULL; + tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn); + int32_t numEnded = 0; + int32_t nRows = 0; + while (nRows < totalRows) { + int32_t minIdx = tMergeTreeGetChosenIndex(pTree); + SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); + int32_t minRow = sup.aRowIdx[minIdx]; + appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); + ++nRows; + if (pHandle->pDataBlock->info.rows >= rowCap) { + appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + } + 
if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) { + sup.aRowIdx[minIdx] = -1; + ++numEnded; + } else { + ++sup.aRowIdx[minIdx]; + } + tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); + } + if (pHandle->pDataBlock->info.rows > 0) { + appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + } + SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); + doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); + + taosMemoryFree(sup.aRowIdx); + taosMemoryFree(sup.aTs); + + tMergeTreeDestroy(&pTree); + return 0; } @@ -931,26 +997,46 @@ static int32_t createInitialSources(SSortHandle* pHandle) { } } } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { + SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); - // pHandle->numOfPages = 1024; //todo check sortbufsize + pHandle->numOfPages = 1024; //todo check sortbufsize + size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize; createPageBuf(pHandle); SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); - SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); - while (pBlk != NULL) { - SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); - addDataBlockToPageBuf(pHandle, pBlk, aPgId); - SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); - code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(aExtSrc); - return code; - } - pBlk = pHandle->fetchfp(pSrc->param); - } + int32_t szSort = 0; + SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); + while (1) { + SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + if (pBlk == NULL) { + break; + }; + + szSort += blockDataGetSize(pBlk); + SSDataBlock* blk = createOneDataBlock(pBlk, true); + taosArrayPush(aBlkSort, &blk); + + if (szSort > maxBufSize) { + 
sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); + } + taosArrayClear(aBlkSort); + szSort = 0; + } + } + if (szSort > 0) { + sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); + } + taosArrayClear(aBlkSort); + szSort = 0; + } + taosArrayDestroy(aBlkSort); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); taosArrayDestroy(aExtSrc); From 1b9754f1bc41dc74b6eed22816f5a3d23e509f90 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 17 Jul 2023 16:18:06 +0800 Subject: [PATCH 05/14] enhance: add log with info level. change to debug later --- source/libs/executor/src/scanoperator.c | 5 +---- source/libs/executor/src/tsort.c | 7 +++++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7babf68e56..0c90c654f4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2840,10 +2840,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; - // todo the total available buffer should be determined by total capacity of buffer of this task. 
- // the additional one is reserved for merge result - pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); - + pInfo->sortBufSize = 2048 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_TABLE_MERGE_SCAN, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index e710b619b8..86e2477282 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -473,6 +473,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT pSource->src.rowIndex = -1; pSource->pageIndex = -1; pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); + uInfo("adjust merge tree. %d source completed", *numOfCompleted); } else { int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); @@ -668,6 +669,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { // Only *numOfInputSources* can be loaded into buffer to perform the external sort. for (int32_t i = 0; i < sortGroup; ++i) { + uInfo("internal merge sort pass %d group %d. 
num input sources %d ", t, i, numOfInputSources); pHandle->sourceId += 1; int32_t end = (i + 1) * numOfInputSources - 1; @@ -1001,7 +1003,6 @@ static int32_t createInitialSources(SSortHandle* pHandle) { size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); - pHandle->numOfPages = 1024; //todo check sortbufsize size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize; createPageBuf(pHandle); @@ -1021,6 +1022,8 @@ static int32_t createInitialSources(SSortHandle* pHandle) { if (szSort > maxBufSize) { sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); + uInfo("initial source %zu created for %zu blocks", taosArrayGetSize(aExtSrc), taosArrayGetSize(aBlkSort)); + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { blockDataDestroy(taosArrayGetP(aBlkSort, i)); } @@ -1042,7 +1045,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { taosArrayDestroy(aExtSrc); pHandle->type = SORT_SINGLESOURCE_SORT; - + uInfo("create initial sources for table merge scan ended"); } return code; From ba2b4042950cfa4c95e9ddcbc2c4160b07be5067 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 17 Jul 2023 21:07:55 +0800 Subject: [PATCH 06/14] enhance: optimize msortComparFn for table merge scan --- source/libs/executor/inc/tsort.h | 6 ++ source/libs/executor/src/tsort.c | 95 +++++++++++++++++++------------- 2 files changed, 63 insertions(+), 38 deletions(-) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index a2cb1234ff..a15b1e0eaf 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -54,6 +54,12 @@ typedef struct SMsortComparParam { int32_t numOfSources; SArray* orderInfo; // SArray<SBlockOrderInfo> bool cmpGroupId; + + int32_t sortType; + // the following field to speed up when sortType == SORT_TABLE_MERGE_SCAN + int32_t tsSlotId; + int32_t order; + __compar_fn_t cmpFn; } SMsortComparParam; typedef struct SSortHandle SSortHandle; diff --git 
a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 86e2477282..f274e9717b 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -197,7 +197,13 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->cmpParam.orderInfo = pSortInfo; pSortHandle->cmpParam.cmpGroupId = false; - + pSortHandle->cmpParam.sortType = type; + if (type == SORT_TABLE_MERGE_SCAN) { + SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pSortInfo, 0); + pSortHandle->cmpParam.tsSlotId = pOrder->slotId; + pSortHandle->cmpParam.order = pOrder->order; + pSortHandle->cmpParam.cmpFn = (pOrder->order == TSDB_ORDER_ASC) ? compareInt64Val : compareInt64ValDesc; + } tsortSetComparFp(pSortHandle, msortComparFn); if (idstr != NULL) { @@ -489,6 +495,8 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT } releaseBufPage(pHandle->pBuf, pPage); + if (pSource->pageIndex % 256 == 0) + uInfo("got block from page %d from ext mem source %p", pSource->pageIndex, pSource); } } else { int64_t st = taosGetTimestampUs(); @@ -498,6 +506,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT if (pSource->src.pBlock == NULL) { (*numOfCompleted) += 1; pSource->src.rowIndex = -1; + uInfo("adjust merge tree. 
%d source completed", *numOfCompleted); } } } @@ -578,53 +587,63 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { } } - for (int32_t i = 0; i < pInfo->size; ++i) { - SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); - SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); + if (pParam->sortType == SORT_TABLE_MERGE_SCAN) { + SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId); + SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId); + int64_t* left1 = (int64_t*)(pLeftColInfoData->pData) + pLeftSource->src.rowIndex; + int64_t* right1 = (int64_t*)(pRightColInfoData->pData) + pRightSource->src.rowIndex; - bool leftNull = false; - if (pLeftColInfoData->hasNull) { - if (pLeftBlock->pBlockAgg == NULL) { - leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex); - } else { - leftNull = - colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]); + int ret = pParam->cmpFn(left1, right1); + return ret; + } else { + for (int32_t i = 0; i < pInfo->size; ++i) { + SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); + SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); + SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); + + bool leftNull = false; + if (pLeftColInfoData->hasNull) { + if (pLeftBlock->pBlockAgg == NULL) { + leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex); + } else { + leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, + pLeftBlock->pBlockAgg[i]); + } } - } - SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); - bool rightNull = false; - if (pRightColInfoData->hasNull) { - if (pRightBlock->pBlockAgg == NULL) { - rightNull = colDataIsNull_s(pRightColInfoData, 
pRightSource->src.rowIndex); - } else { - rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, - pRightBlock->pBlockAgg[i]); + bool rightNull = false; + if (pRightColInfoData->hasNull) { + if (pRightBlock->pBlockAgg == NULL) { + rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex); + } else { + rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, + pRightBlock->pBlockAgg[i]); + } } - } - if (leftNull && rightNull) { - continue; // continue to next slot - } + if (leftNull && rightNull) { + continue; // continue to next slot + } - if (rightNull) { - return pOrder->nullFirst ? 1 : -1; - } + if (rightNull) { + return pOrder->nullFirst ? 1 : -1; + } - if (leftNull) { - return pOrder->nullFirst ? -1 : 1; - } + if (leftNull) { + return pOrder->nullFirst ? -1 : 1; + } - void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex); - void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex); + void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex); + void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex); - __compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order); + __compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order); - int ret = fn(left1, right1); - if (ret == 0) { - continue; - } else { - return ret; + int ret = fn(left1, right1); + if (ret == 0) { + continue; + } else { + return ret; + } } } return 0; From 97a6e89d1102cbbea99490be58e6a26f3b8874b7 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 18 Jul 2023 10:38:40 +0800 Subject: [PATCH 07/14] enhance: refactor create initial sources --- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/tsort.c | 287 ++++++++++++------------ 2 files changed, 150 insertions(+), 139 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c 
index 0c90c654f4..ab8991bd05 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2840,7 +2840,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; - pInfo->sortBufSize = 2048 * pInfo->bufPageSize; + pInfo->sortBufSize = 1024 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_TABLE_MERGE_SCAN, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index f274e9717b..34f9dfc233 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -475,11 +475,11 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT if (pHandle->type == SORT_SINGLESOURCE_SORT) { pSource->pageIndex++; if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) { + qInfo("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex); (*numOfCompleted) += 1; pSource->src.rowIndex = -1; pSource->pageIndex = -1; pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); - uInfo("adjust merge tree. %d source completed", *numOfCompleted); } else { int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); @@ -495,8 +495,6 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT } releaseBufPage(pHandle->pBuf, pPage); - if (pSource->pageIndex % 256 == 0) - uInfo("got block from page %d from ext mem source %p", pSource->pageIndex, pSource); } } else { int64_t st = taosGetTimestampUs(); @@ -506,7 +504,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT if (pSource->src.pBlock == NULL) { (*numOfCompleted) += 1; pSource->src.rowIndex = -1; - uInfo("adjust merge tree. 
%d source completed", *numOfCompleted); + qInfo("adjust merge tree. %d source completed", *numOfCompleted); } } } @@ -688,7 +686,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { // Only *numOfInputSources* can be loaded into buffer to perform the external sort. for (int32_t i = 0; i < sortGroup; ++i) { - uInfo("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources); + qInfo("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources); pHandle->sourceId += 1; int32_t end = (i + 1) * numOfInputSources - 1; @@ -884,11 +882,13 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; + appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); ++nRows; if (pHandle->pDataBlock->info.rows >= rowCap) { appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); - } + } + if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) { sup.aRowIdx[minIdx] = -1; ++numEnded; @@ -911,39 +911,101 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO return 0; } -static int32_t createInitialSources(SSortHandle* pHandle) { - size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; +static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { + SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); + size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); + SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); + + size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize; + createPageBuf(pHandle); + + SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); + int32_t szSort = 0; + + SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); + while (1) { + SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + + if (pBlk != NULL) { + szSort += blockDataGetSize(pBlk); + SSDataBlock* blk = 
createOneDataBlock(pBlk, true); + taosArrayPush(aBlkSort, &blk); + } + if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { + int64_t p = taosGetTimestampUs(); + sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); + int64_t el = taosGetTimestampUs() - p; + pHandle->sortElapsed += el; + + for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { + blockDataDestroy(taosArrayGetP(aBlkSort, i)); + } + taosArrayClear(aBlkSort); + szSort = 0; + } + if (pBlk == NULL) { + break; + }; + } + + taosArrayDestroy(aBlkSort); + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); + taosArrayDestroy(aExtSrc); + + pHandle->type = SORT_SINGLESOURCE_SORT; + return 0; +} + +static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { int32_t code = 0; + size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; - if (pHandle->type == SORT_SINGLESOURCE_SORT) { - SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); - SSortSource* source = *pSource; - *pSource = NULL; + SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); + SSortSource* source = *pSource; + *pSource = NULL; - tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); - while (1) { - SSDataBlock* pBlock = pHandle->fetchfp(source->param); - if (pBlock == NULL) { - break; + while (1) { + SSDataBlock* pBlock = pHandle->fetchfp(source->param); + if (pBlock == NULL) { + break; + } + + if (pHandle->pDataBlock == NULL) { + uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols); + + // todo, number of pages are set according to the total available sort buffer + pHandle->numOfPages = 1024; + sortBufSize = pHandle->numOfPages * pHandle->pageSize; + pHandle->pDataBlock = createOneDataBlock(pBlock, false); + } + + if (pHandle->beforeFp != NULL) { + 
pHandle->beforeFp(pBlock, pHandle->param); + } + + code = blockDataMerge(pHandle->pDataBlock, pBlock); + if (code != TSDB_CODE_SUCCESS) { + if (source->param && !source->onlyRef) { + taosMemoryFree(source->param); } - - if (pHandle->pDataBlock == NULL) { - uint32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock), numOfCols); - - // todo, number of pages are set according to the total available sort buffer - pHandle->numOfPages = 1024; - sortBufSize = pHandle->numOfPages * pHandle->pageSize; - pHandle->pDataBlock = createOneDataBlock(pBlock, false); + if (!source->onlyRef && source->src.pBlock) { + blockDataDestroy(source->src.pBlock); + source->src.pBlock = NULL; } + taosMemoryFree(source); + return code; + } - if (pHandle->beforeFp != NULL) { - pHandle->beforeFp(pBlock, pHandle->param); - } - - code = blockDataMerge(pHandle->pDataBlock, pBlock); - if (code != TSDB_CODE_SUCCESS) { + size_t size = blockDataGetSize(pHandle->pDataBlock); + if (size > sortBufSize) { + // Perform the in-memory sort and then flush data in the buffer into disk. + int64_t p = taosGetTimestampUs(); + code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); + if (code != 0) { if (source->param && !source->onlyRef) { taosMemoryFree(source->param); } @@ -951,122 +1013,71 @@ static int32_t createInitialSources(SSortHandle* pHandle) { blockDataDestroy(source->src.pBlock); source->src.pBlock = NULL; } + taosMemoryFree(source); return code; } - size_t size = blockDataGetSize(pHandle->pDataBlock); - if (size > sortBufSize) { - // Perform the in-memory sort and then flush data in the buffer into disk. 
- int64_t p = taosGetTimestampUs(); - code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); - if (code != 0) { - if (source->param && !source->onlyRef) { - taosMemoryFree(source->param); - } - if (!source->onlyRef && source->src.pBlock) { - blockDataDestroy(source->src.pBlock); - source->src.pBlock = NULL; - } - - taosMemoryFree(source); - return code; - } - - int64_t el = taosGetTimestampUs() - p; - pHandle->sortElapsed += el; - if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); - code = doAddToBuf(pHandle->pDataBlock, pHandle); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - } - - if (source->param && !source->onlyRef) { - taosMemoryFree(source->param); - } - - taosMemoryFree(source); - - if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) { - size_t size = blockDataGetSize(pHandle->pDataBlock); - - // Perform the in-memory sort and then flush data in the buffer into disk. - int64_t p = taosGetTimestampUs(); - - code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); - if (code != 0) { - return code; - } - - if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; - - // All sorted data can fit in memory, external memory sort is not needed. 
Return to directly - if (size <= sortBufSize && pHandle->pBuf == NULL) { - pHandle->cmpParam.numOfSources = 1; - pHandle->inMemSort = true; - - pHandle->loops = 1; - pHandle->tupleHandle.rowIndex = -1; - pHandle->tupleHandle.pBlock = pHandle->pDataBlock; - return 0; - } else { - code = doAddToBuf(pHandle->pDataBlock, pHandle); + if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); + code = doAddToBuf(pHandle->pDataBlock, pHandle); + if (code != TSDB_CODE_SUCCESS) { + return code; } } - } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { - SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); - size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); - SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); - - size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize; - createPageBuf(pHandle); - - SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); - int32_t szSort = 0; - - SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); - while (1) { - SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); - if (pBlk == NULL) { - break; - }; - - szSort += blockDataGetSize(pBlk); - SSDataBlock* blk = createOneDataBlock(pBlk, true); - taosArrayPush(aBlkSort, &blk); - - if (szSort > maxBufSize) { - sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); - uInfo("initial source %zu created for %zu blocks", taosArrayGetSize(aExtSrc), taosArrayGetSize(aBlkSort)); - - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { - blockDataDestroy(taosArrayGetP(aBlkSort, i)); - } - taosArrayClear(aBlkSort); - szSort = 0; - } - } - if (szSort > 0) { - sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); - for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { - blockDataDestroy(taosArrayGetP(aBlkSort, i)); - } - taosArrayClear(aBlkSort); - szSort = 0; - } - taosArrayDestroy(aBlkSort); - tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); - taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); - taosArrayDestroy(aExtSrc); - 
- pHandle->type = SORT_SINGLESOURCE_SORT; - uInfo("create initial sources for table merge scan ended"); } + if (source->param && !source->onlyRef) { + taosMemoryFree(source->param); + } + + taosMemoryFree(source); + + if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) { + size_t size = blockDataGetSize(pHandle->pDataBlock); + + // Perform the in-memory sort and then flush data in the buffer into disk. + int64_t p = taosGetTimestampUs(); + + code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); + if (code != 0) { + return code; + } + + if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); + int64_t el = taosGetTimestampUs() - p; + pHandle->sortElapsed += el; + + // All sorted data can fit in memory, external memory sort is not needed. Return to directly + if (size <= sortBufSize && pHandle->pBuf == NULL) { + pHandle->cmpParam.numOfSources = 1; + pHandle->inMemSort = true; + + pHandle->loops = 1; + pHandle->tupleHandle.rowIndex = -1; + pHandle->tupleHandle.pBlock = pHandle->pDataBlock; + return 0; + } else { + code = doAddToBuf(pHandle->pDataBlock, pHandle); + } + } + return code; +} + +static int32_t createInitialSources(SSortHandle* pHandle) { + int32_t code = 0; + + if (pHandle->type == SORT_SINGLESOURCE_SORT) { + code = createBlocksQuickSortInitialSources(pHandle); + } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { + code = createBlocksMergeSortInitialSources(pHandle); + } + qInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); + for (int i = 0; i < taosArrayGetSize(pHandle->pOrderedSource); ++i) { + SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i); + qInfo("source %d, num of pages %zu", i,taosArrayGetSize(pSrc->pageIdList)); + } return code; } From f79fc81d9c2d53d8cfe7e0e8d4c48bb189705c45 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 18 Jul 2023 15:46:38 +0800 Subject: [PATCH 08/14] fix: add to page buf in the same way as single source sort --- 
source/libs/executor/src/tsort.c | 57 +++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 34f9dfc233..6b93f95099 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -837,6 +837,10 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, int32_t pageId = -1; void* pPage = getNewBufPage(pHandle->pBuf, &pageId); taosArrayPush(aPgId, &pageId); + + int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t); + ASSERT(size <= getBufPageSize(pHandle->pBuf)); + blockDataToBuf(pPage, blk); setBufPageDirty(pPage, true); @@ -846,13 +850,39 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, return 0; } +static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) { + int sz = 0; + int numCols = taosArrayGetSize(blk->pDataBlock); + if (!blk->info.hasVarCol) { + sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 
1: 0); + sz += blockDataGetRowSize(blk); + } else { + for (int32_t i = 0; i < numCols; ++i) { + SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + if (pColInfoData->varmeta.offset[row] != -1) { + char* p = colDataGetData(pColInfoData, row); + sz += varDataTLen(p); + } + + sz += sizeof(pColInfoData->varmeta.offset[0]); + } else { + sz += pColInfoData->info.bytes; + + if (((rowIdxInPage) & 0x07) == 0) { + sz += 1; // bitmap + } + } + } + } + return sz; +} static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) { - int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, - blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock))); + int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); + int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz); blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); blockDataCleanup(pHandle->pDataBlock); - int32_t numBlks = taosArrayGetSize(aBlk); SBlkMergeSupport sup; @@ -878,16 +908,25 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn); int32_t numEnded = 0; int32_t nRows = 0; + + size_t blkPgSz = pgHeaderSz; + while (nRows < totalRows) { int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; + int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows); - appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); - ++nRows; - if (pHandle->pDataBlock->info.rows >= rowCap) { - appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { + appendDataBlockToPageBuf(pHandle, 
pHandle->pDataBlock, aPgId); + blkPgSz = pgHeaderSz; + bufInc = getPageBufIncForRow(minBlk, minRow, 0); } + blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1); + appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); + blkPgSz += bufInc; + + ++nRows; if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) { sup.aRowIdx[minIdx] = -1; @@ -1073,10 +1112,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) { } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { code = createBlocksMergeSortInitialSources(pHandle); } - qInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); + uInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); for (int i = 0; i < taosArrayGetSize(pHandle->pOrderedSource); ++i) { SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i); - qInfo("source %d, num of pages %zu", i,taosArrayGetSize(pSrc->pageIdList)); + uInfo("source %d, num of pages %zu", i,taosArrayGetSize(pSrc->pageIdList)); } return code; } From 4348b9c51dfbc059ec408dfd505adaebdbbc0fee Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 18 Jul 2023 16:47:31 +0800 Subject: [PATCH 09/14] fix: use uinfo to show on stdout --- source/libs/executor/src/tsort.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 6b93f95099..4bcf455ef7 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -475,12 +475,14 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT if (pHandle->type == SORT_SINGLESOURCE_SORT) { pSource->pageIndex++; if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) { - qInfo("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex); + uInfo("adjust merge tree. 
%d source completed %d", *numOfCompleted, pSource->pageIndex); (*numOfCompleted) += 1; pSource->src.rowIndex = -1; pSource->pageIndex = -1; pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); } else { + if (pSource->pageIndex % 512 == 0) uInfo("begin source %p page %d", pSource, pSource->pageIndex); + int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); void* pPage = getBufPage(pHandle->pBuf, *pPgId); @@ -493,7 +495,6 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT if (code != TSDB_CODE_SUCCESS) { return code; } - releaseBufPage(pHandle->pBuf, pPage); } } else { @@ -504,7 +505,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT if (pSource->src.pBlock == NULL) { (*numOfCompleted) += 1; pSource->src.rowIndex = -1; - qInfo("adjust merge tree. %d source completed", *numOfCompleted); + uInfo("adjust merge tree. %d source completed", *numOfCompleted); } } } @@ -686,7 +687,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { // Only *numOfInputSources* can be loaded into buffer to perform the external sort. for (int32_t i = 0; i < sortGroup; ++i) { - qInfo("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources); + uInfo("internal merge sort pass %d group %d. 
num input sources %d ", t, i, numOfInputSources); pHandle->sourceId += 1; int32_t end = (i + 1) * numOfInputSources - 1; @@ -981,6 +982,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } taosArrayClear(aBlkSort); szSort = 0; + uInfo("source %zu created", taosArrayGetSize(aExtSrc)); } if (pBlk == NULL) { break; From b9aeda263c8385c11681ebef4db99d36191f9885 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 19 Jul 2023 09:48:52 +0800 Subject: [PATCH 10/14] fix: fix ubsan error of null argument --- source/common/src/tdatablock.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b2f03fa7ba..887a110831 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -632,7 +632,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { pStart += colSize; } } else { - memcpy(pStart, pCol->pData, dataSize); + if (dataSize != 0) { + // ubsan reports error if pCol->pData==NULL && dataSize==0 + memcpy(pStart, pCol->pData, dataSize); + } pStart += dataSize; } } @@ -684,8 +687,10 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { return TSDB_CODE_FAILED; } } - - memcpy(pCol->pData, pStart, colLength); + if (colLength != 0) { + // ubsan reports error if colLength==0 && pCol->pData == 0 + memcpy(pCol->pData, pStart, colLength); + } pStart += colLength; } From d1a1976aa47cdac58e4fe56e3634c121ac43c96c Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 19 Jul 2023 13:59:43 +0800 Subject: [PATCH 11/14] enhance: add limit to merge sort --- source/libs/executor/inc/tsort.h | 12 +++-- source/libs/executor/src/scanoperator.c | 8 +++- source/libs/executor/src/tsort.c | 58 ++++++++++++++++--------- 3 files changed, 51 insertions(+), 27 deletions(-) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index a15b1e0eaf..538a9f18f6 100644 --- a/source/libs/executor/inc/tsort.h +++ 
b/source/libs/executor/inc/tsort.h @@ -26,7 +26,7 @@ extern "C" { enum { SORT_MULTISOURCE_MERGE = 0x1, SORT_SINGLESOURCE_SORT = 0x2, - SORT_TABLE_MERGE_SCAN = 0x3 + SORT_BLOCK_TS_MERGE = 0x3 }; typedef struct SMultiMergeSource { @@ -56,7 +56,7 @@ typedef struct SMsortComparParam { bool cmpGroupId; int32_t sortType; - // the following field to speed up when sortType == SORT_TABLE_MERGE_SCAN + // the following field to speed up when sortType == SORT_BLOCK_TS_MERGE int32_t tsSlotId; int32_t order; __compar_fn_t cmpFn; @@ -77,8 +77,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* * @return */ SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, - SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength, - uint32_t sortBufSize); + SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength, + uint32_t pqSortBufSize); void tsortSetForceUsePQSort(SSortHandle* pHandle); @@ -117,6 +117,10 @@ int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetc */ int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp); +/** + * +*/ +void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit); /** * */ diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ab8991bd05..6edd7b2d47 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2842,9 +2842,13 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->sortBufSize = 1024 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_TABLE_MERGE_SCAN, pInfo->bufPageSize, numOfBufPage, + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); - 
+ int64_t mergeLimit = -1; + if (pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1) { + mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset; + } + tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit); tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL); // one table has one data block diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 4bcf455ef7..c56dab1e33 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -42,13 +42,15 @@ struct SSortHandle { int64_t startTs; uint64_t totalElapsed; - uint64_t maxRows; - uint32_t maxTupleLength; - uint32_t sortBufSize; + uint64_t pqMaxRows; + uint32_t pqMaxTupleLength; + uint32_t pqSortBufSize; bool forceUsePQSort; BoundedQueue* pBoundedQueue; uint32_t tmpRowIdx; + int64_t mergeLimit; + int32_t sourceId; SSDataBlock* pDataBlock; SMsortComparParam cmpParam; @@ -173,8 +175,8 @@ void destroyTuple(void* t) { * @return */ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, - SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength, - uint32_t sortBufSize) { + SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength, + uint32_t pqSortBufSize) { SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle)); pSortHandle->type = type; @@ -183,10 +185,10 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->pSortInfo = pSortInfo; pSortHandle->loops = 0; - pSortHandle->maxTupleLength = maxTupleLength; - if (maxRows != 0) { - pSortHandle->sortBufSize = sortBufSize; - pSortHandle->maxRows = maxRows; + pSortHandle->pqMaxTupleLength = pqMaxTupleLength; + if (pqMaxRows != 0) { + pSortHandle->pqSortBufSize = pqSortBufSize; + pSortHandle->pqMaxRows = pqMaxRows; } pSortHandle->forceUsePQSort = false; @@ -194,11 +196,13 @@ SSortHandle* 
tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); } + pSortHandle->mergeLimit = -1; + pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->cmpParam.orderInfo = pSortInfo; pSortHandle->cmpParam.cmpGroupId = false; pSortHandle->cmpParam.sortType = type; - if (type == SORT_TABLE_MERGE_SCAN) { + if (type == SORT_BLOCK_TS_MERGE) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pSortInfo, 0); pSortHandle->cmpParam.tsSlotId = pOrder->slotId; pSortHandle->cmpParam.order = pOrder->order; @@ -586,7 +590,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { } } - if (pParam->sortType == SORT_TABLE_MERGE_SCAN) { + if (pParam->sortType == SORT_BLOCK_TS_MERGE) { SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId); SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId); int64_t* left1 = (int64_t*)(pLeftColInfoData->pData) + pLeftSource->src.rowIndex; @@ -709,18 +713,23 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { taosArrayDestroy(pResList); return code; } - + int nRows = 0; SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while (1) { if (tsortIsClosed(pHandle)) { code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED; return code; } - + + if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) { + break; + } + SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); if (pDataBlock == NULL) { break; } + nRows += pDataBlock->info.rows; int32_t pageId = -1; void* pPage = getNewBufPage(pHandle->pBuf, &pageId); @@ -740,7 +749,6 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { setBufPageDirty(pPage, true); releaseBufPage(pHandle->pBuf, pPage); - blockDataCleanup(pDataBlock); } @@ -846,7 +854,6 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, setBufPageDirty(pPage, true); 
releaseBufPage(pHandle->pBuf, pPage); - blockDataCleanup(blk); return 0; } @@ -913,6 +920,9 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO size_t blkPgSz = pgHeaderSz; while (nRows < totalRows) { + if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) { + break; + } int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; @@ -920,6 +930,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + blockDataCleanup(pHandle->pDataBlock); blkPgSz = pgHeaderSz; bufInc = getPageBufIncForRow(minBlk, minRow, 0); } @@ -939,6 +950,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO } if (pHandle->pDataBlock->info.rows > 0) { appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + blockDataCleanup(pHandle->pDataBlock); } SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); @@ -1061,7 +1073,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; - if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); + if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows); code = doAddToBuf(pHandle->pDataBlock, pHandle); if (code != TSDB_CODE_SUCCESS) { return code; @@ -1086,7 +1098,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { return code; } - if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); + if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows); int64_t el = 
taosGetTimestampUs() - p; pHandle->sortElapsed += el; @@ -1111,7 +1123,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { if (pHandle->type == SORT_SINGLESOURCE_SORT) { code = createBlocksQuickSortInitialSources(pHandle); - } else if (pHandle->type == SORT_TABLE_MERGE_SCAN) { + } else if (pHandle->type == SORT_BLOCK_TS_MERGE) { code = createBlocksMergeSortInitialSources(pHandle); } uInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); @@ -1165,6 +1177,10 @@ void tsortSetClosed(SSortHandle* pHandle) { atomic_store_8(&pHandle->closed, 2); } +void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit) { + pHandle->mergeLimit = mergeLimit; +} + int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param) { pHandle->fetchfp = fetchFp; @@ -1244,8 +1260,8 @@ void tsortSetForceUsePQSort(SSortHandle* pHandle) { static bool tsortIsPQSortApplicable(SSortHandle* pHandle) { if (pHandle->type != SORT_SINGLESOURCE_SORT) return false; if (tsortIsForceUsePQSort(pHandle)) return true; - uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*)); - return maxRowsFitInMemory > pHandle->maxRows; + uint64_t maxRowsFitInMemory = pHandle->pqSortBufSize / (pHandle->pqMaxTupleLength + sizeof(char*)); + return maxRowsFitInMemory > pHandle->pqMaxRows; } static bool tsortPQCompFn(void* a, void* b, void* param) { @@ -1291,7 +1307,7 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param) } static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { - pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destroyTuple, pHandle); + pHandle->pBoundedQueue = createBoundedQueue(pHandle->pqMaxRows, tsortPQCompFn, destroyTuple, pHandle); if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY; tsortSetComparFp(pHandle, tupleComparFn); From 30129f64da3da67b56f99951ddde7ae0a784c14d Mon Sep 17 00:00:00 
2001 From: slzhou Date: Thu, 20 Jul 2023 09:28:19 +0800 Subject: [PATCH 12/14] enhance: merge blocks of the same table before sort --- source/libs/executor/src/tsort.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index c56dab1e33..18c46cd03f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -24,6 +24,7 @@ #include "tpagedbuf.h" #include "tsort.h" #include "tutil.h" +#include "tsimplehash.h" struct STupleHandle { SSDataBlock* pBlock; @@ -975,15 +976,27 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { int32_t szSort = 0; SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); + SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); while (1) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); if (pBlk != NULL) { szSort += blockDataGetSize(pBlk); - SSDataBlock* blk = createOneDataBlock(pBlk, true); - taosArrayPush(aBlkSort, &blk); + + void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid)); + if (ppBlk != NULL) { + SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk); + blockDataMerge(tBlk, pBlk); + } else { + SSDataBlock* tBlk = createOneDataBlock(pBlk, true); + tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES); + taosArrayPush(aBlkSort, &tBlk); + } } + if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { + tSimpleHashClear(mUidBlk); + int64_t p = taosGetTimestampUs(); sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); int64_t el = taosGetTimestampUs() - p; @@ -1000,7 +1013,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { break; }; } - + tSimpleHashCleanup(mUidBlk); taosArrayDestroy(aBlkSort); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); From 
5d9f6fd4812db0bdee7f328c955e1aeb60af5864 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 20 Jul 2023 12:29:34 +0800 Subject: [PATCH 13/14] fix: fix limit/offset bugs --- source/libs/executor/src/scanoperator.c | 38 ++++++++++++++----------- source/libs/executor/src/tsort.c | 33 ++++++++++++--------- 2 files changed, 41 insertions(+), 30 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6edd7b2d47..27cbdb66cf 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2905,28 +2905,32 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataCleanup(pResBlock); - + STupleHandle* pTupleHandle = NULL; while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); - if (pTupleHandle == NULL) { - break; + while (1) { + pTupleHandle = tsortNextTuple(pHandle); + if (pTupleHandle == NULL) { + break; + } + + appendOneRowToDataBlock(pResBlock, pTupleHandle); + if (pResBlock->info.rows >= capacity) { + break; + } } - appendOneRowToDataBlock(pResBlock, pTupleHandle); - if (pResBlock->info.rows >= capacity) { - break; + if (tsortIsClosed(pHandle)) { + terrno = TSDB_CODE_TSC_QUERY_CANCELLED; + T_LONG_JMP(pOperator->pTaskInfo->env, terrno); } + + bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); + qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, + pInfo->limitInfo.numOfOutputRows); + if (pTupleHandle == NULL || limitReached || pResBlock->info.rows > 0) { + break; + } } - - if (tsortIsClosed(pHandle)) { - terrno = TSDB_CODE_TSC_QUERY_CANCELLED; - T_LONG_JMP(pOperator->pTaskInfo->env, terrno); - } - - bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); - qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, - 
pInfo->limitInfo.numOfOutputRows); - return (pResBlock->info.rows > 0) ? pResBlock : NULL; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 18c46cd03f..b51f515240 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -714,7 +714,9 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { taosArrayDestroy(pResList); return code; } - int nRows = 0; + + int nMergedRows = 0; + SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while (1) { if (tsortIsClosed(pHandle)) { @@ -722,15 +724,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return code; } - if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) { - break; - } - SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); if (pDataBlock == NULL) { break; } - nRows += pDataBlock->info.rows; int32_t pageId = -1; void* pPage = getNewBufPage(pHandle->pBuf, &pageId); @@ -750,7 +747,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { setBufPageDirty(pPage, true); releaseBufPage(pHandle->pBuf, pPage); + nMergedRows += pDataBlock->info.rows; + blockDataCleanup(pDataBlock); + if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { + break; + } } sortComparCleanup(&pHandle->cmpParam); @@ -915,15 +917,12 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO SMultiwayMergeTreeInfo* pTree = NULL; tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn); - int32_t numEnded = 0; int32_t nRows = 0; - + int32_t nMergedRows = 0; + bool mergeLimitReached = false; size_t blkPgSz = pgHeaderSz; while (nRows < totalRows) { - if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) { - break; - } int32_t minIdx = tMergeTreeGetChosenIndex(pTree); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); int32_t minRow = sup.aRowIdx[minIdx]; @@ -931,9 +930,15 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, 
SArray* aBlk, SBlockO if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + nMergedRows += pHandle->pDataBlock->info.rows; + blockDataCleanup(pHandle->pDataBlock); blkPgSz = pgHeaderSz; bufInc = getPageBufIncForRow(minBlk, minRow, 0); + if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { + mergeLimitReached = true; + break; + } } blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1); appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); @@ -943,14 +948,16 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) { sup.aRowIdx[minIdx] = -1; - ++numEnded; } else { ++sup.aRowIdx[minIdx]; } tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); } if (pHandle->pDataBlock->info.rows > 0) { - appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + if (!mergeLimitReached) { + appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); + nMergedRows += pHandle->pDataBlock->info.rows; + } blockDataCleanup(pHandle->pDataBlock); } SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); From 1a0031a43c48291473fd9254fbcb033a9466a349 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 20 Jul 2023 13:53:13 +0800 Subject: [PATCH 14/14] enhance: uinfo to qdebug and increase pages num to 2048 --- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/tsort.c | 18 +++++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 27cbdb66cf..dbec81e6a4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2840,7 +2840,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = 
pInfo->tableEndIndex; - pInfo->sortBufSize = 1024 * pInfo->bufPageSize; + pInfo->sortBufSize = 2048 * pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index b51f515240..30e7148736 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -480,13 +480,13 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT if (pHandle->type == SORT_SINGLESOURCE_SORT) { pSource->pageIndex++; if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) { - uInfo("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex); + qDebug("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex); (*numOfCompleted) += 1; pSource->src.rowIndex = -1; pSource->pageIndex = -1; pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); } else { - if (pSource->pageIndex % 512 == 0) uInfo("begin source %p page %d", pSource, pSource->pageIndex); + if (pSource->pageIndex % 512 == 0) qDebug("begin source %p page %d", pSource, pSource->pageIndex); int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); @@ -510,7 +510,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT if (pSource->src.pBlock == NULL) { (*numOfCompleted) += 1; pSource->src.rowIndex = -1; - uInfo("adjust merge tree. %d source completed", *numOfCompleted); + qDebug("adjust merge tree. %d source completed", *numOfCompleted); } } } @@ -692,7 +692,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { // Only *numOfInputSources* can be loaded into buffer to perform the external sort. for (int32_t i = 0; i < sortGroup; ++i) { - uInfo("internal merge sort pass %d group %d. 
num input sources %d ", t, i, numOfInputSources); + qDebug("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources); pHandle->sourceId += 1; int32_t end = (i + 1) * numOfInputSources - 1; @@ -716,7 +716,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } int nMergedRows = 0; - + SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while (1) { if (tsortIsClosed(pHandle)) { @@ -1014,7 +1014,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } taosArrayClear(aBlkSort); szSort = 0; - uInfo("source %zu created", taosArrayGetSize(aExtSrc)); + qDebug("source %zu created", taosArrayGetSize(aExtSrc)); } if (pBlk == NULL) { break; @@ -1146,11 +1146,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { } else if (pHandle->type == SORT_BLOCK_TS_MERGE) { code = createBlocksMergeSortInitialSources(pHandle); } - uInfo("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); - for (int i = 0; i < taosArrayGetSize(pHandle->pOrderedSource); ++i) { - SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, i); - uInfo("source %d, num of pages %zu", i,taosArrayGetSize(pSrc->pageIdList)); - } + qDebug("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource)); return code; }