From 83e44f07355e09a17ed5dfa1d68d6483bfcefda8 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 4 Feb 2024 22:36:25 +0800 Subject: [PATCH] fix: more big batch writes than page writes --- source/libs/executor/src/tsort.c | 35 ++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 1935bb68bb..25c69c35c9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -48,7 +48,10 @@ typedef struct SSortMemPageEntry { typedef struct SSortMemFile { int32_t pageSize; int32_t cacheSize; - char* pageBuf; + + char* writePageBuf; + int32_t startPageId; + int32_t numWritePages; int32_t currPageId; int32_t currPageOffset; @@ -123,7 +126,7 @@ struct SSortHandle { static int32_t destroySortMemFile(SSortHandle* pHandle); static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char** ppPage); static void setExtMemFilePageUnused(SSortMemFile* pMemFile, int32_t pageId); -static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle); +static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle); void tsortSetSingleTableMerge(SSortHandle* pHandle) { pHandle->singleTableMerge = true; @@ -1084,7 +1087,8 @@ static int32_t createSortMemFile(SSortHandle* pHandle) { pMemFile->pageSize = pHandle->extRowsPageSize; pMemFile->cacheSize = pHandle->extRowsMemSize; - pMemFile->pageBuf = taosMemoryMalloc(pMemFile->pageSize); + pMemFile->numWritePages = pMemFile->cacheSize/pMemFile->pageSize; + pMemFile->writePageBuf = taosMemoryMalloc(pMemFile->pageSize * pMemFile->numWritePages); pMemFile->bDirty = false; pMemFile->mActivePages = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); @@ -1112,7 +1116,7 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) { taosMemoryFree(pCurr); } tSimpleHashCleanup(pMemFile->mActivePages); - taosMemoryFree(pMemFile->pageBuf); + taosMemoryFree(pMemFile->writePageBuf); fclose(pMemFile->pTdFile); taosRemoveFile(pMemFile->memFilePath); taosMemoryFree(pMemFile); @@ -1125,32 +1129,41 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p if (pMemFile->currPageId == -1) { pMemFile->currPageId = 0; pMemFile->currPageOffset = 0; + pMemFile->startPageId = 0; } else { if (pMemFile->currPageOffset + pHandle->extRowBytes >= pMemFile->pageSize) { - fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->currPageId) * pMemFile->pageSize, SEEK_SET); - fwrite(pMemFile->pageBuf, pMemFile->currPageOffset + 1, 1, pMemFile->pTdFile); ++pMemFile->currPageId; pMemFile->currPageOffset = 0; + + if (pMemFile->currPageId - pMemFile->startPageId >= pMemFile->numWritePages) { + fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, SEEK_SET); + fwrite(pMemFile->writePageBuf, pMemFile->pageSize * pMemFile->numWritePages, 1, pMemFile->pTdFile); + + pMemFile->startPageId = pMemFile->currPageId; + } } } *pPageId = pMemFile->currPageId; *pOffset = pMemFile->currPageOffset; - int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->pageBuf + pMemFile->currPageOffset); + int32_t offsetPages = (pMemFile->currPageId - pMemFile->startPageId) * pMemFile->pageSize; + int32_t blockLen = blockRowToBuf(pBlock, rowIdx, + pMemFile->writePageBuf + offsetPages + pMemFile->currPageOffset); *pLength = blockLen; pMemFile->currPageOffset += blockLen; pMemFile->bDirty = true; return TSDB_CODE_SUCCESS; } -static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle) { +static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle) { SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; if (!pMemFile->bDirty) { return TSDB_CODE_SUCCESS; } - fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->currPageId) * pMemFile->pageSize, SEEK_SET); - fwrite(pMemFile->pageBuf, pMemFile->currPageOffset + 1, 1, pMemFile->pTdFile); + fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, SEEK_SET); + int32_t numWriteBytes = pMemFile->pageSize * (pMemFile->currPageId - pMemFile->startPageId) + pMemFile->currPageOffset + 1; + fwrite(pMemFile->writePageBuf, numWriteBytes, 1, pMemFile->pTdFile); pMemFile->bDirty = false; return TSDB_CODE_SUCCESS; } @@ -1411,7 +1424,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* blockDataCleanup(pHandle->pDataBlock); } - saveLastPageToExtRowsMemFile(pHandle); + saveDirtyPagesToExtRowsMemFile(pHandle); SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);