refactor
This commit is contained in:
parent
c96dcb7ebd
commit
fd06497e1a
|
@ -396,6 +396,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_TOO_MANY_MOUNT, 0, 0x2202, "tfs too ma
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, 0, 0x2203, "tfs duplicate primary mount")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_PRIMARY_DISK, 0, 0x2204, "tfs no primary mount")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_MOUNT_AT_TIER, 0, 0x2205, "tfs no mount at tier")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_FILE_ALREADY_EXISTS, 0, 0x2206, "tfs file already exists")
|
||||
|
||||
|
||||
#ifdef TAOS_ERROR_C
|
||||
|
|
|
@ -27,6 +27,9 @@ typedef struct {
|
|||
int id;
|
||||
} SDiskID;
|
||||
|
||||
#define TFS_UNDECIDED_LEVEL -1
|
||||
#define TFS_UNDECIDED_ID -1
|
||||
|
||||
// tfs.c ====================================
|
||||
int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
|
||||
void tfsDestroy();
|
||||
|
@ -36,7 +39,12 @@ const char *tfsGetDiskName(int level, int id);
|
|||
const char *tfsPrimaryPath();
|
||||
|
||||
// tfcntl.c ====================================
|
||||
typedef struct TFSFILE TFSFILE;
|
||||
typedef struct TFSFILE {
|
||||
int level;
|
||||
int id;
|
||||
char rname[TSDB_FILENAME_LEN]; // REL name
|
||||
char aname[TSDB_FILENAME_LEN]; // ABS name
|
||||
} TFSFILE;
|
||||
|
||||
const char *tfsAbsName(TFSFILE *pfile);
|
||||
const char *tfsRelName(TFSFILE *pfile);
|
||||
|
@ -44,7 +52,6 @@ void tfsDirName(TFSFILE *pfile, char dest[]);
|
|||
void tfsBaseName(TFSFILE *pfile, char dest[]);
|
||||
int tfsopen(TFSFILE *pfile, int flags);
|
||||
int tfsclose(int fd);
|
||||
TFSFILE * tfsCreateFiles(int level, int nfile, char *fnames[]);
|
||||
int tfsRemoveFiles(int nfile, ...);
|
||||
SDiskID tfsFileID(TFSFILE *pfile);
|
||||
|
||||
|
|
|
@ -64,6 +64,7 @@ void tfsDecFileAt(int level, int id);
|
|||
int tfsLock();
|
||||
int tfsUnLock();
|
||||
bool tfsIsLocked();
|
||||
int tfsLevels();
|
||||
|
||||
// tfcntl.c
|
||||
|
||||
|
|
|
@ -18,13 +18,6 @@
|
|||
#include "tfs.h"
|
||||
#include "tfsint.h"
|
||||
|
||||
struct TFSFILE {
|
||||
int level;
|
||||
int id;
|
||||
char rname[TSDB_FILENAME_LEN]; // REL name
|
||||
char aname[TSDB_FILENAME_LEN]; // ABS name
|
||||
};
|
||||
|
||||
struct TFSDIR {
|
||||
int level;
|
||||
int id;
|
||||
|
@ -112,12 +105,39 @@ void tfsBaseName(TFSFILE *pfile, char dest[]) {
|
|||
}
|
||||
|
||||
int tfsopen(TFSFILE *pfile, int flags) {
|
||||
ASSERT(pfile->level != TFS_UNDECIDED_LEVEL);
|
||||
|
||||
if (flags & O_CREAT) {
|
||||
if (access(pfile->aname, F_OK) == 0) {
|
||||
terrno = TSDB_CODE_FS_FILE_ALREADY_EXISTS;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// adjust level
|
||||
if (pfile->level > tfsLevels()) {
|
||||
pfile->level = tfsLevels();
|
||||
}
|
||||
|
||||
// adjust id
|
||||
if (pfile->id == TFS_UNDECIDED_ID) {
|
||||
// TODO
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(pfile->id != TFS_UNDECIDED_ID);
|
||||
|
||||
int fd = open(pfile->aname, flags);
|
||||
if (fd < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (flags & O_CREAT) {
|
||||
tfsLock();
|
||||
tfsIncFileAt(pfile->level, pfile->id);
|
||||
tfsUnLock();
|
||||
}
|
||||
|
||||
return fd;
|
||||
}
|
||||
|
||||
|
@ -130,11 +150,6 @@ int tfsclose(int fd) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
TFSFILE *tfsCreateFiles(int level, int nfile, char *fnames[]) {
|
||||
// TODO
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int tfsRemoveFiles(int nfile, ...) {
|
||||
va_list valist;
|
||||
TFSFILE *pfile = NULL;
|
||||
|
|
|
@ -210,6 +210,8 @@ int tfsUnLock() {
|
|||
|
||||
bool tfsIsLocked() { return pfs->locked; }
|
||||
|
||||
int tfsLevels() { return pfs->nlevel; }
|
||||
|
||||
const char *tfsGetDiskName(int level, int id) {
|
||||
return DISK_AT(level, id)->dir;
|
||||
}
|
||||
|
|
|
@ -189,7 +189,7 @@ typedef struct {
|
|||
} STsdbFileInfo;
|
||||
|
||||
typedef struct {
|
||||
TFSFILE* file;
|
||||
TFSFILE file;
|
||||
STsdbFileInfo info;
|
||||
int fd;
|
||||
} SFile;
|
||||
|
|
|
@ -22,11 +22,9 @@
|
|||
#include "tutil.h"
|
||||
#include "tfs.h"
|
||||
|
||||
|
||||
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
|
||||
|
||||
|
||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||
// STsdbFileH ===========================================
|
||||
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
|
||||
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(*pFileH));
|
||||
if (pFileH == NULL) {
|
||||
|
@ -126,7 +124,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) { // TODO
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tsdbCloseFileH(STsdbRepo *pRepo) {
|
||||
void tsdbCloseFileH(STsdbRepo *pRepo) { // TODO
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
|
||||
for (int i = 0; i < pFileH->nFGroups; i++) {
|
||||
|
@ -137,55 +135,106 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
|
|||
}
|
||||
}
|
||||
|
||||
SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid) {
|
||||
// TODO
|
||||
// SFileGroup ===========================================
|
||||
SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level) {
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup fGroup = {0};
|
||||
char fnames[TSDB_FILE_TYPE_MAX][TSDB_FILENAME_LEN] = {0};
|
||||
char fname[TSDB_FILENAME_LEN] = "\0";
|
||||
SFileGroup fg = {0};
|
||||
SFileGroup *pfg = &fg;
|
||||
SFile * pfile = NULL;
|
||||
int id = -1;
|
||||
|
||||
ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL);
|
||||
ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL && pFileH->nFGroups < pFileH->maxFGroups);
|
||||
|
||||
// Create files
|
||||
// 1. Create each files
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, fnames[type]);
|
||||
}
|
||||
pfile = &(pfg->files[type]);
|
||||
|
||||
int level = tsdbGetFidLevel(); // TODO
|
||||
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, fname);
|
||||
|
||||
TFSFILE *pfiles = tfsCreateFiles(level, TSDB_FILE_TYPE_MAX, fnames);
|
||||
if (pfiles == NULL) {
|
||||
// TODO: deal the error
|
||||
}
|
||||
pfile->file = tfsCreateFiles(level, id, fname);
|
||||
if (pfile->file == NULL) {
|
||||
// TODO :deal with error
|
||||
}
|
||||
|
||||
// Write file headers to file
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
int fd = tfsopen(pfiles+type, O_RDONLY);
|
||||
if (fd < 0) {
|
||||
if (tsdbOpenFile(pfile, O_WRONLY) < 0); {
|
||||
// TODO: deal with the ERROR here
|
||||
}
|
||||
|
||||
if (tsdbUpdateFileHeader(pfile) < 0) {
|
||||
// TODO: deal the error
|
||||
}
|
||||
|
||||
tsdbCloseFile(pfile);
|
||||
|
||||
level = TFS_FILE_LEVEL(pfile->file);
|
||||
id = TFS_FILE_ID(pfile->file);
|
||||
}
|
||||
|
||||
// Construct file group
|
||||
fGroup.fileId = fid;
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
if (tsdbCreateFile(&(fGroup.files[type]), pRepo, fid, type, pDisk) < 0) goto _err;
|
||||
}
|
||||
// Set fg
|
||||
pfg->fileId = fid;
|
||||
pfg->state = 0;
|
||||
|
||||
// Register fgroup to the repo
|
||||
// Register fg to the repo
|
||||
pthread_rwlock_wrlock(&pFileH->fhlock);
|
||||
pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
|
||||
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
|
||||
pthread_rwlock_unlock(&pFileH->fhlock);
|
||||
return tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
||||
|
||||
_err:
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
tsdbDestroyFile(&(fGroup.files[type]));
|
||||
}
|
||||
tdDecDiskFiles(tsDnodeTier, pDisk, true);
|
||||
return NULL;
|
||||
pfg = tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
||||
ASSERT(pfg != NULL);
|
||||
return pfg;
|
||||
}
|
||||
|
||||
void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
|
||||
ASSERT(pFGroup != NULL);
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
|
||||
SFileGroup fileGroup = *pFGroup;
|
||||
|
||||
int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
|
||||
if (nFilesLeft > 0) {
|
||||
memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft);
|
||||
}
|
||||
|
||||
pFileH->nFGroups--;
|
||||
ASSERT(pFileH->nFGroups >= 0);
|
||||
|
||||
tfsRemoveFiles(TSDB_FILE_TYPE_MAX, &fileGroup.files[TSDB_FILE_TYPE_HEAD], &fileGroup.files[TSDB_FILE_TYPE_DATA],
|
||||
&fileGroup.files[TSDB_FILE_TYPE_LAST]);
|
||||
}
|
||||
|
||||
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
|
||||
void *ptr
|
||||
taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
|
||||
if (ptr == NULL) return NULL;
|
||||
return (SFileGroup *)ptr;
|
||||
}
|
||||
|
||||
static int compFGroup(const void *arg1, const void *arg2) {
|
||||
int val1 = ((SFileGroup *)arg1)->fileId;
|
||||
int val2 = ((SFileGroup *)arg2)->fileId;
|
||||
|
||||
if (val1 < val2) {
|
||||
return -1;
|
||||
} else if (val1 > val2) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup) {
|
||||
int fid = *(int *)key;
|
||||
SFileGroup *pFGroup = (SFileGroup *)fgroup;
|
||||
if (fid == pFGroup->fileId) {
|
||||
return 0;
|
||||
} else {
|
||||
return fid > pFGroup->fileId ? 1 : -1;
|
||||
}
|
||||
}
|
||||
|
||||
// SFileGroupIter ===========================================
|
||||
void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) {
|
||||
pIter->pFileH = pFileH;
|
||||
pIter->direction = direction;
|
||||
|
@ -254,6 +303,7 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
|
|||
return pFGroup;
|
||||
}
|
||||
|
||||
// SFile ===========================================
|
||||
int tsdbOpenFile(SFile *pFile, int oflag) {
|
||||
ASSERT(!TSDB_IS_FILE_OPENED(pFile));
|
||||
|
||||
|
@ -277,7 +327,7 @@ void tsdbCloseFile(SFile *pFile) {
|
|||
}
|
||||
}
|
||||
|
||||
int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
|
||||
static int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
|
||||
memset((void *)pFile, 0, sizeof(SFile));
|
||||
pFile->fd = -1;
|
||||
|
||||
|
@ -309,26 +359,6 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
|
||||
void *ptr =
|
||||
taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
|
||||
if (ptr == NULL) return NULL;
|
||||
return (SFileGroup *)ptr;
|
||||
}
|
||||
|
||||
void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup *pGroup = pFileH->pFGroup;
|
||||
|
||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||
|
||||
while (pFileH->nFGroups > 0 && pGroup[0].fileId < pFidGroup->minFid) {
|
||||
tsdbRemoveFileGroup(pRepo, pGroup);
|
||||
}
|
||||
|
||||
pthread_rwlock_unlock(&(pFileH->fhlock));
|
||||
}
|
||||
|
||||
int tsdbUpdateFileHeader(SFile *pFile) {
|
||||
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
|
||||
|
||||
|
@ -344,7 +374,8 @@ int tsdbUpdateFileHeader(SFile *pFile) {
|
|||
return -1;
|
||||
}
|
||||
if (taosWrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
|
||||
tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, TSDB_FILE_NAME(pFile), strerror(errno));
|
||||
tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, TSDB_FILE_NAME(pFile),
|
||||
strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
@ -377,24 +408,6 @@ void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
|
|||
return buf;
|
||||
}
|
||||
|
||||
void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
|
||||
ASSERT(pFGroup != NULL);
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
|
||||
SFileGroup fileGroup = *pFGroup;
|
||||
|
||||
int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
|
||||
if (nFilesLeft > 0) {
|
||||
memmove((void *)pFGroup, POINTER_SHIFT(pFGroup, sizeof(SFileGroup)), sizeof(SFileGroup) * nFilesLeft);
|
||||
}
|
||||
|
||||
pFileH->nFGroups--;
|
||||
ASSERT(pFileH->nFGroups >= 0);
|
||||
|
||||
tfsRemoveFiles(TSDB_FILE_TYPE_MAX, &fileGroup.files[TSDB_FILE_TYPE_HEAD], &fileGroup.files[TSDB_FILE_TYPE_DATA],
|
||||
&fileGroup.files[TSDB_FILE_TYPE_LAST]);
|
||||
}
|
||||
|
||||
int tsdbLoadFileHeader(SFile *pFile, uint32_t *version) {
|
||||
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
|
||||
|
||||
|
@ -450,6 +463,22 @@ _err:
|
|||
*size = 0;
|
||||
}
|
||||
|
||||
static void tsdbDestroyFile(SFile *pFile) { tsdbCloseFile(pFile); }
|
||||
|
||||
// Retention ===========================================
|
||||
void tsdbRemoveFilesBeyondRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup *pGroup = pFileH->pFGroup;
|
||||
|
||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||
|
||||
while (pFileH->nFGroups > 0 && pGroup[0].fileId < pFidGroup->minFid) {
|
||||
tsdbRemoveFileGroup(pRepo, pGroup);
|
||||
}
|
||||
|
||||
pthread_rwlock_unlock(&(pFileH->fhlock));
|
||||
}
|
||||
|
||||
void tsdbGetFidGroup(STsdbCfg *pCfg, SFidGroup *pFidGroup) {
|
||||
TSKEY now = taosGetTimestamp(pCfg->precision);
|
||||
|
||||
|
@ -529,30 +558,4 @@ int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
|
|||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// ---------------- LOCAL FUNCTIONS ----------------
|
||||
static void tsdbDestroyFile(SFile *pFile) { tsdbCloseFile(pFile); }
|
||||
|
||||
static int compFGroup(const void *arg1, const void *arg2) {
|
||||
int val1 = ((SFileGroup *)arg1)->fileId;
|
||||
int val2 = ((SFileGroup *)arg2)->fileId;
|
||||
|
||||
if (val1 < val2) {
|
||||
return -1;
|
||||
} else if (val1 > val2) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup) {
|
||||
int fid = *(int *)key;
|
||||
SFileGroup *pFGroup = (SFileGroup *)fgroup;
|
||||
if (fid == pFGroup->fileId) {
|
||||
return 0;
|
||||
} else {
|
||||
return fid > pFGroup->fileId ? 1 : -1;
|
||||
}
|
||||
}
|
|
@ -105,16 +105,12 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
|||
ASSERT(pHelper != NULL && pGroup != NULL);
|
||||
SFile * pFile = NULL;
|
||||
STsdbRepo *pRepo = pHelper->pRepo;
|
||||
char baseDir[TSDB_FILENAME_LEN] = "\0";
|
||||
char tsdbRootDir[TSDB_FILENAME_LEN] = "\0";
|
||||
|
||||
// Clear the helper object
|
||||
tsdbResetHelper(pHelper);
|
||||
|
||||
ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE);
|
||||
|
||||
tsdbGetBaseDirFromFile(pGroup->files[0].fname, baseDir);
|
||||
tdGetTsdbRootDir(baseDir, REPO_ID(pRepo), tsdbRootDir);
|
||||
// Set the files
|
||||
pHelper->files.fGroup = *pGroup;
|
||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||
|
|
Loading…
Reference in New Issue