fix: add error processing

This commit is contained in:
slzhou 2024-03-07 14:26:46 +08:00
parent 53aa45e225
commit f690e5cfb9
1 changed files with 73 additions and 22 deletions

View File

@ -241,11 +241,19 @@ void destroyTuple(void* t) {
} }
int tsortSeekFile(FILE* file, int64_t offset, int whence) { int tsortSeekFile(FILE* file, int64_t offset, int whence) {
#ifdef WINDOWS #ifdef WINDOWS
return _fseeki64(file, offset, whence); return _fseeki64(file, offset, whence);
#else #else
return fseeko(file, offset, whence); return fseeko(file, offset, whence);
#endif #endif
}
int tsortSetAutoDelFile(char* path) {
#ifdef WINDOWS
return SetFileAttributes(path, FILE_ATTRIBUTE_TEMPORARY);
#else
return unlink(path);
#endif
} }
/** /**
@ -1027,9 +1035,16 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
if (pRegion->buf == NULL) { if (pRegion->buf == NULL) {
pRegion->bufRegOffset = 0; pRegion->bufRegOffset = 0;
pRegion->buf = taosMemoryMalloc(pMemFile->blockSize); pRegion->buf = taosMemoryMalloc(pMemFile->blockSize);
if (pRegion->buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
tsortSeekFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET); tsortSeekFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize); int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
fread(pRegion->buf, readBytes, 1, pMemFile->pTdFile); int ret = fread(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
pRegion->bufLen = readBytes; pRegion->bufLen = readBytes;
} }
// TODO: ASSERT(pRegion->offset < tupleOffset); // TODO: ASSERT(pRegion->offset < tupleOffset);
@ -1038,12 +1053,20 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
*ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset; *ppRow = pRegion->buf + tupleOffset - pRegion->bufRegOffset;
} else { } else {
*ppRow = taosMemoryMalloc(rowLen); *ppRow = taosMemoryMalloc(rowLen);
if (*ppRow == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset); int32_t szThisBlock = pRegion->bufLen - (tupleOffset - pRegion->bufRegOffset);
memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset,
szThisBlock); szThisBlock);
tsortSeekFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET); tsortSeekFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen)); int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen));
fread(pRegion->buf, readBytes, 1, pMemFile->pTdFile); int ret = fread(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
taosMemoryFreeClear(*ppRow);
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock); memcpy(*ppRow + szThisBlock, pRegion->buf, rowLen - szThisBlock);
*pFreeRow = true; *pFreeRow = true;
pRegion->bufRegOffset += pRegion->bufLen; pRegion->bufRegOffset += pRegion->bufLen;
@ -1057,27 +1080,55 @@ static int32_t createSortMemFile(SSortHandle* pHandle) {
if (pHandle->pExtRowsMemFile != NULL) { if (pHandle->pExtRowsMemFile != NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = TSDB_CODE_SUCCESS;
SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile)); SSortMemFile* pMemFile = taosMemoryCalloc(1, sizeof(SSortMemFile));
pMemFile->cacheSize = pHandle->extRowsMemSize; if (pMemFile == NULL) {
taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath); code = TSDB_CODE_OUT_OF_MEMORY;
pMemFile->pTdFile = fopen(pMemFile->memFilePath, "w+");
if (pMemFile->pTdFile == NULL) {
taosMemoryFree(pMemFile);
return TAOS_SYSTEM_ERROR(errno);
} }
pMemFile->currRegionId = -1; if (code == TSDB_CODE_SUCCESS) {
pMemFile->currRegionOffset = -1; taosGetTmpfilePath(tsTempDir, "sort-ext-mem", pMemFile->memFilePath);
pMemFile->pTdFile = fopen(pMemFile->memFilePath, "w+");
if (pMemFile->pTdFile == NULL) {
code = terrno = TAOS_SYSTEM_ERROR(errno);
}
}
if (code == TSDB_CODE_SUCCESS) {
tsortSetAutoDelFile(pMemFile->memFilePath);
pMemFile->writeBufSize = 4 * 1024 * 1024; pMemFile->currRegionId = -1;
pMemFile->writeBuf = taosMemoryMalloc(pMemFile->writeBufSize); pMemFile->currRegionOffset = -1;
pMemFile->writeFileOffset = -1;
pMemFile->bRegionDirty = false;
pMemFile->aFileRegions = taosArrayInit(64, sizeof(SSortMemFileRegion)); pMemFile->writeBufSize = 4 * 1024 * 1024;
pMemFile->writeFileOffset = -1;
pHandle->pExtRowsMemFile = pMemFile; pMemFile->bRegionDirty = false;
return TSDB_CODE_SUCCESS;
} pMemFile->writeBuf = taosMemoryMalloc(pMemFile->writeBufSize);
if (pMemFile->writeBuf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (code == TSDB_CODE_SUCCESS) {
pMemFile->cacheSize = pHandle->extRowsMemSize;
pMemFile->aFileRegions = taosArrayInit(64, sizeof(SSortMemFileRegion));
if (pMemFile->aFileRegions == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (code == TSDB_CODE_SUCCESS) {
pHandle->pExtRowsMemFile = pMemFile;
} else {
if (pMemFile) {
if (pMemFile->aFileRegions) taosMemoryFreeClear(pMemFile->aFileRegions);
if (pMemFile->writeBuf) taosMemoryFreeClear(pMemFile->writeBuf);
if (pMemFile->pTdFile) {
fclose(pMemFile->pTdFile);
pMemFile->pTdFile = NULL;
}
taosMemoryFreeClear(pMemFile);
}
}
return code;
}
static int32_t destroySortMemFile(SSortHandle* pHandle) { static int32_t destroySortMemFile(SSortHandle* pHandle) {
if (pHandle->pExtRowsMemFile == NULL) return TSDB_CODE_SUCCESS; if (pHandle->pExtRowsMemFile == NULL) return TSDB_CODE_SUCCESS;