diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index c2479835fc..aa068c5857 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -40,7 +40,8 @@ extern "C" { // TSDB STATE DEFINITION #define TSDB_STATE_OK 0x0 -#define TSDB_STATE_BAD_FILE 0x1 +#define TSDB_STATE_BAD_META 0x1 +#define TSDB_STATE_BAD_DATA 0x2 // --------- TSDB APPLICATION HANDLE DEFINITION typedef struct { diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index 48980fedfe..5010a340b0 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -224,6 +224,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { int32_t code = syncRestoreFile(pPeer, &fversion); if (code < 0) { + (*pNode->stopSyncFileFp)(pNode->vgId, fversion); sError("%s, failed to restore files", pPeer->id); return -1; } diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 9c79264af0..46810aa47d 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -63,7 +63,7 @@ int tsdbApplyMFileChange(SMFile* from, SMFile* to); int tsdbCreateMFile(SMFile* pMFile, bool updateHeader); int tsdbUpdateMFileHeader(SMFile* pMFile); int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo); -int tsdbScanAndTryFixMFile(SMFile* pMFile); +int tsdbScanAndTryFixMFile(STsdbRepo* pRepo); int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); @@ -310,7 +310,7 @@ void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader); int tsdbUpdateDFileSetHeader(SDFileSet* pSet); -int tsdbScanAndTryFixDFileSet(SDFileSet* pSet); +int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet* pSet); static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 1054a61123..548338e491 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -68,7 +68,7 @@ extern "C" { #include "tsdbCommitQueue.h" // Main definitions struct STsdbRepo { - int8_t state; + uint8_t state; STsdbCfg config; STsdbAppH appH; diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index 6cfa8b4db6..0e93417806 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -255,7 +255,7 @@ int tsdbOpenFS(STsdbRepo *pRepo) { } // Load meta cache if has meta file - if (tsdbLoadMetaCache(pRepo, true) < 0) { + if ((!(pRepo->state & TSDB_STATE_BAD_META)) && tsdbLoadMetaCache(pRepo, true) < 0) { tsdbError("vgId:%d failed to open FS while loading meta cache since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } @@ -670,11 +670,9 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { STsdbFS * pfs = REPO_FS(pRepo); SFSStatus *pStatus = pfs->cstatus; - if (pStatus->pmf) { - if (tsdbScanAndTryFixMFile(pStatus->pmf) < 0) { - tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } + if (tsdbScanAndTryFixMFile(pRepo) < 0) { + tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; } size_t size = taosArrayGetSize(pStatus->df); @@ -682,13 +680,13 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { for (size_t i = 0; i < size; i++) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(pStatus->df, i); - if (tsdbScanAndTryFixDFileSet(pSet) < 0) { + if (tsdbScanAndTryFixDFileSet(pRepo, pSet) < 0) { tsdbError("vgId:%d failed to fix MFile since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } } - // : remove those unused files + // remove those unused files tsdbScanRootDir(pRepo); tsdbScanDataDir(pRepo); return 0; diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 493c71bdfd..c365d6c08e 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -163,9 +163,23 @@ int tsdbLoadMFileHeader(SMFile *pMFile, SMFInfo *pInfo) { return 0; } -int tsdbScanAndTryFixMFile(SMFile *pMFile) { +int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) { + SMFile * pMFile = pRepo->fs->cstatus->pmf; struct stat mfstat; - SMFile mf = *pMFile; + SMFile mf; + + if (pMFile == NULL) { + return 0; + } + + mf = *pMFile; + + if (access(TSDB_FILE_FULL_NAME(pMFile), F_OK) != 0) { + tsdbError("vgId:%d meta file %s not exit, report to upper layer to fix it", REPO_ID(pRepo), + TSDB_FILE_FULL_NAME(pMFile)); + pRepo->state |= TSDB_STATE_BAD_META; + return 0; + } if (stat(TSDB_FILE_FULL_NAME(&mf), &mfstat) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -189,9 +203,14 @@ int tsdbScanAndTryFixMFile(SMFile *pMFile) { } tsdbCloseMFile(&mf); + tsdbInfo("vgId:%d file %s is truncated from %" PRId64 " to %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), + mfstat.st_size, pMFile->info.size); } else if (pMFile->info.size < mfstat.st_size) { + tsdbError("vgId:%d meta file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it", + REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), mfstat.st_size, pMFile->info.size); + pRepo->state |= TSDB_STATE_BAD_META; terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; + return 0; } return 0; @@ -361,10 +380,17 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) { return 0; } -static int tsdbScanAndTryFixDFile(SDFile *pDFile) { +static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { struct stat dfstat; SDFile df = *pDFile; + if (access(TSDB_FILE_FULL_NAME(pDFile), F_OK) != 0) { + tsdbError("vgId:%d data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo), + TSDB_FILE_FULL_NAME(pDFile)); + pRepo->state |= TSDB_STATE_BAD_DATA; + return 0; + } + if (stat(TSDB_FILE_FULL_NAME(&df), &dfstat) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -387,9 +413,14 @@ static int tsdbScanAndTryFixDFile(SDFile *pDFile) { } tsdbCloseDFile(&df); + tsdbInfo("vgId:%d file %s is truncated from %" PRId64 " to %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), + dfstat.st_size, pDFile->info.size); } else if (pDFile->info.size < dfstat.st_size) { + tsdbError("vgId:%d data file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it", + REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), dfstat.st_size, pDFile->info.size); + pRepo->state |= TSDB_STATE_BAD_DATA; terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; + return 0; } return 0; @@ -559,9 +590,9 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { return 0; } -int tsdbScanAndTryFixDFileSet(SDFileSet *pSet) { +int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - if (tsdbScanAndTryFixDFile(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { + if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { return -1; } } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index b5c0cd18a1..248e384cae 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -98,7 +98,7 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { } // TODO: Restore information from data - if (tsdbRestoreInfo(pRepo) < 0) { + if ((!(pRepo->state & TSDB_STATE_BAD_DATA)) && tsdbRestoreInfo(pRepo) < 0) { tsdbError("vgId:%d failed to open TSDB repository while restore info since %s", config.tsdbId, tstrerror(terrno)); tsdbCloseRepo(pRepo, false); return NULL; diff --git a/src/tsdb/src/tsdbSync.c b/src/tsdb/src/tsdbSync.c index 21dc35d201..ae620d4cfd 100644 --- a/src/tsdb/src/tsdbSync.c +++ b/src/tsdb/src/tsdbSync.c @@ -82,6 +82,8 @@ int32_t tsdbSyncRecv(void *tsdb, int32_t socketFd) { STsdbRepo *pRepo = (STsdbRepo *)tsdb; SSyncH synch = {0}; + pRepo->state = TSDB_STATE_OK; + tsdbInitSyncH(&synch, pRepo, socketFd); tsdbStartFSTxn(pRepo, 0, 0); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 7536ad0c0a..cd487105b8 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -242,7 +242,7 @@ int32_t vnodeOpen(int32_t vgId) { if (pVnode->tsdb == NULL) { vnodeCleanUp(pVnode); return terrno; - } else if (terrno != TSDB_CODE_SUCCESS) { + } else if (tsdbGetState(pVnode->tsdb) != TSDB_STATE_OK) { vError("vgId:%d, failed to open tsdb, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, tstrerror(terrno)); if (pVnode->syncCfg.replica <= 1) {