diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 4e5cc37d99..2a2d8cf2fc 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -32,6 +32,34 @@ struct STupleHandle { int32_t rowIndex; }; +typedef struct SSortMemPageEntry { + int32_t pageId; + bool active; + + void* data; + + struct SSortMemPageEntry* next; + struct SSortMemPageEntry* prev; + +} SSortMemPageEntry; + +typedef struct SSortMemFile { + int32_t pageSize; + int32_t cacheSize; + char* writeBuf; + + int32_t currPageId; + int32_t currPageOffset; + bool bDirty; + + int32_t totalMemPages; + SSortMemPageEntry* memPages; + int32_t numMemPages; + SSHashObj* mActivePages; + + TdFilePtr pBufFile; +} SSortMemFile; + struct SSortHandle { int32_t type; int32_t pageSize; @@ -78,6 +106,7 @@ struct SSortHandle { bool bSortByRowId; SDiskbasedBuf* pExtRowsBuf; + SSortMemFile* pExtRowsMemFile; int32_t extRowBytes; int32_t extRowsPageSize; int32_t extRowsMemSize; @@ -1025,6 +1054,41 @@ static int32_t saveBlockRowToExtRowsBuf(SSortHandle* pHandle, SSDataBlock* pBloc return 0; } +// pageId * pageSize == pageStartOffset in file. write in pages +// when pass the page boundaries, the page is move to the front(old). +// find hash from pageid to page entry. if the page can not be found, +// 1) nuse inactive pages, 2) then new pages if not exceeding mem limit, 3) then active pages +// new pages is added or moved to the back. + +static int32_t createSortMemFile(SSortHandle* pHandle) { + if (pHandle->pExtRowsMemFile != NULL) { + return TSDB_CODE_SUCCESS; + } + SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile)); + pMemFile->pBufFile = + taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); + return TSDB_CODE_SUCCESS; +} + +static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) { + SDiskbasedBuf* pResultBuf = pHandle->pExtRowsBuf; + int32_t rowBytes = pHandle->extRowBytes; + int32_t pageId = -1; + SFilePage* pFilePage = NULL; + int32_t code = getPageFromExtSrcRowsBuf(pResultBuf, rowBytes, &pageId, &pFilePage); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + *pPageId = pageId; + *pOffset = pFilePage->num; + *pLength = blockRowToBuf(pBlock, rowIdx, (char*)pFilePage + (*pOffset)); + + pFilePage->num += (*pLength); + setBufPageDirty(pFilePage, true); + releaseBufPage(pResultBuf, pFilePage); + return 0; +} static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) { int32_t pageId = -1;