From b6095261f238e486c0530dec0bd3674c19b47daa Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 1 Mar 2024 17:05:24 +0800 Subject: [PATCH] feat: ordered region and blocks --- source/libs/executor/inc/tsort.h | 2 +- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/tsort.c | 347 +++++++++++------------- 3 files changed, 166 insertions(+), 185 deletions(-) diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index b4e0c70f31..ca799673ea 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -194,7 +194,7 @@ void tsortSetClosed(SSortHandle* pHandle); void tsortSetSingleTableMerge(SSortHandle* pHandle); void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param); -int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsPageSize, int32_t extRowsSize); +int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsSize); void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle); /** diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1d64b066e9..1636cd21f0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4015,7 +4015,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); int32_t memSize = 512 * 1024 * 1024; - code = tsortSetSortByRowId(pInfo->pSortHandle, pInfo->bufPageSize, memSize); + code = tsortSetSortByRowId(pInfo->pSortHandle, memSize); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 1ceb5403c9..38ab506918 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -45,23 +45,27 @@ typedef struct SSortMemPageEntry { } SSortMemPageEntry; -typedef struct SSortMemFile { - int32_t pageSize; - int32_t cacheSize; - - char* writePageBuf; - int32_t startPageId; - int32_t numWritePages; - - int32_t currPageId; - int32_t currPageOffset; - bool bDirty; +typedef struct SSortMemFileRegion { + int64_t fileOffset; + int32_t regionSize; - int32_t totalMemPages; - SSortMemPageEntry* pagesHead; - SSortMemPageEntry* pagesTail; - int32_t numMemPages; - SSHashObj* mActivePages; + int32_t bufRegOffset; + int32_t bufLen; + char* buf; +} SSortMemFileRegion; + +typedef struct SSortMemFile { + char* writeBuf; + int32_t writeBufSize; + int64_t writeFileOffset; + + int32_t currRegionId; + int32_t currRegionOffset; + bool bRegionDirty; + + SArray* aFileRegions; + int32_t cacheSize; + int32_t blockSize; FILE* pTdFile; // TdFilePtr pTdFile; @@ -125,11 +129,8 @@ struct SSortHandle { }; static int32_t destroySortMemFile(SSortHandle* pHandle); -static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char** ppPage); -static void setExtMemFilePageUnused(SSortMemFile* pMemFile, int32_t pageId); -static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle); -static int32_t freeExtRowMemFileWriteBuf(SSortHandle* pHandle); - +static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset, int32_t rowLen, + char** ppRow, bool* pFreeRow); void tsortSetSingleTableMerge(SSortHandle* pHandle) { pHandle->singleTableMerge = true; } @@ -915,14 +916,14 @@ static int32_t createPageBuf(SSortHandle* pHandle) { void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHandle* pTupleHandle) { if (pHandle->bSortByRowId) { - int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1); + int32_t regionId = *(int32_t*)tsortGetValue(pTupleHandle, 1); int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2); + int32_t length = *(int32_t*)tsortGetValue(pTupleHandle, 3); - char* page = NULL; - getPageFromExtMemFile(pHandle, pageId, &page); - + char* buf = NULL; + bool bFreeRow = false; + getRowBufFromExtMemFile(pHandle, regionId, offset, length, &buf, &bFreeRow); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - char* buf = (char*)page + offset; char* isNull = (char*)buf; char* pStart = (char*)buf + sizeof(int8_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { @@ -943,7 +944,9 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa colDataSetNULL(pColInfo, pBlock->info.rows); } } - + if (bFreeRow) { + taosMemoryFree(buf); + } if (*(int32_t*)pStart != pStart - buf) { qError("table merge scan row buf deserialization. length error %d != %d ", *(int32_t*)pStart, (int32_t)(pStart - buf)); @@ -953,9 +956,6 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; pBlock->info.rows += 1; - if (offset + pHandle->extRowBytes >= pHandle->pExtRowsMemFile->pageSize) { - setExtMemFilePageUnused(pHandle->pExtRowsMemFile, pageId); - } } else { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); @@ -1020,102 +1020,60 @@ static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) { return (int32_t)(pStart - (char*)buf); } -static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char** ppPage) { +static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, int32_t tupleOffset, int32_t rowLen, + char** ppRow, bool* pFreeRow) { SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; - SSortMemPageEntry** ppPageEntry = tSimpleHashGet(pMemFile->mActivePages, &pageId, sizeof(pageId)); - if (ppPageEntry) { - *ppPage = (char*)((*ppPageEntry)->data); + SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, regionId); + if (pRegion->buf == NULL) { + pRegion->bufRegOffset = 0; + pRegion->buf = taosMemoryMalloc(pMemFile->blockSize); + tsortSeekFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET); + int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize); + fread(pRegion->buf, readBytes, 1, pMemFile->pTdFile); + pRegion->bufLen = readBytes; + } + // TODO: ASSERT(pRegion->offset < tupleOffset); + if (pRegion->bufRegOffset + pRegion->bufLen >= tupleOffset + rowLen) { + *pFreeRow = false; + *ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset; } else { - SSortMemPageEntry* pEntry = pMemFile->pagesHead->next; - bool freeEntryWhenError = false; - if (pEntry && !pEntry->active || pMemFile->numMemPages >= pMemFile->totalMemPages) { - if (pEntry->active) { - tSimpleHashRemove(pMemFile->mActivePages, &pEntry->pageId, sizeof(pEntry->pageId)); - } - pEntry->prev->next = pEntry->next; - pEntry->next->prev = pEntry->prev; - pEntry->active = false; - } else if (pMemFile->numMemPages < pMemFile->totalMemPages) { - pEntry = taosMemoryCalloc(1, sizeof(SSortMemPageEntry)); - pEntry->data = taosMemoryMalloc(pMemFile->pageSize); - freeEntryWhenError = true; - ++pMemFile->numMemPages; - } - { - int ret = tsortSeekFile(pMemFile->pTdFile, ((int64_t)pageId) * pMemFile->pageSize, SEEK_SET); - if (ret == 0) { - ret = fread(pEntry->data, pMemFile->pageSize, 1, pMemFile->pTdFile); - } - if (ret != 1) { - terrno = TAOS_SYSTEM_ERROR(errno); - if (freeEntryWhenError) { - taosMemoryFreeClear(pEntry->data); - taosMemoryFreeClear(pEntry); - } - return terrno; - } - SSortMemPageEntry* tail = pMemFile->pagesTail; - tail->next = pEntry; - pEntry->next = NULL; - pEntry->prev = tail; - pEntry->active = true; - pMemFile->pagesTail = pEntry; - tSimpleHashPut(pMemFile->mActivePages, &pageId, sizeof(pageId), &pEntry, POINTER_BYTES); - *ppPage = pEntry->data; - } + *ppRow = taosMemoryMalloc(rowLen); + int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset); + memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, + szThisBlock); + tsortSeekFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET); + int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen)); + fread(pRegion->buf, readBytes, 1, pMemFile->pTdFile); + memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock); + *pFreeRow = true; + pRegion->bufRegOffset += pRegion->bufLen; + pRegion->bufLen = readBytes; } + //TODO: free region memory return TSDB_CODE_SUCCESS; } -static void setExtMemFilePageUnused(SSortMemFile* pMemFile, int32_t pageId) { - SSortMemPageEntry** ppPageEntry = tSimpleHashGet(pMemFile->mActivePages, &pageId, sizeof(pageId)); - SSortMemPageEntry* pEntry = *ppPageEntry; - if (pEntry == pMemFile->pagesTail) { - pMemFile->pagesTail = pEntry->prev; - } - - pEntry->prev->next = pEntry->next; - pEntry->next->prev = pEntry->prev; - - SSortMemPageEntry* first = pMemFile->pagesHead->next; - SSortMemPageEntry* head = pMemFile->pagesHead; - head->next = pEntry; - pEntry->next = first; - first->prev = pEntry; - pEntry->prev = head; - - pEntry->active = false; - tSimpleHashRemove(pMemFile->mActivePages, &pageId, sizeof(pageId)); - return; -} - static int32_t createSortMemFile(SSortHandle* pHandle) { if (pHandle->pExtRowsMemFile != NULL) { return TSDB_CODE_SUCCESS; } SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile)); - + pMemFile->cacheSize = pHandle->extRowsMemSize; taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath); pMemFile->pTdFile = fopen(pMemFile->memFilePath, "w+"); if (pMemFile->pTdFile == NULL) { taosMemoryFree(pMemFile); return TAOS_SYSTEM_ERROR(errno); } - pMemFile->currPageId = -1; - pMemFile->currPageOffset = -1; + pMemFile->currRegionId = -1; + pMemFile->currRegionOffset = -1; - pMemFile->pageSize = pHandle->extRowsPageSize; - pMemFile->cacheSize = pHandle->extRowsMemSize; - pMemFile->numWritePages = pMemFile->cacheSize/pMemFile->pageSize; - pMemFile->writePageBuf = taosMemoryMalloc(pMemFile->pageSize * pMemFile->numWritePages); - pMemFile->bDirty = false; + pMemFile->writeBufSize = 64 * 1024 * 1024; + pMemFile->writeBuf = taosMemoryMalloc(pMemFile->writeBufSize); + pMemFile->writeFileOffset = -1; + pMemFile->bRegionDirty = false; - pMemFile->mActivePages = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); - pMemFile->pagesHead = taosMemoryCalloc(1, sizeof(SSortMemPageEntry)); - pMemFile->pagesTail = pMemFile->pagesHead; - - pMemFile->totalMemPages = pMemFile->cacheSize / pMemFile->pageSize; - pMemFile->numMemPages = 0; + pMemFile->aFileRegions = taosArrayInit(64, sizeof(SSortMemFileRegion)); pHandle->pExtRowsMemFile = pMemFile; return TSDB_CODE_SUCCESS; @@ -1125,20 +1083,15 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) { if (pHandle->pExtRowsMemFile == NULL) return TSDB_CODE_SUCCESS; SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; - SSortMemPageEntry* pEntry = pMemFile->pagesHead; - while (pEntry != NULL) { - if (pEntry->data) { - taosMemoryFree(pEntry->data); - } - SSortMemPageEntry* pCurr = pEntry; - pEntry = pEntry->next; - taosMemoryFree(pCurr); + for (int32_t i = 0; i < taosArrayGetSize(pMemFile->aFileRegions); ++i) { + SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, i); + taosMemoryFree(pRegion->buf); } - tSimpleHashCleanup(pMemFile->mActivePages); - pMemFile->mActivePages = NULL; + taosArrayDestroy(pMemFile->aFileRegions); + pMemFile->aFileRegions = NULL; - taosMemoryFree(pMemFile->writePageBuf); - pMemFile->writePageBuf = NULL; + taosMemoryFree(pMemFile->writeBuf); + pMemFile->writeBuf = NULL; fclose(pMemFile->pTdFile); pMemFile->pTdFile = NULL; @@ -1148,68 +1101,89 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) { return TSDB_CODE_SUCCESS; } -static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) { +static int32_t tsortOpenRegion(SSortHandle* pHandle) { SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; - if (pMemFile->currPageId == -1) { - pMemFile->currPageId = 0; - pMemFile->currPageOffset = 0; - pMemFile->startPageId = 0; + if (pMemFile->currRegionId == -1) { + SSortMemFileRegion region = {0}; + region.fileOffset = 0; + region.bufRegOffset = 0; + taosArrayPush(pMemFile->aFileRegions, ®ion); + pMemFile->currRegionId = 0; + pMemFile->currRegionOffset = 0; + pMemFile->writeFileOffset = 0; } else { - if (pMemFile->currPageOffset + pHandle->extRowBytes >= pMemFile->pageSize) { + SSortMemFileRegion regionNew = {0}; + SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); + regionNew.fileOffset = pRegion->fileOffset + pRegion->regionSize; + regionNew.bufRegOffset = 0; + taosArrayPush(pMemFile->aFileRegions, ®ionNew); + ++pMemFile->currRegionId; + pMemFile->currRegionOffset = 0; + pMemFile->writeFileOffset = regionNew.fileOffset; + } + return TSDB_CODE_SUCCESS; +} - ++pMemFile->currPageId; - pMemFile->currPageOffset = 0; +static int32_t tsortCloseRegion(SSortHandle* pHandle) { + SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; + SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); + pRegion->regionSize = pMemFile->currRegionOffset; + int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset); + if (writeBytes > 0) { + int ret = tsortSeekFile(pMemFile->pTdFile, pMemFile->writeFileOffset, SEEK_SET); + if (ret == 0) { + ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); + } + if (ret != 1) { + terrno = TAOS_SYSTEM_ERROR(errno); + return terrno; + } + pMemFile->bRegionDirty = false; + } + return TSDB_CODE_SUCCESS; +} - if (pMemFile->currPageId - pMemFile->startPageId >= pMemFile->numWritePages) { - int ret = tsortSeekFile(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, SEEK_SET); - if (ret == 0) { - ret = fwrite(pMemFile->writePageBuf, pMemFile->pageSize * pMemFile->numWritePages, 1, pMemFile->pTdFile); - } - if (ret != 1) { - terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; - } - pMemFile->startPageId = pMemFile->currPageId; +static int32_t tsortFinalizeRegions(SSortHandle* pHandle) { + SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; + size_t numRegions = taosArrayGetSize(pMemFile->aFileRegions); + ASSERT(numRegions == (pMemFile->currRegionId + 1)); + if (numRegions == 0) return TSDB_CODE_SUCCESS; + int32_t blockReadBytes = (pMemFile->cacheSize / numRegions + 4095) & ~4095; + pMemFile->blockSize = blockReadBytes; + + for (int32_t i = 0; i < numRegions; ++i) { + SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, i); + pRegion->bufRegOffset = 0; + } + taosMemoryFree(pMemFile->writeBuf); + pMemFile->writeBuf = NULL; + return TSDB_CODE_SUCCESS; +} + +static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) { + SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; + SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId); + { + if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) { + int ret = tsortSeekFile(pMemFile->pTdFile, pMemFile->writeFileOffset, SEEK_SET); + int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); + if (ret == 0) { + ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); } + if (ret != 1) { + terrno = TAOS_SYSTEM_ERROR(errno); + return terrno; + } + pMemFile->writeFileOffset = pRegion->fileOffset + pMemFile->currRegionOffset; } } - - *pPageId = pMemFile->currPageId; - *pOffset = pMemFile->currPageOffset; - int32_t offsetPages = (pMemFile->currPageId - pMemFile->startPageId) * pMemFile->pageSize; - int32_t blockLen = blockRowToBuf(pBlock, rowIdx, - pMemFile->writePageBuf + offsetPages + pMemFile->currPageOffset); + *pRegionId = pMemFile->currRegionId; + *pOffset = pMemFile->currRegionOffset; + int32_t writeBufOffset = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); + int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->writeBuf + writeBufOffset); *pLength = blockLen; - pMemFile->currPageOffset += blockLen; - pMemFile->bDirty = true; - return TSDB_CODE_SUCCESS; -} - -static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle) { - SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; - if (!pMemFile->bDirty) { - return TSDB_CODE_SUCCESS; - } - int ret = tsortSeekFile(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, SEEK_SET); - int32_t numWriteBytes = pMemFile->pageSize * (pMemFile->currPageId - pMemFile->startPageId + 1); - if (ret == 0) { - ret = fwrite(pMemFile->writePageBuf, numWriteBytes, 1, pMemFile->pTdFile); - } - if (ret != 1) { - terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; - } - pMemFile->bDirty = false; - return TSDB_CODE_SUCCESS; -} - -static int32_t freeExtRowMemFileWriteBuf(SSortHandle* pHandle) { - SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; - - if (pMemFile == NULL) return TSDB_CODE_SUCCESS; - - taosMemoryFree(pMemFile->writePageBuf); - pMemFile->writePageBuf = NULL; + pMemFile->currRegionOffset += blockLen; + pMemFile->bRegionDirty = true; return TSDB_CODE_SUCCESS; } @@ -1225,12 +1199,15 @@ static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource char* pData = colDataGetData(pSrcTsCol, *rowIndex); colDataSetVal(pTsCol, pBlock->info.rows, pData, false); - SColumnInfoData* pPageIdCol = taosArrayGet(pBlock->pDataBlock, 1); - colDataSetInt32(pPageIdCol, pBlock->info.rows, &pageId); + SColumnInfoData* pRegionIdCol = taosArrayGet(pBlock->pDataBlock, 1); + colDataSetInt32(pRegionIdCol, pBlock->info.rows, &pageId); SColumnInfoData* pOffsetCol = taosArrayGet(pBlock->pDataBlock, 2); colDataSetInt32(pOffsetCol, pBlock->info.rows, &offset); + SColumnInfoData* pLengthCol = taosArrayGet(pBlock->pDataBlock, 3); + colDataSetInt32(pLengthCol, pBlock->info.rows, &length); + pBlock->info.rows += 1; *rowIndex += 1; } @@ -1240,10 +1217,12 @@ static void initRowIdSort(SSortHandle* pHandle) { SSDataBlock* pSortInput = createDataBlock(); SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); blockDataAppendColInfo(pSortInput, &tsCol); - SColumnInfoData pageIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2); - blockDataAppendColInfo(pSortInput, &pageIdCol); + SColumnInfoData regionIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2); + blockDataAppendColInfo(pSortInput, ®ionIdCol); SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3); blockDataAppendColInfo(pSortInput, &offsetCol); + SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); + blockDataAppendColInfo(pSortInput, &lengthCol); blockDataDestroy(pHandle->pDataBlock); pHandle->pDataBlock = pSortInput; @@ -1267,9 +1246,8 @@ static void initRowIdSort(SSortHandle* pHandle) { return; } -int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsPageSize, int32_t extRowsMemSize) { +int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) { pHandle->extRowBytes = blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t); - pHandle->extRowsPageSize = extRowsPageSize; pHandle->extRowsMemSize = extRowsMemSize; SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); pHandle->extRowsOrderInfo = *pOrder; @@ -1552,7 +1530,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { while (1) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); - int64_t p = taosGetTimestampUs(); bool bExtractedBlock = false; bool bSkipBlock = false; if (pBlk != NULL && pHandle->mergeLimit > 0) { @@ -1594,6 +1571,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { tSimpleHashClear(mUidBlk); int64_t p = taosGetTimestampUs(); + if (pHandle->bSortByRowId) { + tsortOpenRegion(pHandle); + } code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc); if (code != TSDB_CODE_SUCCESS) { @@ -1603,7 +1583,9 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { taosArrayClear(aBlkSort); break; } - + if (pHandle->bSortByRowId) { + tsortCloseRegion(pHandle); + } int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; @@ -1641,8 +1623,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { taosArrayDestroy(aExtSrc); tSimpleHashCleanup(mTableNumRows); if (pHandle->bSortByRowId) { - saveDirtyPagesToExtRowsMemFile(pHandle); - freeExtRowMemFileWriteBuf(pHandle); + tsortFinalizeRegions(pHandle); } pHandle->type = SORT_SINGLESOURCE_SORT; return code;