remove invalid files from system when start
This commit is contained in:
parent
5037b49593
commit
6628a47ef1
|
@ -67,7 +67,7 @@ typedef struct {
|
||||||
#define tfsrename(sf, df) rename(TFILE_NAME(sf), TFILE_NAME(df))
|
#define tfsrename(sf, df) rename(TFILE_NAME(sf), TFILE_NAME(df))
|
||||||
|
|
||||||
void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
|
void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
|
||||||
bool tfsIsSameFile(TFILE *pf1, TFILE *pf2);
|
bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
|
||||||
int tfsEncodeFile(void **buf, TFILE *pf);
|
int tfsEncodeFile(void **buf, TFILE *pf);
|
||||||
void *tfsDecodeFile(void *buf, TFILE *pf);
|
void *tfsDecodeFile(void *buf, TFILE *pf);
|
||||||
void tfsbasename(const TFILE *pf, char *dest);
|
void tfsbasename(const TFILE *pf, char *dest);
|
||||||
|
|
|
@ -189,7 +189,9 @@ void tfsInitFile(TFILE *pf, int level, int id, const char *bname) {
|
||||||
snprintf(pf->aname, TSDB_FILENAME_LEN, "%s/%s", DISK_DIR(pDisk), bname);
|
snprintf(pf->aname, TSDB_FILENAME_LEN, "%s/%s", DISK_DIR(pDisk), bname);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tfsIsSameFile(TFILE *pf1, TFILE *pf2) {
|
bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2) {
|
||||||
|
ASSERT(pf1 != NULL || pf2 != NULL);
|
||||||
|
if (pf1 == NULL || pf2 == NULL) return false;
|
||||||
if (pf1->level != pf2->level) return false;
|
if (pf1->level != pf2->level) return false;
|
||||||
if (pf1->id != pf2->id) return false;
|
if (pf1->id != pf2->id) return false;
|
||||||
if (strncmp(pf1->rname, pf2->rname, TSDB_FILENAME_LEN) != 0) return false;
|
if (strncmp(pf1->rname, pf2->rname, TSDB_FILENAME_LEN) != 0) return false;
|
||||||
|
@ -326,6 +328,9 @@ const TFILE *tfsReaddir(TDIR *tdir) {
|
||||||
struct dirent *dp = NULL;
|
struct dirent *dp = NULL;
|
||||||
dp = readdir(tdir->dir);
|
dp = readdir(tdir->dir);
|
||||||
if (dp != NULL) {
|
if (dp != NULL) {
|
||||||
|
// Skip . and ..
|
||||||
|
if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue;
|
||||||
|
|
||||||
snprintf(bname, TSDB_FILENAME_LEN, "%s/%s", tdir->dirname, dp->d_name);
|
snprintf(bname, TSDB_FILENAME_LEN, "%s/%s", tdir->dirname, dp->d_name);
|
||||||
tfsInitFile(&(tdir->tfile), tdir->level, tdir->id, bname);
|
tfsInitFile(&(tdir->tfile), tdir->level, tdir->id, bname);
|
||||||
return &(tdir->tfile);
|
return &(tdir->tfile);
|
||||||
|
|
|
@ -95,6 +95,8 @@ int tsdbUnlockRepo(STsdbRepo* pRepo);
|
||||||
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
|
STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo);
|
||||||
int tsdbCheckCommit(STsdbRepo* pRepo);
|
int tsdbCheckCommit(STsdbRepo* pRepo);
|
||||||
int tsdbRestoreInfo(STsdbRepo* pRepo);
|
int tsdbRestoreInfo(STsdbRepo* pRepo);
|
||||||
|
void tsdbGetRootDir(int repoid, char dirName[]);
|
||||||
|
void tsdbGetDataDir(int repoid, char dirName[]);
|
||||||
|
|
||||||
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
|
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
|
||||||
ASSERT(pRepo != NULL);
|
ASSERT(pRepo != NULL);
|
||||||
|
|
|
@ -26,6 +26,9 @@ static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
|
||||||
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]);
|
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]);
|
||||||
static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo);
|
static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo);
|
||||||
static int tsdbScanAndTryFixFS(STsdbRepo *pRepo);
|
static int tsdbScanAndTryFixFS(STsdbRepo *pRepo);
|
||||||
|
static int tsdbScanRootDir(STsdbRepo *pRepo);
|
||||||
|
static int tsdbScanDataDir(STsdbRepo *pRepo);
|
||||||
|
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf);
|
||||||
|
|
||||||
// ================== CURRENT file header info
|
// ================== CURRENT file header info
|
||||||
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
|
static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) {
|
||||||
|
@ -683,9 +686,9 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: remove those unused files
|
// : remove those unused files
|
||||||
{}
|
tsdbScanRootDir(pRepo);
|
||||||
|
tsdbScanDataDir(pRepo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -699,8 +702,6 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
|
||||||
int64_t maxBufSize = 0;
|
int64_t maxBufSize = 0;
|
||||||
SMFInfo minfo;
|
SMFInfo minfo;
|
||||||
|
|
||||||
// TODO: clear meta at first
|
|
||||||
|
|
||||||
// No meta file, just return
|
// No meta file, just return
|
||||||
if (pfs->cstatus->pmf == NULL) return 0;
|
if (pfs->cstatus->pmf == NULL) return 0;
|
||||||
|
|
||||||
|
@ -818,3 +819,81 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
|
||||||
tfree(pBuf);
|
tfree(pBuf);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbScanRootDir(STsdbRepo *pRepo) {
|
||||||
|
char rootDir[TSDB_FILENAME_LEN];
|
||||||
|
char bname[TSDB_FILENAME_LEN];
|
||||||
|
STsdbFS * pfs = REPO_FS(pRepo);
|
||||||
|
const TFILE *pf;
|
||||||
|
|
||||||
|
tsdbGetRootDir(REPO_ID(pRepo), rootDir);
|
||||||
|
TDIR *tdir = tfsOpendir(rootDir);
|
||||||
|
if (tdir == NULL) {
|
||||||
|
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
while ((pf = tfsReaddir(tdir))) {
|
||||||
|
tfsbasename(pf, bname);
|
||||||
|
|
||||||
|
if (strcmp(bname, tsdbTxnFname[TSDB_TXN_CURR_FILE]) == 0 || strcmp(bname, "data") == 0) {
|
||||||
|
// Skip current file and data directory
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tfsIsSameFile(pf, &(pfs->cstatus->pmf->f))) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
tfsremove(pf);
|
||||||
|
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), TFILE_NAME(pf));
|
||||||
|
}
|
||||||
|
|
||||||
|
tfsClosedir(tdir);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbScanDataDir(STsdbRepo *pRepo) {
|
||||||
|
char dataDir[TSDB_FILENAME_LEN];
|
||||||
|
char bname[TSDB_FILENAME_LEN];
|
||||||
|
STsdbFS * pfs = REPO_FS(pRepo);
|
||||||
|
const TFILE *pf;
|
||||||
|
|
||||||
|
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
|
||||||
|
TDIR *tdir = tfsOpendir(dataDir);
|
||||||
|
if (tdir == NULL) {
|
||||||
|
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
while ((pf = tfsReaddir(tdir))) {
|
||||||
|
tfsbasename(pf, bname);
|
||||||
|
|
||||||
|
if (!tsdbIsTFileInFS(pfs, pf)) {
|
||||||
|
tfsremove(pf);
|
||||||
|
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), TFILE_NAME(pf));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tfsClosedir(tdir);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
|
||||||
|
SFSIter fsiter;
|
||||||
|
tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
|
||||||
|
SDFileSet *pSet;
|
||||||
|
|
||||||
|
while ((pSet = tsdbFSIterNext(&fsiter))) {
|
||||||
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
|
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
|
||||||
|
if (tfsIsSameFile(pf, TSDB_FILE_F(pDFile))) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
|
@ -21,8 +21,6 @@
|
||||||
#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP
|
#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP
|
||||||
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
|
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
|
||||||
|
|
||||||
static void tsdbGetDataDir(int repoid, char dirName[]);
|
|
||||||
static void tsdbGetRootDir(int repoid, char dirName[]);
|
|
||||||
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
|
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
|
||||||
static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
|
static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
|
||||||
static void tsdbFreeRepo(STsdbRepo *pRepo);
|
static void tsdbFreeRepo(STsdbRepo *pRepo);
|
||||||
|
@ -334,11 +332,11 @@ uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbGetRootDir(int repoid, char dirName[]) {
|
void tsdbGetRootDir(int repoid, char dirName[]) {
|
||||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid);
|
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbGetDataDir(int repoid, char dirName[]) {
|
void tsdbGetDataDir(int repoid, char dirName[]) {
|
||||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", repoid);
|
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", repoid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue