Merge branch 'szhou/tms-wc/save-row-simplebuf' of github.com:taosdata/TDengine into szhou/tms-wc/save-row-simplebuf

This commit is contained in:
shenglian zhou 2024-02-20 11:27:46 +08:00
commit ee7edac825
1 changed files with 34 additions and 16 deletions

View File

@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#include "query.h"
#include "tcommon.h"
@ -46,7 +48,10 @@ typedef struct SSortMemPageEntry {
typedef struct SSortMemFile {
int32_t pageSize;
int32_t cacheSize;
char* pageBuf;
char* writePageBuf;
int32_t startPageId;
int32_t numWritePages;
int32_t currPageId;
int32_t currPageOffset;
@ -58,7 +63,7 @@ typedef struct SSortMemFile {
int32_t numMemPages;
SSHashObj* mActivePages;
TdFilePtr pTdFile;
FILE* pTdFile;
char memFilePath[PATH_MAX];
} SSortMemFile;
@ -121,7 +126,7 @@ struct SSortHandle {
static int32_t destroySortMemFile(SSortHandle* pHandle);
static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char** ppPage);
static void setExtMemFilePageUnused(SSortMemFile* pMemFile, int32_t pageId);
static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle);
static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle);
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
pHandle->singleTableMerge = true;
@ -1024,8 +1029,6 @@ static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char*
}
pEntry->prev->next = pEntry->next;
pEntry->next->prev = pEntry->prev;
taosLSeekFile(pMemFile->pTdFile, pageId * pMemFile->pageSize, SEEK_SET);
taosReadFile(pMemFile->pTdFile, pEntry->data, pMemFile->pageSize);
pEntry->active = false;
} else if (pMemFile->numMemPages < pMemFile->totalMemPages) {
pEntry = taosMemoryCalloc(1, sizeof(SSortMemPageEntry));
@ -1033,6 +1036,8 @@ static int32_t getPageFromExtMemFile(SSortHandle* pHandle, int32_t pageId, char*
++pMemFile->numMemPages;
}
{
fseek(pMemFile->pTdFile, pageId * pMemFile->pageSize, SEEK_SET);
fread(pEntry->data, pMemFile->pageSize, 1, pMemFile->pTdFile);
SSortMemPageEntry* tail = pMemFile->pagesTail;
tail->next = pEntry;
pEntry->next = NULL;
@ -1076,13 +1081,14 @@ static int32_t createSortMemFile(SSortHandle* pHandle) {
taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath);
pMemFile->pTdFile =
taosOpenFile(pMemFile->memFilePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
fopen(pMemFile->memFilePath, "wb+");
pMemFile->currPageId = -1;
pMemFile->currPageOffset = -1;
pMemFile->pageSize = pHandle->extRowsPageSize;
pMemFile->cacheSize = pHandle->extRowsMemSize;
pMemFile->pageBuf = taosMemoryMalloc(pMemFile->pageSize);
pMemFile->numWritePages = pMemFile->cacheSize/pMemFile->pageSize;
pMemFile->writePageBuf = taosMemoryMalloc(pMemFile->pageSize * pMemFile->numWritePages);
pMemFile->bDirty = false;
pMemFile->mActivePages = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
@ -1110,8 +1116,11 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) {
taosMemoryFree(pCurr);
}
tSimpleHashCleanup(pMemFile->mActivePages);
taosMemoryFree(pMemFile->pageBuf);
taosCloseFile(pMemFile->pTdFile);
taosMemoryFree(pMemFile->writePageBuf);
fclose(pMemFile->pTdFile);
taosRemoveFile(pMemFile->memFilePath);
taosMemoryFree(pMemFile);
pHandle->pExtRowsMemFile = NULL;
return TSDB_CODE_SUCCESS;
}
@ -1120,32 +1129,41 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p
if (pMemFile->currPageId == -1) {
pMemFile->currPageId = 0;
pMemFile->currPageOffset = 0;
pMemFile->startPageId = 0;
} else {
if (pMemFile->currPageOffset + pHandle->extRowBytes >= pMemFile->pageSize) {
taosLSeekFile(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET);
taosWriteFile(pMemFile->pTdFile, pMemFile->pageBuf, pMemFile->currPageOffset + 1);
++pMemFile->currPageId;
pMemFile->currPageOffset = 0;
if (pMemFile->currPageId - pMemFile->startPageId >= pMemFile->numWritePages) {
fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, SEEK_SET);
fwrite(pMemFile->writePageBuf, pMemFile->pageSize * pMemFile->numWritePages, 1, pMemFile->pTdFile);
pMemFile->startPageId = pMemFile->currPageId;
}
}
}
*pPageId = pMemFile->currPageId;
*pOffset = pMemFile->currPageOffset;
int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->pageBuf + pMemFile->currPageOffset);
int32_t offsetPages = (pMemFile->currPageId - pMemFile->startPageId) * pMemFile->pageSize;
int32_t blockLen = blockRowToBuf(pBlock, rowIdx,
pMemFile->writePageBuf + offsetPages + pMemFile->currPageOffset);
*pLength = blockLen;
pMemFile->currPageOffset += blockLen;
pMemFile->bDirty = true;
return TSDB_CODE_SUCCESS;
}
static int32_t saveLastPageToExtRowsMemFile(SSortHandle* pHandle) {
static int32_t saveDirtyPagesToExtRowsMemFile(SSortHandle* pHandle) {
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
if (!pMemFile->bDirty) {
return TSDB_CODE_SUCCESS;
}
taosLSeekFile(pMemFile->pTdFile, pMemFile->currPageId * pMemFile->pageSize, SEEK_SET);
taosWriteFile(pMemFile->pTdFile, pMemFile->pageBuf, pMemFile->currPageOffset + 1);
fseeko(pMemFile->pTdFile, ((int64_t)pMemFile->startPageId) * pMemFile->pageSize, SEEK_SET);
int32_t numWriteBytes = pMemFile->pageSize * (pMemFile->currPageId - pMemFile->startPageId) + pMemFile->currPageOffset + 1;
fwrite(pMemFile->writePageBuf, numWriteBytes, 1, pMemFile->pTdFile);
pMemFile->bDirty = false;
return TSDB_CODE_SUCCESS;
}
@ -1406,7 +1424,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
blockDataCleanup(pHandle->pDataBlock);
}
saveLastPageToExtRowsMemFile(pHandle);
saveDirtyPagesToExtRowsMemFile(pHandle);
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);