Revert "feat: use tdfile api"

This reverts commit ffc8c8d148.
This commit is contained in:
slzhou 2024-03-07 11:31:02 +08:00
parent ffc8c8d148
commit 53aa45e225
1 changed files with 22 additions and 36 deletions

View File

@ -67,7 +67,8 @@ typedef struct SSortMemFile {
int32_t cacheSize; int32_t cacheSize;
int32_t blockSize; int32_t blockSize;
TdFilePtr pTdFile; FILE* pTdFile;
// TdFilePtr pTdFile;
char memFilePath[PATH_MAX]; char memFilePath[PATH_MAX];
} SSortMemFile; } SSortMemFile;
@ -1026,17 +1027,9 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
if (pRegion->buf == NULL) { if (pRegion->buf == NULL) {
pRegion->bufRegOffset = 0; pRegion->bufRegOffset = 0;
pRegion->buf = taosMemoryMalloc(pMemFile->blockSize); pRegion->buf = taosMemoryMalloc(pMemFile->blockSize);
if (pRegion->buf == NULL) { tsortSeekFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET);
return TSDB_CODE_OUT_OF_MEMORY;
}
int64_t ret = taosLSeekFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize); int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
if (ret >= 0) { fread(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
ret = taosReadFile(pMemFile->pTdFile, pRegion->buf, readBytes);
}
if (ret != readBytes) {
return TAOS_SYSTEM_ERROR(errno);
}
pRegion->bufLen = readBytes; pRegion->bufLen = readBytes;
} }
// TODO: ASSERT(pRegion->offset < tupleOffset); // TODO: ASSERT(pRegion->offset < tupleOffset);
@ -1044,25 +1037,17 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
*pFreeRow = false; *pFreeRow = false;
*ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset; *ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset;
} else { } else {
*ppRow = taosMemoryMalloc(rowLen); *ppRow = taosMemoryMalloc(rowLen);
if (*ppRow == NULL) { int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset);
return TSDB_CODE_OUT_OF_MEMORY; memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset,
} szThisBlock);
int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset); tsortSeekFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET);
memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen));
szThisBlock); fread(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
int64_t ret = taosLSeekFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET); memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen)); *pFreeRow = true;
if (ret >= 0) { pRegion->bufRegOffset += pRegion->bufLen;
ret = taosReadFile(pMemFile->pTdFile, pRegion->buf, readBytes); pRegion->bufLen = readBytes;
}
if (ret != readBytes) {
return TAOS_SYSTEM_ERROR(errno);
}
memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock);
*pFreeRow = true;
pRegion->bufRegOffset += pRegion->bufLen;
pRegion->bufLen = readBytes;
} }
//TODO: free region memory //TODO: free region memory
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1075,7 +1060,7 @@ static int32_t createSortMemFile(SSortHandle* pHandle) {
SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile)); SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile));
pMemFile->cacheSize = pHandle->extRowsMemSize; pMemFile->cacheSize = pHandle->extRowsMemSize;
taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath); taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath);
pMemFile->pTdFile = taosOpenFile(pMemFile->memFilePath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);; pMemFile->pTdFile = fopen(pMemFile->memFilePath, "w+");
if (pMemFile->pTdFile == NULL) { if (pMemFile->pTdFile == NULL) {
taosMemoryFree(pMemFile); taosMemoryFree(pMemFile);
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
@ -1108,7 +1093,8 @@ static int32_t destroySortMemFile(SSortHandle* pHandle) {
taosMemoryFree(pMemFile->writeBuf); taosMemoryFree(pMemFile->writeBuf);
pMemFile->writeBuf = NULL; pMemFile->writeBuf = NULL;
taosCloseFile(&pMemFile->pTdFile); fclose(pMemFile->pTdFile);
pMemFile->pTdFile = NULL;
taosRemoveFile(pMemFile->memFilePath); taosRemoveFile(pMemFile->memFilePath);
taosMemoryFree(pMemFile); taosMemoryFree(pMemFile);
pHandle->pExtRowsMemFile = NULL; pHandle->pExtRowsMemFile = NULL;
@ -1144,8 +1130,8 @@ static int32_t tsortCloseRegion(SSortHandle* pHandle) {
pRegion->regionSize = pMemFile->currRegionOffset; pRegion->regionSize = pMemFile->currRegionOffset;
int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset); int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset);
if (writeBytes > 0) { if (writeBytes > 0) {
int64_t ret = taosWriteFile(pMemFile->pTdFile, pMemFile->writeBuf, writeBytes); int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
if (ret != writeBytes) { if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return terrno; return terrno;
} }
@ -1177,8 +1163,8 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p
{ {
if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) { if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) {
int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
int64_t ret = taosWriteFile(pMemFile->pTdFile, pMemFile->writeBuf, writeBytes); int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
if (ret != writeBytes) { if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return terrno; return terrno;
} }