fix: more big batch writes than page writes

This commit is contained in:
slzhou 2024-02-04 22:36:25 +08:00
parent 247bfae039
commit 83e44f0735
1 changed files with 24 additions and 11 deletions

View File

@ -48,7 +48,10 @@ typedef struct SSortMemPageEntry {
typedef struct SSortMemFile {
int32_t pageSize;
int32_t cacheSize;
char* pageBuf;
char* writePageBuf;
int32_t startPageId;
int32_t numWritePages;
int32_t currPageId;
int32_t currPageOffset;
@ -123,7 +126,7 @@ struct SSortHandle {
static int32_t destroySortMemFile(SSortHandle* pHandle);
static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char** ppPage);
static void setExtMemFilePageUnused(SSortMemFile* pMemFile, int32_t pageId);
static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle);
static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle);
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
pHandle->singleTableMerge = true;
@ -1084,7 +1087,8 @@ static int32_t createSortMemFile(SSortHandle* pHandle) {
pMemFile->pageSize = pHandle->extRowsPageSize;
pMemFile->cacheSize = pHandle->extRowsMemSize;
pMemFile->pageBuf = taosMemoryMalloc(pMemFile->pageSize);
pMemFile->numWritePages = pMemFile->cacheSize/pMemFile->pageSize;
pMemFile->writePageBuf = taosMemoryMalloc(pMemFile->pageSize * pMemFile->numWritePages);
pMemFile->bDirty = false;
pMemFile->mActivePages = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
@ -1112,7 +1116,7 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) {
taosMemoryFree(pCurr);
}
tSimpleHashCleanup(pMemFile->mActivePages);
taosMemoryFree(pMemFile->pageBuf);
taosMemoryFree(pMemFile->writePageBuf);
fclose(pMemFile->pTdFile);
taosRemoveFile(pMemFile->memFilePath);
taosMemoryFree(pMemFile);
@ -1125,32 +1129,41 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p
if (pMemFile->currPageId == -1) {
pMemFile->currPageId = 0;
pMemFile->currPageOffset = 0;
pMemFile->startPageId = 0;
} else {
if (pMemFile->currPageOffset + pHandle->extRowBytes >= pMemFile->pageSize) {
fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->currPageId) * pMemFile->pageSize, SEEK_SET);
fwrite(pMemFile->pageBuf, pMemFile->currPageOffset + 1, 1, pMemFile->pTdFile);
++pMemFile->currPageId;
pMemFile->currPageOffset = 0;
if (pMemFile->currPageId - pMemFile->startPageId >= pMemFile->numWritePages) {
fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, SEEK_SET);
fwrite(pMemFile->writePageBuf, pMemFile->pageSize * pMemFile->numWritePages, 1, pMemFile->pTdFile);
pMemFile->startPageId = pMemFile->currPageId;
}
}
}
*pPageId = pMemFile->currPageId;
*pOffset = pMemFile->currPageOffset;
int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->pageBuf + pMemFile->currPageOffset);
int32_t offsetPages = (pMemFile->currPageId - pMemFile->startPageId) * pMemFile->pageSize;
int32_t blockLen = blockRowToBuf(pBlock, rowIdx,
pMemFile->writePageBuf + offsetPages + pMemFile->currPageOffset);
*pLength = blockLen;
pMemFile->currPageOffset += blockLen;
pMemFile->bDirty = true;
return TSDB_CODE_SUCCESS;
}
static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle) {
static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle) {
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
if (!pMemFile->bDirty) {
return TSDB_CODE_SUCCESS;
}
fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->currPageId) * pMemFile->pageSize, SEEK_SET);
fwrite(pMemFile->pageBuf, pMemFile->currPageOffset + 1, 1, pMemFile->pTdFile);
fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, SEEK_SET);
int32_t numWriteBytes = pMemFile->pageSize * (pMemFile->currPageId - pMemFile->startPageId) + pMemFile->currPageOffset + 1;
fwrite(pMemFile->writePageBuf, numWriteBytes, 1, pMemFile->pTdFile);
pMemFile->bDirty = false;
return TSDB_CODE_SUCCESS;
}
@ -1411,7 +1424,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
blockDataCleanup(pHandle->pDataBlock);
}
saveLastPageToExtRowsMemFile(pHandle);
saveDirtyPagesToExtRowsMemFile(pHandle);
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);