From ffc8c8d1485ba3de170e22b82375767955e21f36 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 7 Mar 2024 09:52:56 +0800 Subject: [PATCH] feat: use tdfile api --- source/libs/executor/src/tsort.c | 58 ++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 822c997cf0..e8eb511ebe 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -67,8 +67,7 @@ typedef struct SSortMemFile { int32_t cacheSize; int32_t blockSize; - FILE* pTdFile; - // TdFilePtr pTdFile; + TdFilePtr pTdFile; char memFilePath[PATH_MAX]; } SSortMemFile; @@ -1027,9 +1026,17 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i if (pRegion->buf == NULL) { pRegion->bufRegOffset = 0; pRegion->buf = taosMemoryMalloc(pMemFile->blockSize); - tsortSeekFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET); + if (pRegion->buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + int64_t ret = taosLSeekFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET); int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize); - fread(pRegion->buf, readBytes, 1, pMemFile->pTdFile); + if (ret >= 0) { + ret = taosReadFile(pMemFile->pTdFile, pRegion->buf, readBytes); + } + if (ret != readBytes) { + return TAOS_SYSTEM_ERROR(errno); + } pRegion->bufLen = readBytes; } // TODO: ASSERT(pRegion->offset < tupleOffset); @@ -1037,17 +1044,25 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i *pFreeRow = false; *ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset; } else { - *ppRow = taosMemoryMalloc(rowLen); - int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset); - memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, - szThisBlock); - tsortSeekFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET); - int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen)); - fread(pRegion->buf, readBytes, 1, pMemFile->pTdFile); - memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock); - *pFreeRow = true; - pRegion->bufRegOffset += pRegion->bufLen; - pRegion->bufLen = readBytes; + *ppRow = taosMemoryMalloc(rowLen); + if (*ppRow == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset); + memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, + szThisBlock); + int64_t ret = taosLSeekFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET); + int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen)); + if (ret >= 0) { + ret = taosReadFile(pMemFile->pTdFile, pRegion->buf, readBytes); + } + if (ret != readBytes) { + return TAOS_SYSTEM_ERROR(errno); + } + memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock); + *pFreeRow = true; + pRegion->bufRegOffset += pRegion->bufLen; + pRegion->bufLen = readBytes; } //TODO: free region memory return TSDB_CODE_SUCCESS; @@ -1060,7 +1075,7 @@ static int32_t createSortMemFile(SSortHandle* pHandle) { SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile)); pMemFile->cacheSize = pHandle->extRowsMemSize; taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath); - pMemFile->pTdFile = fopen(pMemFile->memFilePath, "w+"); + pMemFile->pTdFile = taosOpenFile(pMemFile->memFilePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);; if (pMemFile->pTdFile == NULL) { taosMemoryFree(pMemFile); return TAOS_SYSTEM_ERROR(errno); @@ -1093,8 +1108,7 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) { taosMemoryFree(pMemFile->writeBuf); pMemFile->writeBuf = NULL; - fclose(pMemFile->pTdFile); - pMemFile->pTdFile = NULL; + taosCloseFile(&pMemFile->pTdFile); taosRemoveFile(pMemFile->memFilePath); taosMemoryFree(pMemFile); pHandle->pExtRowsMemFile = NULL; @@ -1130,8 +1144,8 @@ static int32_t tsortCloseRegion(SSortHandle* pHandle) { pRegion->regionSize = pMemFile->currRegionOffset; int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset); if (writeBytes > 0) { - int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); - if (ret != 1) { + int64_t ret = taosWriteFile(pMemFile->pTdFile, pMemFile->writeBuf, writeBytes); + if (ret != writeBytes) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; } @@ -1163,8 +1177,8 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p { if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) { int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); - int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); - if (ret != 1) { + int64_t ret = taosWriteFile(pMemFile->pTdFile, pMemFile->writeBuf, writeBytes); + if (ret != writeBytes) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; }