Merge pull request #28943 from taosdata/enh/3.0/TD-32870

enh(wal):close wal module open file
This commit is contained in:
Shengliang Guan 2024-11-28 15:36:13 +08:00 committed by GitHub
commit c30d48a8a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 45 additions and 58 deletions

View File

@ -371,7 +371,8 @@ static int32_t walLogEntriesComplete(const SWal* pWal) {
} }
static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) { static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
int32_t code = 0; int32_t code = TSDB_CODE_SUCCESS;
TdFilePtr pFile = NULL;
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
if (!pFileInfo) { if (!pFileInfo) {
TAOS_RETURN(TSDB_CODE_FAILED); TAOS_RETURN(TSDB_CODE_FAILED);
@ -384,7 +385,7 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) { if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
code = terrno; code = terrno;
TAOS_RETURN(code); goto _exit;
} }
int64_t records = TMAX(0, pFileInfo->lastVer - pFileInfo->firstVer + 1); int64_t records = TMAX(0, pFileInfo->lastVer - pFileInfo->firstVer + 1);
int64_t lastEndOffset = records * sizeof(SWalIdxEntry); int64_t lastEndOffset = records * sizeof(SWalIdxEntry);
@ -393,9 +394,10 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE); pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
if (pFile == NULL) { if (pFile == NULL) {
TAOS_RETURN(terrno); code = terrno;
goto _exit;
} }
wInfo("vgId:%d, trim idx file. file: %s, size: %" PRId64 ", offset: %" PRId64, pWal->cfg.vgId, fnameStr, fileSize, wInfo("vgId:%d, trim idx file. file: %s, size: %" PRId64 ", offset: %" PRId64, pWal->cfg.vgId, fnameStr, fileSize,
@ -404,11 +406,12 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
code = taosFtruncateFile(pFile, lastEndOffset); code = taosFtruncateFile(pFile, lastEndOffset);
if (code < 0) { if (code < 0) {
wError("vgId:%d, failed to truncate file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); wError("vgId:%d, failed to truncate file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
TAOS_RETURN(code); goto _exit;
} }
(void)taosCloseFile(&pFile);
TAOS_RETURN(TSDB_CODE_SUCCESS); _exit:
(void)taosCloseFile(&pFile);
TAOS_RETURN(code);
} }
static void printFileSet(int32_t vgId, SArray* fileSet, const char* str) { static void printFileSet(int32_t vgId, SArray* fileSet, const char* str) {

View File

@ -160,12 +160,13 @@ static int64_t walChangeWrite(SWal *pWal, int64_t ver) {
int32_t walRollback(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) {
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver); wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver);
int32_t code = 0;
int64_t ret; int64_t ret;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
TdFilePtr pIdxFile = NULL, pLogFile = NULL;
if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); code = TSDB_CODE_WAL_INVALID_VER;
goto _exit;
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
} }
// find correct file // find correct file
@ -173,9 +174,8 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// change current files // change current files
ret = walChangeWrite(pWal, ver); ret = walChangeWrite(pWal, ver);
if (ret < 0) { if (ret < 0) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); code = terrno;
goto _exit;
TAOS_RETURN(terrno);
} }
// delete files in descending order // delete files in descending order
@ -197,98 +197,81 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
} }
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TAOS_UNUSED(taosCloseFile(&pWal->pIdxFile)); pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxFile == NULL) { if (pIdxFile == NULL) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); code = terrno;
goto _exit;
TAOS_RETURN(terrno);
} }
int64_t idxOff = walGetVerIdxOffset(pWal, ver); int64_t idxOff = walGetVerIdxOffset(pWal, ver);
ret = taosLSeekFile(pIdxFile, idxOff, SEEK_SET); ret = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
if (ret < 0) { if (ret < 0) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); code = terrno;
goto _exit;
TAOS_RETURN(terrno);
} }
// read idx file and get log file pos // read idx file and get log file pos
SWalIdxEntry entry; SWalIdxEntry entry;
if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); code = terrno;
goto _exit;
TAOS_RETURN(terrno);
} }
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr); walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TAOS_UNUSED(taosCloseFile(&pWal->pLogFile)); pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr); wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr);
if (pLogFile == NULL) { if (pLogFile == NULL) {
// TODO // TODO
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); code = terrno;
goto _exit;
TAOS_RETURN(terrno);
} }
ret = taosLSeekFile(pLogFile, entry.offset, SEEK_SET); ret = taosLSeekFile(pLogFile, entry.offset, SEEK_SET);
if (ret < 0) { if (ret < 0) {
// TODO // TODO
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); code = terrno;
goto _exit;
TAOS_RETURN(terrno);
} }
// validate offset // validate offset
SWalCkHead head; SWalCkHead head;
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead)); int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) { if (size != sizeof(SWalCkHead)) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); code = terrno;
goto _exit;
TAOS_RETURN(terrno);
} }
int32_t code = walValidHeadCksum(&head); code = walValidHeadCksum(&head);
if (code != 0) { if (code != 0) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); code = TSDB_CODE_WAL_FILE_CORRUPTED;
goto _exit;
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
} }
if (head.head.version != ver) { if (head.head.version != ver) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); code = TSDB_CODE_WAL_FILE_CORRUPTED;
goto _exit;
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
} }
// truncate old files // truncate old files
code = taosFtruncateFile(pLogFile, entry.offset); code = taosFtruncateFile(pLogFile, entry.offset);
if (code < 0) { if (code < 0) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); goto _exit;
TAOS_RETURN(code);
} }
code = taosFtruncateFile(pIdxFile, idxOff); code = taosFtruncateFile(pIdxFile, idxOff);
if (code < 0) { if (code < 0) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); goto _exit;
TAOS_RETURN(code);
} }
pWal->vers.lastVer = ver - 1; pWal->vers.lastVer = ver - 1;
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
TAOS_UNUSED(taosCloseFile(&pIdxFile));
TAOS_UNUSED(taosCloseFile(&pLogFile));
code = walSaveMeta(pWal); code = walSaveMeta(pWal);
if (code < 0) { if (code < 0) {
wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr()); wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr());
goto _exit;
}
_exit:
TAOS_UNUSED(taosCloseFile(&pIdxFile));
TAOS_UNUSED(taosCloseFile(&pLogFile));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code); TAOS_RETURN(code);
}
// unlock
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
static int32_t walRollImpl(SWal *pWal) { static int32_t walRollImpl(SWal *pWal) {
@ -718,6 +701,7 @@ static int32_t walInitWriteFile(SWal *pWal) {
walBuildLogName(pWal, fileFirstVer, fnameStr); walBuildLogName(pWal, fileFirstVer, fnameStr);
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pLogTFile == NULL) { if (pLogTFile == NULL) {
TAOS_UNUSED(taosCloseFile(&pIdxTFile));
TAOS_RETURN(terrno); TAOS_RETURN(terrno);
} }
// switch file // switch file