feat: first commit to optimize out tpagedbuf

This commit is contained in:
slzhou 2024-01-31 13:10:42 +08:00
parent b27bf263af
commit 3facbd43db
1 changed files with 64 additions and 0 deletions

View File

@ -32,6 +32,34 @@ struct STupleHandle {
int32_t rowIndex;
};
typedef struct SSortMemPageEntry {
int32_t pageId;
bool active;
void* data;
struct SSortMemPageEntry* next;
struct SSortMemPageEntry* prev;
} SSortMemPageEntry;
typedef struct SSortMemFile {
int32_t pageSize;
int32_t cacheSize;
char* writeBuf;
int32_t currPageId;
int32_t currPageOffset;
bool bDirty;
int32_t totalMemPages;
SSortMemPageEntry* memPages;
int32_t numMemPages;
SSHashObj* mActivePages;
TdFilePtr pBufFile;
} SSortMemFile;
struct SSortHandle {
int32_t type;
int32_t pageSize;
@ -78,6 +106,7 @@ struct SSortHandle {
bool bSortByRowId;
SDiskbasedBuf* pExtRowsBuf;
SSortMemFile* pExtRowsMemFile;
int32_t extRowBytes;
int32_t extRowsPageSize;
int32_t extRowsMemSize;
@ -1025,6 +1054,41 @@ static int32_t saveBlockRowToExtRowsBuf(SSortHandle* pHandle, SSDataBlock* pBloc
return 0;
}
// pageId * pageSize == pageStartOffset in file. write in pages
// when pass the page boundaries, the page is move to the front(old).
// find hash from pageid to page entry. if the page can not be found,
// 1) nuse inactive pages, 2) then new pages if not exceeding mem limit, 3) then active pages
// new pages is added or moved to the back.
static int32_t createSortMemFile(SSortHandle* pHandle) {
if (pHandle->pExtRowsMemFile != NULL) {
return TSDB_CODE_SUCCESS;
}
SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile));
pMemFile->pBufFile =
taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
return TSDB_CODE_SUCCESS;
}
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) {
SDiskbasedBuf* pResultBuf = pHandle->pExtRowsBuf;
int32_t rowBytes = pHandle->extRowBytes;
int32_t pageId = -1;
SFilePage* pFilePage = NULL;
int32_t code = getPageFromExtSrcRowsBuf(pResultBuf, rowBytes, &pageId, &pFilePage);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
*pPageId = pageId;
*pOffset = pFilePage->num;
*pLength = blockRowToBuf(pBlock, rowIdx, (char*)pFilePage + (*pOffset));
pFilePage->num += (*pLength);
setBufPageDirty(pFilePage, true);
releaseBufPage(pResultBuf, pFilePage);
return 0;
}
static void appendToRowIndexDataBlock(SSortHandle* pHandle, SSDataBlock* pSource, int32_t* rowIndex) {
int32_t pageId = -1;