refactor
This commit is contained in:
parent
fd06497e1a
commit
2569e9d375
|
@ -52,7 +52,7 @@ void tfsDirName(TFSFILE *pfile, char dest[]);
|
|||
void tfsBaseName(TFSFILE *pfile, char dest[]);
|
||||
int tfsopen(TFSFILE *pfile, int flags);
|
||||
int tfsclose(int fd);
|
||||
int tfsRemoveFiles(int nfile, ...);
|
||||
int tfsremove(TFSFILE *pfile);
|
||||
SDiskID tfsFileID(TFSFILE *pfile);
|
||||
|
||||
typedef struct TFSDIR TFSDIR;
|
||||
|
|
|
@ -150,6 +150,18 @@ int tfsclose(int fd) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tfsremove(TFSFILE *pfile) {
|
||||
int code = remove(pfile->aname);
|
||||
if (code != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
tfsLock();
|
||||
tfsDecFileAt(pfile->level, pfile->id);
|
||||
tfsUnLock();
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tfsRemoveFiles(int nfile, ...) {
|
||||
va_list valist;
|
||||
TFSFILE *pfile = NULL;
|
||||
|
|
|
@ -215,7 +215,7 @@ typedef struct {
|
|||
int index;
|
||||
} SFileGroupIter;
|
||||
|
||||
#define TSDB_FILE_NAME(pFile) (tfsAbsName(pFile->file))
|
||||
#define TSDB_FILE_NAME(pFile) ((pFile)->file.aname)
|
||||
|
||||
// ------------------ tsdbMain.c
|
||||
typedef struct {
|
||||
|
@ -529,7 +529,6 @@ int tsdbLoadFileHeader(SFile* pFile, uint32_t* version);
|
|||
void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size);
|
||||
void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup);
|
||||
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
|
||||
int tsdbGetBaseDirFromFile(char* fname, char* baseDir);
|
||||
int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup);
|
||||
|
||||
// ------------------ tsdbRWHelper.c
|
||||
|
|
|
@ -270,11 +270,11 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
|
||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||
|
||||
(void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
|
||||
(void)rename(TSDB_FILE_NAME(helperNewHeadF(pHelper)), TSDB_FILE_NAME(helperHeadF(pHelper)));
|
||||
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
|
||||
|
||||
if (newLast) {
|
||||
(void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
|
||||
(void)rename(TSDB_FILE_NAME(helperNewLastF(pHelper)), TSDB_FILE_NAME(helperLastF(pHelper)));
|
||||
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
|
||||
} else {
|
||||
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
|
||||
|
|
|
@ -130,9 +130,10 @@ void tsdbCloseFileH(STsdbRepo *pRepo) { // TODO
|
|||
for (int i = 0; i < pFileH->nFGroups; i++) {
|
||||
SFileGroup *pFGroup = pFileH->pFGroup + i;
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
tsdbDestroyFile(&pFGroup->files[type]);
|
||||
tsdbCloseFile(&(pFGroup->files[type]));
|
||||
}
|
||||
}
|
||||
// TODO: delete each files
|
||||
}
|
||||
|
||||
// SFileGroup ===========================================
|
||||
|
@ -141,34 +142,35 @@ SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level) {
|
|||
char fname[TSDB_FILENAME_LEN] = "\0";
|
||||
SFileGroup fg = {0};
|
||||
SFileGroup *pfg = &fg;
|
||||
SFile * pfile = NULL;
|
||||
int id = -1;
|
||||
SFile * pFile = NULL;
|
||||
int id = TFS_UNDECIDED_ID;
|
||||
|
||||
ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL && pFileH->nFGroups < pFileH->maxFGroups);
|
||||
|
||||
// 1. Create each files
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
pfile = &(pfg->files[type]);
|
||||
pFile = &(pfg->files[type]);
|
||||
|
||||
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, fname);
|
||||
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->file.rname);
|
||||
pFile->file.level = level;
|
||||
pFile->file.id = id;
|
||||
|
||||
pfile->file = tfsCreateFiles(level, id, fname);
|
||||
if (pfile->file == NULL) {
|
||||
// TODO :deal with error
|
||||
if (tsdbOpenFile(pFile, O_WRONLY|O_CREAT) < 0); {
|
||||
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (tsdbOpenFile(pfile, O_WRONLY) < 0); {
|
||||
// TODO: deal with the ERROR here
|
||||
if (tsdbUpdateFileHeader(pFile) < 0) {
|
||||
tsdbError("vgId:%d failed to update file %s header since %s", REPO_ID(pRepo), TSDB_FILE_NAME(pFile),
|
||||
tstrerror(terrno));
|
||||
tsdbCloseFile(pFile);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (tsdbUpdateFileHeader(pfile) < 0) {
|
||||
// TODO: deal the error
|
||||
}
|
||||
tsdbCloseFile(pFile);
|
||||
|
||||
tsdbCloseFile(pfile);
|
||||
|
||||
level = TFS_FILE_LEVEL(pfile->file);
|
||||
id = TFS_FILE_ID(pfile->file);
|
||||
level = pFile->file.level;
|
||||
id = pFile->file.id;
|
||||
}
|
||||
|
||||
// Set fg
|
||||
|
@ -200,8 +202,10 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
|
|||
pFileH->nFGroups--;
|
||||
ASSERT(pFileH->nFGroups >= 0);
|
||||
|
||||
tfsRemoveFiles(TSDB_FILE_TYPE_MAX, &fileGroup.files[TSDB_FILE_TYPE_HEAD], &fileGroup.files[TSDB_FILE_TYPE_DATA],
|
||||
&fileGroup.files[TSDB_FILE_TYPE_LAST]);
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
SFile *pFile = &(pFGroup->files[type]);
|
||||
tfsremove(&(pFile->file));
|
||||
}
|
||||
}
|
||||
|
||||
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
|
||||
|
@ -307,10 +311,9 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
|
|||
int tsdbOpenFile(SFile *pFile, int oflag) {
|
||||
ASSERT(!TSDB_IS_FILE_OPENED(pFile));
|
||||
|
||||
pFile->fd = open(TSDB_FILE_NAME(pFile), oflag, 0755);
|
||||
pFile->fd = tfsopen(&(pFile->file), oflag);
|
||||
if (pFile->fd < 0) {
|
||||
tsdbError("failed to open file %s since %s", TSDB_FILE_NAME(pFile), strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tsdbError("failed to open file %s since %s", TSDB_FILE_NAME(pFile), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -327,38 +330,6 @@ void tsdbCloseFile(SFile *pFile) {
|
|||
}
|
||||
}
|
||||
|
||||
static int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
|
||||
memset((void *)pFile, 0, sizeof(SFile));
|
||||
pFile->fd = -1;
|
||||
|
||||
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, TSDB_FILE_NAME(pFile));
|
||||
|
||||
if (access(TSDB_FILE_NAME(pFile), F_OK) == 0) {
|
||||
tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), TSDB_FILE_NAME(pFile));
|
||||
terrno = TSDB_CODE_TDB_FILE_ALREADY_EXISTS;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tsdbOpenFile(pFile, O_RDWR | O_CREAT) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pFile->info.size = TSDB_FILE_HEAD_SIZE;
|
||||
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||
|
||||
if (tsdbUpdateFileHeader(pFile) < 0) {
|
||||
tsdbCloseFile(pFile);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tsdbCloseFile(pFile);
|
||||
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
return -1;
|
||||
}
|
||||
|
||||
int tsdbUpdateFileHeader(SFile *pFile) {
|
||||
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
|
||||
|
||||
|
@ -437,7 +408,7 @@ int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) {
|
||||
void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) { // TODO
|
||||
uint32_t version = 0;
|
||||
SFile file;
|
||||
SFile * pFile = &file;
|
||||
|
@ -463,8 +434,6 @@ _err:
|
|||
*size = 0;
|
||||
}
|
||||
|
||||
static void tsdbDestroyFile(SFile *pFile) { tsdbCloseFile(pFile); }
|
||||
|
||||
// Retention ===========================================
|
||||
void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
|
@ -490,22 +459,6 @@ void tsdbGetFidGroup(STsdbCfg *pCfg, SFidGroup *pFidGroup) {
|
|||
TSDB_KEY_FILEID(now - pCfg->keep1 * tsMsPerDay[pCfg->precision], pCfg->daysPerFile, pCfg->precision);
|
||||
}
|
||||
|
||||
int tsdbGetBaseDirFromFile(char *fname, char *baseDir) {
|
||||
char *fdup = strdup(fname);
|
||||
if (fdup == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < 5; i++) {
|
||||
dirname(fdup);
|
||||
}
|
||||
|
||||
strncpy(baseDir, fdup, TSDB_FILENAME_LEN);
|
||||
free(fdup);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup *pGroup = NULL;
|
||||
|
|
|
@ -115,9 +115,14 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
|||
pHelper->files.fGroup = *pGroup;
|
||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD,
|
||||
helperNewHeadF(pHelper)->fname);
|
||||
helperNewHeadF(pHelper)->file.rname);
|
||||
helperNewHeadF(pHelper)->file.level = pGroup->files[0].file.level;
|
||||
helperNewHeadF(pHelper)->file.id = pGroup->files[0].file.id;
|
||||
|
||||
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST,
|
||||
helperNewLastF(pHelper)->fname);
|
||||
helperNewLastF(pHelper)->file.rname);
|
||||
helperNewLastF(pHelper)->file.level = pGroup->files[0].file.level;
|
||||
helperNewLastF(pHelper)->file.id = pGroup->files[0].file.id;
|
||||
}
|
||||
|
||||
// Open the files
|
||||
|
@ -194,7 +199,9 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError, SFileGroup *pGroup) {
|
|||
fsync(pFile->fd);
|
||||
}
|
||||
tsdbCloseFile(pFile);
|
||||
if (hasError) (void)remove(TSDB_FILE_NAME(pFile));
|
||||
if (hasError) {
|
||||
tfsremove(&(pFile->file));
|
||||
}
|
||||
}
|
||||
|
||||
pFile = helperNewLastF(pHelper);
|
||||
|
@ -204,7 +211,9 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError, SFileGroup *pGroup) {
|
|||
fsync(pFile->fd);
|
||||
}
|
||||
tsdbCloseFile(pFile);
|
||||
if (hasError) (void)remove(TSDB_FILE_NAME(pFile));
|
||||
if (hasError) {
|
||||
tfsremove(&(pFile->file));
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue