From 539736cfc935c9bfbdf573ccfba01d1f9510c005 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sun, 4 Feb 2024 16:30:34 +0800 Subject: [PATCH] fix: first version of mem file= --- source/libs/executor/src/tsort.c | 238 +++++++++++++++++++------------ 1 file changed, 150 insertions(+), 88 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 2a2d8cf2fc..c01612675a 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -46,18 +46,20 @@ typedef struct SSortMemPageEntry { typedef struct SSortMemFile { int32_t pageSize; int32_t cacheSize; - char* writeBuf; + char* pageBuf; int32_t currPageId; int32_t currPageOffset; bool bDirty; int32_t totalMemPages; - SSortMemPageEntry* memPages; + SSortMemPageEntry* pagesHead; + SSortMemPageEntry* pagesTail; int32_t numMemPages; SSHashObj* mActivePages; - TdFilePtr pBufFile; + TdFilePtr pTdFile; + char memFilePath[PATH_MAX]; } SSortMemFile; struct SSortHandle { @@ -105,7 +107,6 @@ struct SSortHandle { void* abortCheckParam; bool bSortByRowId; - SDiskbasedBuf* pExtRowsBuf; SSortMemFile* pExtRowsMemFile; int32_t extRowBytes; int32_t extRowsPageSize; @@ -117,6 +118,11 @@ struct SSortHandle { void* mergeLimitReachedParam; }; +static int32_t destroySortMemFile(SSortHandle* pHandle); +static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char** ppPage); +static void setExtMemFilePageUnused(SSortMemFile* pMemFile, int32_t pageId); +static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle); + void tsortSetSingleTableMerge(SSortHandle* pHandle) { pHandle->singleTableMerge = true; } @@ -342,8 +348,8 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr); taosArrayDestroy(pSortHandle->pOrderedSource); - if (pSortHandle->pExtRowsBuf != NULL) { - destroyDiskbasedBuf(pSortHandle->pExtRowsBuf); + if (pSortHandle->pExtRowsMemFile != NULL) { + destroySortMemFile(pSortHandle); } taosArrayDestroy(pSortHandle->pSortInfo); taosMemoryFreeClear(pSortHandle); @@ -896,7 +902,9 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa if (pHandle->bSortByRowId) { int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1); int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2); - void* page = getBufPage(pHandle->pExtRowsBuf, pageId); + + char* page = NULL; + getPageFromExtMemFile(pHandle, pageId, &page); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); char* buf = (char*)page + offset; @@ -926,11 +934,13 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa (int32_t)(pStart - buf)); }; - releaseBufPage(pHandle->pExtRowsBuf, page); - pBlock->info.dataLoad = 1; pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; pBlock->info.rows += 1; + + if (offset + pHandle->extRowBytes >= pHandle->pExtRowsMemFile->pageSize) { + setExtMemFilePageUnused(pHandle->pExtRowsMemFile, pageId); + } } else { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); @@ -951,45 +961,6 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa } } -static int32_t getPageFromExtSrcRowsBuf(SDiskbasedBuf* pResultBuf, int32_t rowBytes, int32_t* pPageId, SFilePage** ppFilePage) { - SFilePage* pFilePage = NULL; - - int32_t pageId = -1; - SArray* list = getDataBufPagesIdList(pResultBuf); - - if (taosArrayGetSize(list) == 0) { - pFilePage = getNewBufPage(pResultBuf, &pageId); - pFilePage->num = sizeof(SFilePage); - } else { - SPageInfo* pi = getLastPageInfo(list); - pFilePage = getBufPage(pResultBuf, getPageId(pi)); - if (pFilePage == NULL) { - qError("failed to get buffer, code:%s", tstrerror(terrno)); - return terrno; - } - - pageId = getPageId(pi); - - if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) { - releaseBufPageInfo(pResultBuf, pi); - - pFilePage = getNewBufPage(pResultBuf, &pageId); - if (pFilePage != NULL) { - pFilePage->num = sizeof(SFilePage); - } - } - } - - if (pFilePage == NULL) { - qError("failed to get buffer, code:%s", tstrerror(terrno)); - return terrno; - } - - *pPageId = pageId; - *ppFilePage = pFilePage; - return TSDB_CODE_SUCCESS; -} - static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -1034,67 +1005,156 @@ static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) { return (int32_t)(pStart - (char*)buf); } -static int32_t saveBlockRowToExtRowsBuf(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) { - SDiskbasedBuf* pResultBuf = pHandle->pExtRowsBuf; - int32_t rowBytes = pHandle->extRowBytes; - int32_t pageId = -1; - SFilePage* pFilePage = NULL; - int32_t code = getPageFromExtSrcRowsBuf(pResultBuf, rowBytes, &pageId, &pFilePage); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - *pPageId = pageId; - *pOffset = pFilePage->num; - *pLength = blockRowToBuf(pBlock, rowIdx, (char*)pFilePage + (*pOffset)); - - pFilePage->num += (*pLength); - setBufPageDirty(pFilePage, true); - releaseBufPage(pResultBuf, pFilePage); - return 0; -} - // pageId * pageSize == pageStartOffset in file. write in pages // when pass the page boundaries, the page is move to the front(old). // find hash from pageid to page entry. if the page can not be found, -// 1) nuse inactive pages, 2) then new pages if not exceeding mem limit, 3) then active pages +// 1) unused inactive pages, 2) then new pages if not exceeding mem limit, 3) then active pages // new pages is added or moved to the back. +static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char** ppPage) { + SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; + SSortMemPageEntry** ppPageEntry = tSimpleHashGet(pMemFile->mActivePages, &pageId, sizeof(pageId)); + if (ppPageEntry) { + *ppPage = (char*)((*ppPageEntry)->data); + } else { + SSortMemPageEntry* pEntry = pMemFile->pagesHead->next; + if (pEntry && !pEntry->active || pMemFile->numMemPages >= pMemFile->totalMemPages) { + if (pEntry->active) { + tSimpleHashRemove(pMemFile->mActivePages, &pEntry->pageId, sizeof(pEntry->pageId)); + } + pEntry->prev->next = pEntry->next; + pEntry->next->prev = pEntry->prev; + taosLSeekFile(pMemFile->pTdFile, pageId * pMemFile->pageSize, SEEK_SET); + taosReadFile(pMemFile->pTdFile, pEntry->data, pMemFile->pageSize); + pEntry->active = false; + } else if (pMemFile->numMemPages < pMemFile->totalMemPages) { + pEntry = taosMemoryCalloc(1, sizeof(SSortMemPageEntry)); + pEntry->data = taosMemoryMalloc(pMemFile->pageSize); + ++pMemFile->numMemPages; + } + { + SSortMemPageEntry* tail = pMemFile->pagesTail; + tail->next = pEntry; + pEntry->next = NULL; + pEntry->prev = tail; + pEntry->active = true; + pMemFile->pagesTail = pEntry; + tSimpleHashPut(pMemFile->mActivePages, &pageId, sizeof(pageId), &pEntry, POINTER_BYTES); + *ppPage = pEntry->data; + } + } + return TSDB_CODE_SUCCESS; +} + +static void setExtMemFilePageUnused(SSortMemFile* pMemFile, int32_t pageId) { + SSortMemPageEntry** ppPageEntry = tSimpleHashGet(pMemFile->mActivePages, &pageId, sizeof(pageId)); + SSortMemPageEntry* pEntry = *ppPageEntry; + if (pEntry == pMemFile->pagesTail) { + pMemFile->pagesTail = pEntry->prev; + } + + pEntry->prev->next = pEntry->next; + pEntry->next->prev = pEntry->prev; + + SSortMemPageEntry* first = pMemFile->pagesHead->next; + SSortMemPageEntry* head = pMemFile->pagesHead; + head->next = pEntry; + pEntry->next = first; + first->prev = pEntry; + pEntry->prev = head; + + pEntry->active = false; + tSimpleHashRemove(pMemFile->mActivePages, &pageId, sizeof(pageId)); + return; +} + static int32_t createSortMemFile(SSortHandle* pHandle) { if (pHandle->pExtRowsMemFile != NULL) { return TSDB_CODE_SUCCESS; } SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile)); - pMemFile->pBufFile = - taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); + + taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath); + pMemFile->pTdFile = + taosOpenFile(pMemFile->memFilePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + pMemFile->currPageId = -1; + pMemFile->currPageOffset = -1; + + pMemFile->pageSize = pHandle->extRowsPageSize; + pMemFile->cacheSize = pHandle->extRowsMemSize; + pMemFile->pageBuf = taosMemoryMalloc(pMemFile->pageSize); + pMemFile->bDirty = false; + + pMemFile->mActivePages = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + pMemFile->pagesHead = taosMemoryCalloc(1, sizeof(SSortMemPageEntry)); + pMemFile->pagesTail = pMemFile->pagesHead; + + pMemFile->totalMemPages = pMemFile->cacheSize / pMemFile->pageSize; + pMemFile->numMemPages = 0; + + pHandle->pExtRowsMemFile = pMemFile; return TSDB_CODE_SUCCESS; } +static int32_t destroySortMemFile(SSortHandle* pHandle) { + if (pHandle->pExtRowsMemFile == NULL) return TSDB_CODE_SUCCESS; + + SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; + SSortMemPageEntry* pEntry = pMemFile->pagesHead; + while (pEntry != NULL) { + if (pEntry->data) { + taosMemoryFree(pEntry->data); + } + SSortMemPageEntry* pCurr = pEntry; + pEntry = pEntry->next; + taosMemoryFree(pCurr); + } + tSimpleHashCleanup(pMemFile->mActivePages); + taosMemoryFree(pMemFile->pageBuf); + taosCloseFile(pMemFile->pTdFile); + return TSDB_CODE_SUCCESS; +} + static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) { - SDiskbasedBuf* pResultBuf = pHandle->pExtRowsBuf; - int32_t rowBytes = pHandle->extRowBytes; - int32_t pageId = -1; - SFilePage* pFilePage = NULL; - int32_t code = getPageFromExtSrcRowsBuf(pResultBuf, rowBytes, &pageId, &pFilePage); - if (code != TSDB_CODE_SUCCESS) { - return code; + SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; + if (pMemFile->currPageId == -1) { + pMemFile->currPageId = 0; + pMemFile->currPageOffset = 0; + } else { + if (pMemFile->currPageOffset + pHandle->extRowBytes >= pMemFile->pageSize) { + taosLSeekFile(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET); + taosWriteFile(pMemFile->pTdFile, pMemFile->pageBuf, pMemFile->currPageOffset + 1); + + ++pMemFile->currPageId; + pMemFile->currPageOffset = 0; + } } - *pPageId = pageId; - *pOffset = pFilePage->num; - *pLength = blockRowToBuf(pBlock, rowIdx, (char*)pFilePage + (*pOffset)); + *pPageId = pMemFile->currPageId; + *pOffset = pMemFile->currPageOffset; + int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->pageBuf + pMemFile->currPageOffset); + *pLength = blockLen; + pMemFile->currPageOffset += blockLen; + pMemFile->bDirty = true; + return TSDB_CODE_SUCCESS; +} - pFilePage->num += (*pLength); - setBufPageDirty(pFilePage, true); - releaseBufPage(pResultBuf, pFilePage); - return 0; +static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle) { + SSortMemFile* pMemFile = pHandle->pExtRowsMemFile; + if (!pMemFile->bDirty) { + return TSDB_CODE_SUCCESS; + } + taosLSeekFile(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET); + taosWriteFile(pMemFile->pTdFile, pMemFile->pageBuf, pMemFile->currPageOffset + 1); + pMemFile->bDirty = false; + return TSDB_CODE_SUCCESS; } static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) { int32_t pageId = -1; int32_t offset = -1; int32_t length = -1; - saveBlockRowToExtRowsBuf(pHandle, pSource, *rowIndex, &pageId, &offset, &length); + saveBlockRowToExtRowsMemFile(pHandle, pSource, *rowIndex, &pageId, &offset, &length); SSDataBlock* pBlock = pHandle->pDataBlock; SColumnInfoData* pSrcTsCol = taosArrayGet(pSource->pDataBlock, pHandle->extRowsOrderInfo.slotId); @@ -1145,14 +1205,13 @@ static void initRowIdSort(SSortHandle* pHandle) { } int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsPageSize, int32_t extRowsMemSize) { - int32_t code = createDiskbasedBuf(&pHandle->pExtRowsBuf, extRowsPageSize, extRowsMemSize, "sort-ext-rows", tsTempDir); - dBufSetPrintInfo(pHandle->pExtRowsBuf); pHandle->extRowBytes = blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t); pHandle->extRowsPageSize = extRowsPageSize; pHandle->extRowsMemSize = extRowsMemSize; SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); pHandle->extRowsOrderInfo = *pOrder; initRowIdSort(pHandle); + int32_t code = createSortMemFile(pHandle); pHandle->bSortByRowId = true; return code; } @@ -1346,6 +1405,9 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* } blockDataCleanup(pHandle->pDataBlock); } + + saveLastPageToExtRowsMemFile(pHandle); + SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);