refact tfs module
This commit is contained in:
parent
cc5563716c
commit
b86bda4b41
|
@ -58,8 +58,8 @@ int64_t taosWriteFile(FileFd fd, const void *buf, int64_t count);
|
|||
|
||||
void taosCloseFile(FileFd fd);
|
||||
|
||||
int32_t taosRenameFile(char *oldName, char *newName);
|
||||
int64_t taosCopyFile(char *from, char *to);
|
||||
int32_t taosRenameFile(const char *oldName, const char *newName);
|
||||
int64_t taosCopyFile(const char *from, const char *to);
|
||||
|
||||
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);
|
||||
|
||||
|
|
|
@ -54,9 +54,9 @@ typedef struct SVnodeCfg {
|
|||
|
||||
typedef struct {
|
||||
int32_t sver;
|
||||
char * timezone;
|
||||
char * locale;
|
||||
char * charset;
|
||||
const char *timezone;
|
||||
const char *locale;
|
||||
const char *charset;
|
||||
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
|
||||
PutReqToVQueryQFp putReqToVQueryQFp;
|
||||
} SVnodeOpt;
|
||||
|
|
|
@ -33,8 +33,11 @@
|
|||
#define TSDB_FILE_OPENED(tf) (TSDB_FILE_FD(tf) >= 0)
|
||||
#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf))
|
||||
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
|
||||
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
|
||||
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
|
||||
#define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level)
|
||||
#define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id)
|
||||
#define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did)
|
||||
#define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname)
|
||||
#define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname)
|
||||
#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_FD(tf))
|
||||
#define TSDB_FILE_STATE(tf) ((tf)->state)
|
||||
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
|
||||
|
@ -185,7 +188,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t
|
|||
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
|
||||
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
|
||||
void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile);
|
||||
int tsdbCreateDFile(SDFile* pDFile, bool updateHeader);
|
||||
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader);
|
||||
int tsdbUpdateDFileHeader(SDFile* pDFile);
|
||||
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
|
||||
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
|
||||
|
@ -263,7 +266,7 @@ static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte
|
|||
return (int)nbyte;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsremove(TSDB_FILE_F(pDFile)); }
|
||||
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); }
|
||||
|
||||
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
|
||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||
|
@ -278,7 +281,7 @@ static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nby
|
|||
}
|
||||
|
||||
static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
|
||||
if (tfscopy(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
|
||||
if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
@ -318,7 +321,7 @@ void* tsdbDecodeDFileSet(STsdb *pRepo, void* buf, SDFileSet* pSet);
|
|||
int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet);
|
||||
void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
|
||||
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
|
||||
int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader);
|
||||
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet* pSet, bool updateHeader);
|
||||
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
|
||||
int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet);
|
||||
|
||||
|
|
|
@ -484,7 +484,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
|||
// Create a new FSET to write data
|
||||
tsdbInitDFileSet(pRepo, pWSet, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)));
|
||||
|
||||
if (tsdbCreateDFileSet(pWSet, true) < 0) {
|
||||
if (tsdbCreateDFileSet(pRepo, pWSet, true) < 0) {
|
||||
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
|
||||
TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno));
|
||||
if (pCommith->isRFileSet) {
|
||||
|
@ -508,7 +508,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
|||
// TSDB_FILE_HEAD
|
||||
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
|
||||
tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
|
||||
if (tsdbCreateDFile(pWHeadf, true) < 0) {
|
||||
if (tsdbCreateDFile(pRepo, pWHeadf, true) < 0) {
|
||||
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
|
||||
tstrerror(terrno));
|
||||
|
||||
|
@ -557,7 +557,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
|||
tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
|
||||
pCommith->isLFileSame = false;
|
||||
|
||||
if (tsdbCreateDFile(pWLastf, true) < 0) {
|
||||
if (tsdbCreateDFile(pRepo, pWLastf, true) < 0) {
|
||||
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
|
||||
tstrerror(terrno));
|
||||
|
||||
|
|
|
@ -931,8 +931,8 @@ static int tsdbScanRootDir(STsdb *pRepo) {
|
|||
// continue;
|
||||
// }
|
||||
|
||||
(void)tfsremove(pf);
|
||||
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), TFILE_NAME(pf));
|
||||
(void)tfsRemoveFile(pf);
|
||||
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), pf->aname);
|
||||
}
|
||||
|
||||
tfsClosedir(tdir);
|
||||
|
@ -957,8 +957,8 @@ static int tsdbScanDataDir(STsdb *pRepo) {
|
|||
tfsBasename(pf, bname);
|
||||
|
||||
if (!tsdbIsTFileInFS(pfs, pf)) {
|
||||
(void)tfsremove(pf);
|
||||
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), TFILE_NAME(pf));
|
||||
(void)tfsRemoveFile(pf);
|
||||
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), pf->aname);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1016,7 +1016,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf) {
|
|||
|
||||
// if (strcmp(bname, tsdbTxnFname[TSDB_TXN_TEMP_FILE]) == 0) {
|
||||
// // Skip current.t file
|
||||
// tsdbInfo("vgId:%d file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
|
||||
// tsdbInfo("vgId:%d file %s exists, remove it", REPO_ID(pRepo), pf->aname);
|
||||
// (void)tfsremove(pf);
|
||||
// continue;
|
||||
// }
|
||||
|
@ -1026,7 +1026,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf) {
|
|||
// // Match
|
||||
// if (pfs->cstatus->pmf != NULL) {
|
||||
// tsdbError("vgId:%d failed to restore meta since two file exists, file1 %s and file2 %s", REPO_ID(pRepo),
|
||||
// TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), TFILE_NAME(pf));
|
||||
// TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), pf->aname);
|
||||
// terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
// tfsClosedir(tdir);
|
||||
// regfree(®ex);
|
||||
|
@ -1081,7 +1081,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf) {
|
|||
// }
|
||||
// } else if (code == REG_NOMATCH) {
|
||||
// // Not match
|
||||
// tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
|
||||
// tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), pf->aname);
|
||||
// tfsremove(pf);
|
||||
// continue;
|
||||
// } else {
|
||||
|
@ -1129,7 +1129,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
tdir = tfsOpendir(dataDir);
|
||||
tdir = tfsOpendir(pRepo->pTfs, dataDir);
|
||||
if (tdir == NULL) {
|
||||
tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
|
||||
tstrerror(terrno));
|
||||
|
@ -1152,8 +1152,8 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
|
|||
}
|
||||
} else if (code == REG_NOMATCH) {
|
||||
// Not match
|
||||
tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
|
||||
(void)tfsremove(pf);
|
||||
tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), pf->aname);
|
||||
(void)tfsRemoveFile(pf);
|
||||
continue;
|
||||
} else {
|
||||
// Has other error
|
||||
|
|
|
@ -352,15 +352,15 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
|
|||
return buf;
|
||||
}
|
||||
|
||||
int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) {
|
||||
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) {
|
||||
ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC);
|
||||
|
||||
pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755);
|
||||
if (pDFile->fd < 0) {
|
||||
if (errno == ENOENT) {
|
||||
// Try to create directory recursively
|
||||
char *s = strdup(TFILE_REL_NAME(&(pDFile->f)));
|
||||
if (tfsMkdirRecurAt(dirname(s), TSDB_FILE_LEVEL(pDFile), TSDB_FILE_ID(pDFile)) < 0) {
|
||||
char *s = strdup(TSDB_FILE_REL_NAME(pDFile));
|
||||
if (tfsMkdirRecurAt(pRepo->pTfs, dirname(s), TSDB_FILE_DID(pDFile)) < 0) {
|
||||
tfree(s);
|
||||
return -1;
|
||||
}
|
||||
|
@ -565,7 +565,7 @@ void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint3
|
|||
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
|
||||
tsdbInitDFile(pRepo->pTfs, pDFile, did, fid, ver, ftype);
|
||||
tsdbInitDFile(pRepo, pDFile, did, fid, ver, ftype);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -594,7 +594,7 @@ void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
|
|||
pSet->state = 0;
|
||||
pSet->fid = fid;
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
buf = tsdbDecodeSDFile(pRepo->pTfs, buf, TSDB_DFILE_IN_SET(pSet, ftype));
|
||||
buf = tsdbDecodeSDFile(pRepo, buf, TSDB_DFILE_IN_SET(pSet, ftype));
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
@ -633,9 +633,9 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) {
|
||||
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader) {
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) {
|
||||
if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) {
|
||||
tsdbCloseDFileSet(pSet);
|
||||
tsdbRemoveDFileSet(pSet);
|
||||
return -1;
|
||||
|
|
|
@ -88,26 +88,14 @@ void tfsPosNextId(STfsTier *pTier);
|
|||
#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock)
|
||||
#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock)
|
||||
|
||||
#define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32)
|
||||
|
||||
#define tfsLock(pTfs) pthread_spin_lock(&(pTfs)->lock)
|
||||
#define tfsUnLock(pTfs) pthread_spin_unlock(&(pTfs)->lock)
|
||||
|
||||
#define TFS_TIER_AT(pTfs, level) (&pTfs->tiers[level])
|
||||
#define TFS_DISK_AT(pTfs, did) (pTfs->tiers[(did).level].disks[(did).id])
|
||||
#define TFS_PRIMARY_DISK(pTfs) (pTfs->tiers[0].disks[0])
|
||||
#define TFS_TIER_AT(pTfs, level) (&(pTfs)->tiers[level])
|
||||
#define TFS_DISK_AT(pTfs, did) ((pTfs)->tiers[(did).level].disks[(did).id])
|
||||
#define TFS_PRIMARY_DISK(pTfs) ((pTfs)->tiers[0].disks[0])
|
||||
|
||||
#define TFS_IS_VALID_LEVEL(level) (((level) >= 0) && ((level) < TFS_NLEVEL()))
|
||||
#define TFS_IS_VALID_ID(level, id) (((id) >= 0) && ((id) < TIER_NDISKS(TFS_TIER_AT(level))))
|
||||
#define TFS_IS_VALID_DISK(level, id) (TFS_IS_VALID_LEVEL(level) && TFS_IS_VALID_ID(level, id))
|
||||
|
||||
#define TIER_LEVEL(pt) ((pt)->level)
|
||||
#define TIER_NDISKS(pt) ((pt)->ndisk)
|
||||
#define TIER_SIZE(pt) ((pt)->tmeta.size)
|
||||
#define TIER_FREE_SIZE(pt) ((pt)->tmeta.free)
|
||||
|
||||
#define DISK_AT_TIER(pt, id) ((pt)->disks[id])
|
||||
#define DISK_DIR(pd) ((pd)->path)
|
||||
#define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -198,6 +198,14 @@ void tfsDirname(const STfsFile *pFile, char *dest) {
|
|||
tstrncpy(dest, dirname(tname), TSDB_FILENAME_LEN);
|
||||
}
|
||||
|
||||
int32_t tfsRemoveFile(const STfsFile *pFile) {
|
||||
return remove(pFile->aname);
|
||||
}
|
||||
|
||||
int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) {
|
||||
return taosCopyFile(pFile1->aname, pFile2->aname);
|
||||
}
|
||||
|
||||
int32_t tfsMkdirAt(STfs *pTfs, const char *rname, SDiskID diskId) {
|
||||
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
|
||||
char aname[TMPNAME_LEN];
|
||||
|
@ -281,8 +289,8 @@ int32_t tfsRename(STfs *pTfs, char *orname, char *nrname) {
|
|||
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
|
||||
for (int32_t id = 0; id < pTier->ndisk; id++) {
|
||||
STfsDisk *pDisk = pTier->disks[id];
|
||||
snprintf(oaname, TMPNAME_LEN, "%s%s%s", DISK_DIR(pDisk), TD_DIRSEP, orname);
|
||||
snprintf(naname, TMPNAME_LEN, "%s%s%s", DISK_DIR(pDisk), TD_DIRSEP, nrname);
|
||||
snprintf(oaname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, orname);
|
||||
snprintf(naname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, nrname);
|
||||
if (taosRenameFile(oaname, naname) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -142,7 +142,7 @@ int64_t taosWriteFile(FileFd fd, const void *buf, int64_t n) {
|
|||
|
||||
int64_t taosLSeekFile(FileFd fd, int64_t offset, int32_t whence) { return (int64_t)lseek(fd, (long)offset, whence); }
|
||||
|
||||
int64_t taosCopyFile(char *from, char *to) {
|
||||
int64_t taosCopyFile(const char *from, const char *to) {
|
||||
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
|
||||
return 0;
|
||||
#else
|
||||
|
@ -400,7 +400,7 @@ int32_t taosFsyncFile(FileFd fd) {
|
|||
#endif
|
||||
}
|
||||
|
||||
int32_t taosRenameFile(char *oldName, char *newName) {
|
||||
int32_t taosRenameFile(const char *oldName, const char *newName) {
|
||||
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
|
||||
int32_t code = MoveFileEx(oldName, newName, MOVEFILE_REPLACE_EXISTING | MOVEFILE_COPY_ALLOWED);
|
||||
if (code < 0) {
|
||||
|
|
Loading…
Reference in New Issue