From fa55a32e7761a6c277c68dea7e17797ee2bfbf8c Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 4 Feb 2024 16:53:47 +0800 Subject: [PATCH 1/6] fix: pass compilation --- source/libs/executor/src/tsort.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index c01612675a..e633af12ed 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1111,7 +1111,7 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) { } tSimpleHashCleanup(pMemFile->mActivePages); taosMemoryFree(pMemFile->pageBuf); - taosCloseFile(pMemFile->pTdFile); + taosCloseFile(&pMemFile->pTdFile); return TSDB_CODE_SUCCESS; } From 8e0f578dba5984db83c3f7b4155303bc54cdbc4b Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 4 Feb 2024 16:57:48 +0800 Subject: [PATCH 2/6] fix: memory leak --- source/libs/executor/src/tsort.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index e633af12ed..22c3f14b43 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1112,6 +1112,8 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) { tSimpleHashCleanup(pMemFile->mActivePages); taosMemoryFree(pMemFile->pageBuf); taosCloseFile(&pMemFile->pTdFile); + taosMemoryFree(pMemFile); + pHandle->pExtRowsMemFile = NULL; return TSDB_CODE_SUCCESS; } From 23bd2aa525801e2343ce89d36ae470787f249953 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 4 Feb 2024 17:51:29 +0800 Subject: [PATCH 3/6] fix: first pass --- source/libs/executor/src/tsort.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 22c3f14b43..3f30e3fe89 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1024,8 +1024,6 @@ static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char* } pEntry->prev->next = 
pEntry->next; pEntry->next->prev = pEntry->prev; - taosLSeekFile(pMemFile->pTdFile, pageId * pMemFile->pageSize, SEEK_SET); - taosReadFile(pMemFile->pTdFile, pEntry->data, pMemFile->pageSize); pEntry->active = false; } else if (pMemFile->numMemPages < pMemFile->totalMemPages) { pEntry = taosMemoryCalloc(1, sizeof(SSortMemPageEntry)); @@ -1033,6 +1031,8 @@ static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char* ++pMemFile->numMemPages; } { + taosLSeekFile(pMemFile->pTdFile, pageId * pMemFile->pageSize, SEEK_SET); + taosReadFile(pMemFile->pTdFile, pEntry->data, pMemFile->pageSize); SSortMemPageEntry* tail = pMemFile->pagesTail; tail->next = pEntry; pEntry->next = NULL; From 18366934c30c5b46651d26ebd97e33da7133597b Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 4 Feb 2024 20:16:19 +0800 Subject: [PATCH 4/6] fix: use c api --- source/libs/executor/src/tsort.c | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 3f30e3fe89..022a420c06 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -13,6 +13,8 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. 
*/ +#define ALLOW_FORBID_FUNC + #include "query.h" #include "tcommon.h" @@ -58,7 +60,7 @@ typedef struct SSortMemFile { int32_t numMemPages; SSHashObj* mActivePages; - TdFilePtr pTdFile; + FILE* pTdFile; char memFilePath[PATH_MAX]; } SSortMemFile; @@ -1031,8 +1033,8 @@ static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char* ++pMemFile->numMemPages; } { - taosLSeekFile(pMemFile->pTdFile, pageId * pMemFile->pageSize, SEEK_SET); - taosReadFile(pMemFile->pTdFile, pEntry->data, pMemFile->pageSize); + fseek(pMemFile->pTdFile, pageId * pMemFile->pageSize, SEEK_SET); + fread(pEntry->data, pMemFile->pageSize, 1, pMemFile->pTdFile); SSortMemPageEntry* tail = pMemFile->pagesTail; tail->next = pEntry; pEntry->next = NULL; @@ -1076,7 +1078,7 @@ static int32_t createSortMemFile(SSortHandle* pHandle) { taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath); pMemFile->pTdFile = - taosOpenFile(pMemFile->memFilePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + fopen(pMemFile->memFilePath, "wb+"); pMemFile->currPageId = -1; pMemFile->currPageOffset = -1; @@ -1111,7 +1113,8 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) { } tSimpleHashCleanup(pMemFile->mActivePages); taosMemoryFree(pMemFile->pageBuf); - taosCloseFile(&pMemFile->pTdFile); + fclose(pMemFile->pTdFile); + taosRemoveFile(pMemFile->memFilePath); taosMemoryFree(pMemFile); pHandle->pExtRowsMemFile = NULL; return TSDB_CODE_SUCCESS; @@ -1124,8 +1127,8 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p pMemFile->currPageOffset = 0; } else { if (pMemFile->currPageOffset + pHandle->extRowBytes >= pMemFile->pageSize) { - taosLSeekFile(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET); - taosWriteFile(pMemFile->pTdFile, pMemFile->pageBuf, pMemFile->currPageOffset + 1); + fseek(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET); + fwrite(pMemFile->pageBuf, pMemFile->currPageOffset + 1, 
1, pMemFile->pTdFile); ++pMemFile->currPageId; pMemFile->currPageOffset = 0; @@ -1146,8 +1149,8 @@ static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle) { if (!pMemFile->bDirty) { return TSDB_CODE_SUCCESS; } - taosLSeekFile(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET); - taosWriteFile(pMemFile->pTdFile, pMemFile->pageBuf, pMemFile->currPageOffset + 1); + fseek(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET); + fwrite(pMemFile->pageBuf, pMemFile->currPageOffset + 1, 1, pMemFile->pTdFile); pMemFile->bDirty = false; return TSDB_CODE_SUCCESS; } From 247bfae03997d052d694d63de7e3a9a0df7b7649 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 4 Feb 2024 21:12:19 +0800 Subject: [PATCH 5/6] fix: use fseeko --- source/libs/executor/src/tsort.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 022a420c06..1935bb68bb 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1127,7 +1127,7 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p pMemFile->currPageOffset = 0; } else { if (pMemFile->currPageOffset + pHandle->extRowBytes >= pMemFile->pageSize) { - fseek(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET); + fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->currPageId) * pMemFile->pageSize, SEEK_SET); fwrite(pMemFile->pageBuf, pMemFile->currPageOffset + 1, 1, pMemFile->pTdFile); ++pMemFile->currPageId; @@ -1149,7 +1149,7 @@ static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle) { if (!pMemFile->bDirty) { return TSDB_CODE_SUCCESS; } - fseek(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET); + fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->currPageId) * pMemFile->pageSize, SEEK_SET); fwrite(pMemFile->pageBuf, pMemFile->currPageOffset + 1, 1, pMemFile->pTdFile); pMemFile->bDirty = false; return 
TSDB_CODE_SUCCESS; From 83e44f07355e09a17ed5dfa1d68d6483bfcefda8 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 4 Feb 2024 22:36:25 +0800 Subject: [PATCH 6/6] fix: more big batch writes than page writes --- source/libs/executor/src/tsort.c | 35 ++++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 1935bb68bb..25c69c35c9 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -48,7 +48,10 @@ typedef struct SSortMemPageEntry { typedef struct SSortMemFile { int32_t pageSize; int32_t cacheSize; - char* pageBuf; + + char* writePageBuf; + int32_t startPageId; + int32_t numWritePages; int32_t currPageId; int32_t currPageOffset; @@ -123,7 +126,7 @@ struct SSortHandle { static int32_t destroySortMemFile(SSortHandle* pHandle); static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char** ppPage); static void setExtMemFilePageUnused(SSortMemFile* pMemFile, int32_t pageId); -static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle); +static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle); void tsortSetSingleTableMerge(SSortHandle* pHandle) { pHandle->singleTableMerge = true; @@ -1084,7 +1087,8 @@ static int32_t createSortMemFile(SSortHandle* pHandle) { pMemFile->pageSize = pHandle->extRowsPageSize; pMemFile->cacheSize = pHandle->extRowsMemSize; - pMemFile->pageBuf = taosMemoryMalloc(pMemFile->pageSize); + pMemFile->numWritePages = pMemFile->cacheSize/pMemFile->pageSize; + pMemFile->writePageBuf = taosMemoryMalloc(pMemFile->pageSize * pMemFile->numWritePages); pMemFile->bDirty = false; pMemFile->mActivePages = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); @@ -1112,7 +1116,7 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) { taosMemoryFree(pCurr); } tSimpleHashCleanup(pMemFile->mActivePages); - taosMemoryFree(pMemFile->pageBuf); + 
taosMemoryFree(pMemFile->writePageBuf); fclose(pMemFile->pTdFile); taosRemoveFile(pMemFile->memFilePath); taosMemoryFree(pMemFile); @@ -1125,32 +1129,41 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p if (pMemFile->currPageId == -1) { pMemFile->currPageId = 0; pMemFile->currPageOffset = 0; + pMemFile->startPageId = 0; } else { if (pMemFile->currPageOffset + pHandle->extRowBytes >= pMemFile->pageSize) { - fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->currPageId) * pMemFile->pageSize, SEEK_SET); - fwrite(pMemFile->pageBuf, pMemFile->currPageOffset + 1, 1, pMemFile->pTdFile); ++pMemFile->currPageId; pMemFile->currPageOffset = 0; + + if (pMemFile->currPageId - pMemFile->startPageId >= pMemFile->numWritePages) { + fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, SEEK_SET); + fwrite(pMemFile->writePageBuf, pMemFile->pageSize * pMemFile->numWritePages, 1, pMemFile->pTdFile); + + pMemFile->startPageId = pMemFile->currPageId; + } } } *pPageId = pMemFile->currPageId; *pOffset = pMemFile->currPageOffset; - int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->pageBuf + pMemFile->currPageOffset); + int32_t offsetPages = (pMemFile->currPageId - pMemFile->startPageId) * pMemFile->pageSize; + int32_t blockLen = blockRowToBuf(pBlock, rowIdx, + pMemFile->writePageBuf + offsetPages + pMemFile->currPageOffset); *pLength = blockLen; pMemFile->currPageOffset += blockLen; pMemFile->bDirty = true; return TSDB_CODE_SUCCESS; } -static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle) { +static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle) { SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; if (!pMemFile->bDirty) { return TSDB_CODE_SUCCESS; } - fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->currPageId) * pMemFile->pageSize, SEEK_SET); - fwrite(pMemFile->pageBuf, pMemFile->currPageOffset + 1, 1, pMemFile->pTdFile); + fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, 
SEEK_SET); + int32_t numWriteBytes = pMemFile->pageSize * (pMemFile->currPageId - pMemFile->startPageId) + pMemFile->currPageOffset + 1; + fwrite(pMemFile->writePageBuf, numWriteBytes, 1, pMemFile->pTdFile); pMemFile->bDirty = false; return TSDB_CODE_SUCCESS; } @@ -1411,7 +1424,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* blockDataCleanup(pHandle->pDataBlock); } - saveLastPageToExtRowsMemFile(pHandle); + saveDirtyPagesToExtRowsMemFile(pHandle); SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);