fix: first version of mem file=

This commit is contained in:
shenglian zhou 2024-02-04 16:30:34 +08:00
parent 3facbd43db
commit 539736cfc9
1 changed files with 150 additions and 88 deletions

View File

@ -46,18 +46,20 @@ typedef struct SSortMemPageEntry {
typedef struct SSortMemFile { typedef struct SSortMemFile {
int32_t pageSize; int32_t pageSize;
int32_t cacheSize; int32_t cacheSize;
char* writeBuf; char* pageBuf;
int32_t currPageId; int32_t currPageId;
int32_t currPageOffset; int32_t currPageOffset;
bool bDirty; bool bDirty;
int32_t totalMemPages; int32_t totalMemPages;
SSortMemPageEntry* memPages; SSortMemPageEntry* pagesHead;
SSortMemPageEntry* pagesTail;
int32_t numMemPages; int32_t numMemPages;
SSHashObj* mActivePages; SSHashObj* mActivePages;
TdFilePtr pBufFile; TdFilePtr pTdFile;
char memFilePath[PATH_MAX];
} SSortMemFile; } SSortMemFile;
struct SSortHandle { struct SSortHandle {
@ -105,7 +107,6 @@ struct SSortHandle {
void* abortCheckParam; void* abortCheckParam;
bool bSortByRowId; bool bSortByRowId;
SDiskbasedBuf* pExtRowsBuf;
SSortMemFile* pExtRowsMemFile; SSortMemFile* pExtRowsMemFile;
int32_t extRowBytes; int32_t extRowBytes;
int32_t extRowsPageSize; int32_t extRowsPageSize;
@ -117,6 +118,11 @@ struct SSortHandle {
void* mergeLimitReachedParam; void* mergeLimitReachedParam;
}; };
static int32_t destroySortMemFile(SSortHandle* pHandle);
static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char** ppPage);
static void setExtMemFilePageUnused(SSortMemFile* pMemFile, int32_t pageId);
static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle);
void tsortSetSingleTableMerge(SSortHandle* pHandle) { void tsortSetSingleTableMerge(SSortHandle* pHandle) {
pHandle->singleTableMerge = true; pHandle->singleTableMerge = true;
} }
@ -342,8 +348,8 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr); qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);
taosArrayDestroy(pSortHandle->pOrderedSource); taosArrayDestroy(pSortHandle->pOrderedSource);
if (pSortHandle->pExtRowsBuf != NULL) { if (pSortHandle->pExtRowsMemFile != NULL) {
destroyDiskbasedBuf(pSortHandle->pExtRowsBuf); destroySortMemFile(pSortHandle);
} }
taosArrayDestroy(pSortHandle->pSortInfo); taosArrayDestroy(pSortHandle->pSortInfo);
taosMemoryFreeClear(pSortHandle); taosMemoryFreeClear(pSortHandle);
@ -896,7 +902,9 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa
if (pHandle->bSortByRowId) { if (pHandle->bSortByRowId) {
int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1); int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1);
int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2); int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2);
void* page = getBufPage(pHandle->pExtRowsBuf, pageId);
char* page = NULL;
getPageFromExtMemFile(pHandle, pageId, &page);
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
char* buf = (char*)page + offset; char* buf = (char*)page + offset;
@ -926,11 +934,13 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa
(int32_t)(pStart - buf)); (int32_t)(pStart - buf));
}; };
releaseBufPage(pHandle->pExtRowsBuf, page);
pBlock->info.dataLoad = 1; pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag; pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
pBlock->info.rows += 1; pBlock->info.rows += 1;
if (offset + pHandle->extRowBytes >= pHandle->pExtRowsMemFile->pageSize) {
setExtMemFilePageUnused(pHandle->pExtRowsMemFile, pageId);
}
} else { } else {
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
@ -951,45 +961,6 @@ void tsortAppendTupleToBlock(SSortHandle* pHandle, SSDataBlock* pBlock, STupleHa
} }
} }
static int32_t getPageFromExtSrcRowsBuf(SDiskbasedBuf* pResultBuf, int32_t rowBytes, int32_t* pPageId, SFilePage** ppFilePage) {
SFilePage* pFilePage = NULL;
int32_t pageId = -1;
SArray* list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) {
pFilePage = getNewBufPage(pResultBuf, &pageId);
pFilePage->num = sizeof(SFilePage);
} else {
SPageInfo* pi = getLastPageInfo(list);
pFilePage = getBufPage(pResultBuf, getPageId(pi));
if (pFilePage == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
pageId = getPageId(pi);
if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) {
releaseBufPageInfo(pResultBuf, pi);
pFilePage = getNewBufPage(pResultBuf, &pageId);
if (pFilePage != NULL) {
pFilePage->num = sizeof(SFilePage);
}
}
}
if (pFilePage == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
*pPageId = pageId;
*ppFilePage = pFilePage;
return TSDB_CODE_SUCCESS;
}
static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) { static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
@ -1034,67 +1005,156 @@ static int32_t blockRowToBuf(SSDataBlock* pBlock, int32_t rowIdx, char* buf) {
return (int32_t)(pStart - (char*)buf); return (int32_t)(pStart - (char*)buf);
} }
static int32_t saveBlockRowToExtRowsBuf(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) {
SDiskbasedBuf* pResultBuf = pHandle->pExtRowsBuf;
int32_t rowBytes = pHandle->extRowBytes;
int32_t pageId = -1;
SFilePage* pFilePage = NULL;
int32_t code = getPageFromExtSrcRowsBuf(pResultBuf, rowBytes, &pageId, &pFilePage);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
*pPageId = pageId;
*pOffset = pFilePage->num;
*pLength = blockRowToBuf(pBlock, rowIdx, (char*)pFilePage + (*pOffset));
pFilePage->num += (*pLength);
setBufPageDirty(pFilePage, true);
releaseBufPage(pResultBuf, pFilePage);
return 0;
}
// pageId * pageSize == pageStartOffset in file. write in pages // pageId * pageSize == pageStartOffset in file. write in pages
// when pass the page boundaries, the page is move to the front(old). // when pass the page boundaries, the page is move to the front(old).
// find hash from pageid to page entry. if the page can not be found, // find hash from pageid to page entry. if the page can not be found,
// 1) nuse inactive pages, 2) then new pages if not exceeding mem limit, 3) then active pages // 1) unused inactive pages, 2) then new pages if not exceeding mem limit, 3) then active pages
// new pages is added or moved to the back. // new pages is added or moved to the back.
static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char** ppPage) {
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
SSortMemPageEntry** ppPageEntry = tSimpleHashGet(pMemFile->mActivePages, &pageId, sizeof(pageId));
if (ppPageEntry) {
*ppPage = (char*)((*ppPageEntry)->data);
} else {
SSortMemPageEntry* pEntry = pMemFile->pagesHead->next;
if (pEntry && !pEntry->active || pMemFile->numMemPages >= pMemFile->totalMemPages) {
if (pEntry->active) {
tSimpleHashRemove(pMemFile->mActivePages, &pEntry->pageId, sizeof(pEntry->pageId));
}
pEntry->prev->next = pEntry->next;
pEntry->next->prev = pEntry->prev;
taosLSeekFile(pMemFile->pTdFile, pageId * pMemFile->pageSize, SEEK_SET);
taosReadFile(pMemFile->pTdFile, pEntry->data, pMemFile->pageSize);
pEntry->active = false;
} else if (pMemFile->numMemPages < pMemFile->totalMemPages) {
pEntry = taosMemoryCalloc(1, sizeof(SSortMemPageEntry));
pEntry->data = taosMemoryMalloc(pMemFile->pageSize);
++pMemFile->numMemPages;
}
{
SSortMemPageEntry* tail = pMemFile->pagesTail;
tail->next = pEntry;
pEntry->next = NULL;
pEntry->prev = tail;
pEntry->active = true;
pMemFile->pagesTail = pEntry;
tSimpleHashPut(pMemFile->mActivePages, &pageId, sizeof(pageId), &pEntry, POINTER_BYTES);
*ppPage = pEntry->data;
}
}
return TSDB_CODE_SUCCESS;
}
static void setExtMemFilePageUnused(SSortMemFile* pMemFile, int32_t pageId) {
SSortMemPageEntry** ppPageEntry = tSimpleHashGet(pMemFile->mActivePages, &pageId, sizeof(pageId));
SSortMemPageEntry* pEntry = *ppPageEntry;
if (pEntry == pMemFile->pagesTail) {
pMemFile->pagesTail = pEntry->prev;
}
pEntry->prev->next = pEntry->next;
pEntry->next->prev = pEntry->prev;
SSortMemPageEntry* first = pMemFile->pagesHead->next;
SSortMemPageEntry* head = pMemFile->pagesHead;
head->next = pEntry;
pEntry->next = first;
first->prev = pEntry;
pEntry->prev = head;
pEntry->active = false;
tSimpleHashRemove(pMemFile->mActivePages, &pageId, sizeof(pageId));
return;
}
static int32_t createSortMemFile(SSortHandle* pHandle) { static int32_t createSortMemFile(SSortHandle* pHandle) {
if (pHandle->pExtRowsMemFile != NULL) { if (pHandle->pExtRowsMemFile != NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile)); SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile));
pMemFile->pBufFile =
taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath);
pMemFile->pTdFile =
taosOpenFile(pMemFile->memFilePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
pMemFile->currPageId = -1;
pMemFile->currPageOffset = -1;
pMemFile->pageSize = pHandle->extRowsPageSize;
pMemFile->cacheSize = pHandle->extRowsMemSize;
pMemFile->pageBuf = taosMemoryMalloc(pMemFile->pageSize);
pMemFile->bDirty = false;
pMemFile->mActivePages = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
pMemFile->pagesHead = taosMemoryCalloc(1, sizeof(SSortMemPageEntry));
pMemFile->pagesTail = pMemFile->pagesHead;
pMemFile->totalMemPages = pMemFile->cacheSize / pMemFile->pageSize;
pMemFile->numMemPages = 0;
pHandle->pExtRowsMemFile = pMemFile;
return TSDB_CODE_SUCCESS;
}
static int32_t destroySortMemFile(SSortHandle* pHandle) {
if (pHandle->pExtRowsMemFile == NULL) return TSDB_CODE_SUCCESS;
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
SSortMemPageEntry* pEntry = pMemFile->pagesHead;
while (pEntry != NULL) {
if (pEntry->data) {
taosMemoryFree(pEntry->data);
}
SSortMemPageEntry* pCurr = pEntry;
pEntry = pEntry->next;
taosMemoryFree(pCurr);
}
tSimpleHashCleanup(pMemFile->mActivePages);
taosMemoryFree(pMemFile->pageBuf);
taosCloseFile(pMemFile->pTdFile);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) { static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) {
SDiskbasedBuf* pResultBuf = pHandle->pExtRowsBuf; SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
int32_t rowBytes = pHandle->extRowBytes; if (pMemFile->currPageId == -1) {
int32_t pageId = -1; pMemFile->currPageId = 0;
SFilePage* pFilePage = NULL; pMemFile->currPageOffset = 0;
int32_t code = getPageFromExtSrcRowsBuf(pResultBuf, rowBytes, &pageId, &pFilePage); } else {
if (code != TSDB_CODE_SUCCESS) { if (pMemFile->currPageOffset + pHandle->extRowBytes >= pMemFile->pageSize) {
return code; taosLSeekFile(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET);
taosWriteFile(pMemFile->pTdFile, pMemFile->pageBuf, pMemFile->currPageOffset + 1);
++pMemFile->currPageId;
pMemFile->currPageOffset = 0;
}
} }
*pPageId = pageId; *pPageId = pMemFile->currPageId;
*pOffset = pFilePage->num; *pOffset = pMemFile->currPageOffset;
*pLength = blockRowToBuf(pBlock, rowIdx, (char*)pFilePage + (*pOffset)); int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->pageBuf + pMemFile->currPageOffset);
*pLength = blockLen;
pMemFile->currPageOffset += blockLen;
pMemFile->bDirty = true;
return TSDB_CODE_SUCCESS;
}
pFilePage->num += (*pLength); static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle) {
setBufPageDirty(pFilePage, true); SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
releaseBufPage(pResultBuf, pFilePage); if (!pMemFile->bDirty) {
return 0; return TSDB_CODE_SUCCESS;
}
taosLSeekFile(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET);
taosWriteFile(pMemFile->pTdFile, pMemFile->pageBuf, pMemFile->currPageOffset + 1);
pMemFile->bDirty = false;
return TSDB_CODE_SUCCESS;
} }
static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) { static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) {
int32_t pageId = -1; int32_t pageId = -1;
int32_t offset = -1; int32_t offset = -1;
int32_t length = -1; int32_t length = -1;
saveBlockRowToExtRowsBuf(pHandle, pSource, *rowIndex, &pageId, &offset, &length); saveBlockRowToExtRowsMemFile(pHandle, pSource, *rowIndex, &pageId, &offset, &length);
SSDataBlock* pBlock = pHandle->pDataBlock; SSDataBlock* pBlock = pHandle->pDataBlock;
SColumnInfoData* pSrcTsCol = taosArrayGet(pSource->pDataBlock, pHandle->extRowsOrderInfo.slotId); SColumnInfoData* pSrcTsCol = taosArrayGet(pSource->pDataBlock, pHandle->extRowsOrderInfo.slotId);
@ -1145,14 +1205,13 @@ static void initRowIdSort(SSortHandle* pHandle) {
} }
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsPageSize, int32_t extRowsMemSize) { int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsPageSize, int32_t extRowsMemSize) {
int32_t code = createDiskbasedBuf(&pHandle->pExtRowsBuf, extRowsPageSize, extRowsMemSize, "sort-ext-rows", tsTempDir);
dBufSetPrintInfo(pHandle->pExtRowsBuf);
pHandle->extRowBytes = blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t); pHandle->extRowBytes = blockDataGetRowSize(pHandle->pDataBlock) + taosArrayGetSize(pHandle->pDataBlock->pDataBlock) + sizeof(int32_t);
pHandle->extRowsPageSize = extRowsPageSize; pHandle->extRowsPageSize = extRowsPageSize;
pHandle->extRowsMemSize = extRowsMemSize; pHandle->extRowsMemSize = extRowsMemSize;
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
pHandle->extRowsOrderInfo = *pOrder; pHandle->extRowsOrderInfo = *pOrder;
initRowIdSort(pHandle); initRowIdSort(pHandle);
int32_t code = createSortMemFile(pHandle);
pHandle->bSortByRowId = true; pHandle->bSortByRowId = true;
return code; return code;
} }
@ -1346,6 +1405,9 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
} }
blockDataCleanup(pHandle->pDataBlock); blockDataCleanup(pHandle->pDataBlock);
} }
saveLastPageToExtRowsMemFile(pHandle);
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false); SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId); doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);