diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index ebce48a068..363125c216 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#define ALLOW_FORBID_FUNC + #include "query.h" #include "tcommon.h" @@ -46,7 +48,10 @@ typedef struct SSortMemPageEntry { typedef struct SSortMemFile { int32_t pageSize; int32_t cacheSize; - char* pageBuf; + + char* writePageBuf; + int32_t startPageId; + int32_t numWritePages; int32_t currPageId; int32_t currPageOffset; @@ -58,7 +63,7 @@ typedef struct SSortMemFile { int32_t numMemPages; SSHashObj* mActivePages; - TdFilePtr pTdFile; + FILE* pTdFile; char memFilePath[PATH_MAX]; } SSortMemFile; @@ -121,7 +126,7 @@ struct SSortHandle { static int32_t destroySortMemFile(SSortHandle* pHandle); static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char** ppPage); static void setExtMemFilePageUnused(SSortMemFile* pMemFile, int32_t pageId); -static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle); +static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle); void tsortSetSingleTableMerge(SSortHandle* pHandle) { pHandle->singleTableMerge = true; @@ -1024,8 +1029,6 @@ static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char* } pEntry->prev->next = pEntry->next; pEntry->next->prev = pEntry->prev; - taosLSeekFile(pMemFile->pTdFile, pageId * pMemFile->pageSize, SEEK_SET); - taosReadFile(pMemFile->pTdFile, pEntry->data, pMemFile->pageSize); pEntry->active = false; } else if (pMemFile->numMemPages < pMemFile->totalMemPages) { pEntry = taosMemoryCalloc(1, sizeof(SSortMemPageEntry)); @@ -1033,6 +1036,8 @@ static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char* ++pMemFile->numMemPages; } { + fseek(pMemFile->pTdFile, pageId * pMemFile->pageSize, SEEK_SET); + fread(pEntry->data, pMemFile->pageSize, 1, pMemFile->pTdFile); SSortMemPageEntry* tail = pMemFile->pagesTail; tail->next = pEntry; pEntry->next = NULL; @@ -1076,13 +1081,14 @@ static int32_t createSortMemFile(SSortHandle* pHandle) { taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath); pMemFile->pTdFile = - taosOpenFile(pMemFile->memFilePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + fopen(pMemFile->memFilePath, "wb+"); pMemFile->currPageId = -1; pMemFile->currPageOffset = -1; pMemFile->pageSize = pHandle->extRowsPageSize; pMemFile->cacheSize = pHandle->extRowsMemSize; - pMemFile->pageBuf = taosMemoryMalloc(pMemFile->pageSize); + pMemFile->numWritePages = pMemFile->cacheSize/pMemFile->pageSize; + pMemFile->writePageBuf = taosMemoryMalloc(pMemFile->pageSize * pMemFile->numWritePages); pMemFile->bDirty = false; pMemFile->mActivePages = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); @@ -1110,8 +1116,11 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) { taosMemoryFree(pCurr); } tSimpleHashCleanup(pMemFile->mActivePages); - taosMemoryFree(pMemFile->pageBuf); - taosCloseFile(pMemFile->pTdFile); + taosMemoryFree(pMemFile->writePageBuf); + fclose(pMemFile->pTdFile); + taosRemoveFile(pMemFile->memFilePath); + taosMemoryFree(pMemFile); + pHandle->pExtRowsMemFile = NULL; return TSDB_CODE_SUCCESS; } @@ -1120,32 +1129,41 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p if (pMemFile->currPageId == -1) { pMemFile->currPageId = 0; pMemFile->currPageOffset = 0; + pMemFile->startPageId = 0; } else { if (pMemFile->currPageOffset + pHandle->extRowBytes >= pMemFile->pageSize) { - taosLSeekFile(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET); - taosWriteFile(pMemFile->pTdFile, pMemFile->pageBuf, pMemFile->currPageOffset + 1); ++pMemFile->currPageId; pMemFile->currPageOffset = 0; + + if (pMemFile->currPageId - pMemFile->startPageId >= pMemFile->numWritePages) { + fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, SEEK_SET); + fwrite(pMemFile->writePageBuf, pMemFile->pageSize * pMemFile->numWritePages, 1, pMemFile->pTdFile); + + pMemFile->startPageId = pMemFile->currPageId; + } } } *pPageId = pMemFile->currPageId; *pOffset = pMemFile->currPageOffset; - int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->pageBuf + pMemFile->currPageOffset); + int32_t offsetPages = (pMemFile->currPageId - pMemFile->startPageId) * pMemFile->pageSize; + int32_t blockLen = blockRowToBuf(pBlock, rowIdx, + pMemFile->writePageBuf + offsetPages + pMemFile->currPageOffset); *pLength = blockLen; pMemFile->currPageOffset += blockLen; pMemFile->bDirty = true; return TSDB_CODE_SUCCESS; } -static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle) { +static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle) { SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; if (!pMemFile->bDirty) { return TSDB_CODE_SUCCESS; } - taosLSeekFile(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET); - taosWriteFile(pMemFile->pTdFile, pMemFile->pageBuf, pMemFile->currPageOffset + 1); + fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, SEEK_SET); + int32_t numWriteBytes = pMemFile->pageSize * (pMemFile->currPageId - pMemFile->startPageId) + pMemFile->currPageOffset + 1; + fwrite(pMemFile->writePageBuf, numWriteBytes, 1, pMemFile->pTdFile); pMemFile->bDirty = false; return TSDB_CODE_SUCCESS; } @@ -1406,7 +1424,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* blockDataCleanup(pHandle->pDataBlock); } - saveLastPageToExtRowsMemFile(pHandle); + saveDirtyPagesToExtRowsMemFile(pHandle); SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);