refactor
This commit is contained in:
parent
64912b6c57
commit
f573875b47
|
@ -220,6 +220,7 @@ int32_t sDebugFlag = 135;
|
||||||
int32_t wDebugFlag = 135;
|
int32_t wDebugFlag = 135;
|
||||||
int32_t tsdbDebugFlag = 131;
|
int32_t tsdbDebugFlag = 131;
|
||||||
int32_t cqDebugFlag = 135;
|
int32_t cqDebugFlag = 135;
|
||||||
|
int32_t fsDebugFlag = 135;
|
||||||
|
|
||||||
int32_t (*monitorStartSystemFp)() = NULL;
|
int32_t (*monitorStartSystemFp)() = NULL;
|
||||||
void (*monitorStopSystemFp)() = NULL;
|
void (*monitorStopSystemFp)() = NULL;
|
||||||
|
|
|
@ -36,6 +36,8 @@ typedef struct {
|
||||||
int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
|
int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
|
||||||
void tfsDestroy();
|
void tfsDestroy();
|
||||||
void tfsUpdateInfo();
|
void tfsUpdateInfo();
|
||||||
|
void tfsIncDiskFile(int level, int id, int num);
|
||||||
|
void tfsDecDiskFile(int level, int id, int num);
|
||||||
|
|
||||||
const char *TFS_PRIMARY_PATH();
|
const char *TFS_PRIMARY_PATH();
|
||||||
const char *TFS_DISK_PATH(int level, int id);
|
const char *TFS_DISK_PATH(int level, int id);
|
||||||
|
@ -52,7 +54,10 @@ typedef struct {
|
||||||
#define TFILE_ID(pf) ((pf)->id)
|
#define TFILE_ID(pf) ((pf)->id)
|
||||||
#define TFILE_NAME(pf) ((pf)->aname)
|
#define TFILE_NAME(pf) ((pf)->aname)
|
||||||
|
|
||||||
int tfsInitFile(TFILE *pf, int level, int id, const char *bname);
|
void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
|
||||||
|
int tfsopen(TFILE *pf, int flags);
|
||||||
|
int tfsclose(int fd);
|
||||||
|
int tfsremove(TFILE *pf);
|
||||||
|
|
||||||
// DIR APIs ====================================
|
// DIR APIs ====================================
|
||||||
int tfsMkdir(const char *rname);
|
int tfsMkdir(const char *rname);
|
||||||
|
|
|
@ -54,6 +54,8 @@ typedef struct {
|
||||||
#define TFS_IS_VALID_ID(level, id) (((id) >= 0) && ((id) < TIER_NDISKS(TFS_TIER_AT(level))))
|
#define TFS_IS_VALID_ID(level, id) (((id) >= 0) && ((id) < TIER_NDISKS(TFS_TIER_AT(level))))
|
||||||
#define TFS_IS_VALID_DISK(level, id) (TFS_IS_VALID_LEVEL(level) && TFS_IS_VALID_ID(level, id))
|
#define TFS_IS_VALID_DISK(level, id) (TFS_IS_VALID_LEVEL(level) && TFS_IS_VALID_ID(level, id))
|
||||||
|
|
||||||
|
#define TFS_MIN_DISK_FREE_SIZE 50*1024*1024
|
||||||
|
|
||||||
static SFS tfs = {0};
|
static SFS tfs = {0};
|
||||||
static SFS *pfs = &tfs;
|
static SFS *pfs = &tfs;
|
||||||
|
|
||||||
|
@ -69,6 +71,7 @@ static int tfsUnLock();
|
||||||
static int tfsOpendirImpl(TDIR *tdir);
|
static int tfsOpendirImpl(TDIR *tdir);
|
||||||
static void tfsInitDiskIter(SDiskIter *pIter);
|
static void tfsInitDiskIter(SDiskIter *pIter);
|
||||||
static SDisk *tfsNextDisk(SDiskIter *pIter);
|
static SDisk *tfsNextDisk(SDiskIter *pIter);
|
||||||
|
static int tfsAssignDisk(int level);
|
||||||
|
|
||||||
// FS APIs ====================================
|
// FS APIs ====================================
|
||||||
int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
|
int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
|
||||||
|
@ -136,23 +139,82 @@ void tfsUpdateInfo() {
|
||||||
tfsUnLock();
|
tfsUnLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tfsIncDiskFile(int level, int id, int num) {
|
||||||
|
tfsLock();
|
||||||
|
TFS_DISK_AT(level, id)->dmeta.nfiles += num;
|
||||||
|
tfsUnLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
void tfsDecDiskFile(int level, int id, int num) {
|
||||||
|
tfsLock();
|
||||||
|
TFS_DISK_AT(level, id)->dmeta.nfiles -= num;
|
||||||
|
ASSERT(TFS_DISK_AT(level, id)->dmeta.nfiles >= 0);
|
||||||
|
tfsUnLock();
|
||||||
|
}
|
||||||
|
|
||||||
const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); }
|
const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); }
|
||||||
const char *TFS_DISK_PATH(int level, int id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
|
const char *TFS_DISK_PATH(int level, int id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
|
||||||
|
|
||||||
// TFILE APIs ====================================
|
// TFILE APIs ====================================
|
||||||
int tfsInitFile(TFILE *pf, int level, int id, const char *bname) {
|
void tfsInitFile(TFILE *pf, int level, int id, const char *bname) {
|
||||||
if (!TFS_IS_VALID_DISK(level, id)) return -1;
|
|
||||||
|
|
||||||
SDisk *pDisk = TFS_DISK_AT(level, id);
|
SDisk *pDisk = TFS_DISK_AT(level, id);
|
||||||
|
|
||||||
pf->level = level;
|
pf->level = level;
|
||||||
pf->id = id;
|
pf->id = id;
|
||||||
strncpy(pf->rname, bname, TSDB_FILENAME_LEN);
|
strncpy(pf->rname, bname, TSDB_FILENAME_LEN);
|
||||||
snprintf(pf->aname, TSDB_FILENAME_LEN, "%s/%s", DISK_DIR(pDisk), pf->rname);
|
snprintf(pf->aname, TSDB_FILENAME_LEN, "%s/%s", DISK_DIR(pDisk), pf->rname);
|
||||||
|
}
|
||||||
|
|
||||||
|
int tfsopen(TFILE *pf, int flags) {
|
||||||
|
int fd = -1;
|
||||||
|
|
||||||
|
if (flags & O_CREAT) {
|
||||||
|
if (pf->level > TFS_NLEVEL()) {
|
||||||
|
pf->level = TFS_NLEVEL();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pf->id == TFS_UNDECIDED_ID) {
|
||||||
|
pf->id = tfsAssignDisk(pf->level);
|
||||||
|
if (pf->id < 0) {
|
||||||
|
fError("failed to assign disk at level %d", pf->level);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tfsIncDiskFile(pf->level, pf->id, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
fd = open(pf->aname, flags);
|
||||||
|
if (fd < 0) {
|
||||||
|
fError("failed to open file %s since %s", pf->aname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tfsclose(int fd) {
|
||||||
|
int code = close(fd);
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tfsremove(TFILE *pf) {
|
||||||
|
int code = remove(pf->aname);
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tfsDecDiskFile(pf->level, pf->id, 1);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// DIR APIs ====================================
|
// DIR APIs ====================================
|
||||||
int tfsMkdir(const char *rname) {
|
int tfsMkdir(const char *rname) {
|
||||||
char aname[TSDB_FILENAME_LEN] = "\0";
|
char aname[TSDB_FILENAME_LEN] = "\0";
|
||||||
|
@ -482,4 +544,32 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) {
|
||||||
return pDisk;
|
return pDisk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tfsAssignDisk(int level) {
|
||||||
|
if (!TFS_IS_VALID_LEVEL(level)) return -1;
|
||||||
|
|
||||||
|
STier *pTier = TFS_TIER_AT(level);
|
||||||
|
int id = -1;
|
||||||
|
|
||||||
|
tfsLock();
|
||||||
|
|
||||||
|
for (int tid = 0; tid < TIER_NDISKS(pTier); tid++) {
|
||||||
|
SDisk *pDisk = DISK_AT_TIER(pTier, tid);
|
||||||
|
|
||||||
|
if (DISK_FREE_SIZE(pDisk) < TFS_MIN_DISK_FREE_SIZE) continue;
|
||||||
|
|
||||||
|
if (id == -1) {
|
||||||
|
id = tid;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (DISK_NFILES(DISK_AT_TIER(pTier, id)) > DISK_NFILES(DISK_AT_TIER(pTier, tid))) {
|
||||||
|
id = tid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tfsUnLock();
|
||||||
|
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
|
@ -520,6 +520,7 @@ int tsdbOpenFile(SFile* pFile, int oflag);
|
||||||
void tsdbCloseFile(SFile* pFile);
|
void tsdbCloseFile(SFile* pFile);
|
||||||
int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type);
|
int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type);
|
||||||
SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
|
SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
|
||||||
|
int tsdbGetFidLevel(int fid, SFidGroup fidg);
|
||||||
void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, SFidGroup* pFidGroup);
|
void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, SFidGroup* pFidGroup);
|
||||||
int tsdbUpdateFileHeader(SFile* pFile);
|
int tsdbUpdateFileHeader(SFile* pFile);
|
||||||
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
|
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
|
||||||
|
@ -593,7 +594,7 @@ static FORCE_INLINE int compTSKEY(const void* key1, const void* key2) {
|
||||||
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
|
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
|
||||||
|
|
||||||
char* tsdbGetMetaFileName(char* rootDir);
|
char* tsdbGetMetaFileName(char* rootDir);
|
||||||
void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, const char* fname);
|
void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname);
|
||||||
int tsdbLockRepo(STsdbRepo* pRepo);
|
int tsdbLockRepo(STsdbRepo* pRepo);
|
||||||
int tsdbUnlockRepo(STsdbRepo* pRepo);
|
int tsdbUnlockRepo(STsdbRepo* pRepo);
|
||||||
char* tsdbGetDataDirName(char* rootDir);
|
char* tsdbGetDataDirName(char* rootDir);
|
||||||
|
|
|
@ -191,7 +191,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ)) == NULL) {
|
if ((pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ)) == NULL) {
|
||||||
pGroup = tsdbCreateFGroup(pRepo, fid);
|
pGroup = tsdbCreateFGroup(pRepo, fid, tsdbGetFidLevel(fid, pch->fidg));
|
||||||
if (pGroup == NULL) {
|
if (pGroup == NULL) {
|
||||||
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -24,6 +24,9 @@
|
||||||
|
|
||||||
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
|
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
|
||||||
|
|
||||||
|
static int compFGroup(const void *arg1, const void *arg2);
|
||||||
|
static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
||||||
|
|
||||||
// STsdbFileH ===========================================
|
// STsdbFileH ===========================================
|
||||||
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
|
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
|
||||||
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(*pFileH));
|
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(*pFileH));
|
||||||
|
@ -85,51 +88,47 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
|
||||||
// SFileGroup ===========================================
|
// SFileGroup ===========================================
|
||||||
SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level) {
|
SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid, int level) {
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
char fname[TSDB_FILENAME_LEN] = "\0";
|
|
||||||
SFileGroup fg = {0};
|
SFileGroup fg = {0};
|
||||||
SFileGroup *pfg = &fg;
|
|
||||||
SFile * pFile = NULL;
|
|
||||||
int id = TFS_UNDECIDED_ID;
|
int id = TFS_UNDECIDED_ID;
|
||||||
|
char fname[TSDB_FILENAME_LEN] = "\0";
|
||||||
|
|
||||||
ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL && pFileH->nFGroups < pFileH->maxFGroups);
|
ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL);
|
||||||
|
ASSERT(pFileH->nFGroups < pFileH->maxFGroups);
|
||||||
|
|
||||||
// 1. Create each files
|
// SET FILE GROUP
|
||||||
|
fg.fileId = fid;
|
||||||
|
|
||||||
|
// CREATE FILES
|
||||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
pFile = &(pfg->files[type]);
|
SFile *pFile = &(fg.files[type]);
|
||||||
|
|
||||||
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->file.rname);
|
pFile->fd = -1;
|
||||||
pFile->file.level = level;
|
pFile->info.size = TSDB_FILE_HEAD_SIZE;
|
||||||
pFile->file.id = id;
|
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||||
|
|
||||||
if (tsdbOpenFile(pFile, O_WRONLY|O_CREAT) < 0); {
|
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, fname);
|
||||||
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
tfsInitFile(&pFile->file, level, id, fname);
|
||||||
return NULL;
|
|
||||||
}
|
if (tsdbOpenFile(pFile, O_WRONLY|O_CREAT) < 0) return NULL;
|
||||||
|
|
||||||
if (tsdbUpdateFileHeader(pFile) < 0) {
|
if (tsdbUpdateFileHeader(pFile) < 0) {
|
||||||
tsdbError("vgId:%d failed to update file %s header since %s", REPO_ID(pRepo), TSDB_FILE_NAME(pFile),
|
|
||||||
tstrerror(terrno));
|
|
||||||
tsdbCloseFile(pFile);
|
tsdbCloseFile(pFile);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbCloseFile(pFile);
|
tsdbCloseFile(pFile);
|
||||||
|
|
||||||
level = pFile->file.level;
|
level = TFILE_LEVEL(&(pFile->file));
|
||||||
id = pFile->file.id;
|
id = TFILE_ID(&(pFile->file));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set fg
|
// PUT GROUP INTO FILE HANDLE
|
||||||
pfg->fileId = fid;
|
|
||||||
pfg->state = 0;
|
|
||||||
|
|
||||||
// Register fg to the repo
|
|
||||||
pthread_rwlock_wrlock(&pFileH->fhlock);
|
pthread_rwlock_wrlock(&pFileH->fhlock);
|
||||||
pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
|
pFileH->pFGroup[pFileH->nFGroups++] = fg;
|
||||||
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
|
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
|
||||||
pthread_rwlock_unlock(&pFileH->fhlock);
|
pthread_rwlock_unlock(&pFileH->fhlock);
|
||||||
|
|
||||||
pfg = tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
SFileGroup *pfg = tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
||||||
ASSERT(pfg != NULL);
|
ASSERT(pfg != NULL);
|
||||||
return pfg;
|
return pfg;
|
||||||
}
|
}
|
||||||
|
@ -138,7 +137,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
|
||||||
ASSERT(pFGroup != NULL);
|
ASSERT(pFGroup != NULL);
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
|
|
||||||
SFileGroup fileGroup = *pFGroup;
|
SFileGroup fg = *pFGroup;
|
||||||
|
|
||||||
int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
|
int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
|
||||||
if (nFilesLeft > 0) {
|
if (nFilesLeft > 0) {
|
||||||
|
@ -149,7 +148,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
|
||||||
ASSERT(pFileH->nFGroups >= 0);
|
ASSERT(pFileH->nFGroups >= 0);
|
||||||
|
|
||||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
SFile *pFile = &(pFGroup->files[type]);
|
SFile *pFile = &(fg.files[type]);
|
||||||
tfsremove(&(pFile->file));
|
tfsremove(&(pFile->file));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -161,6 +160,18 @@ SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
|
||||||
return (SFileGroup *)ptr;
|
return (SFileGroup *)ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbGetFidLevel(int fid, SFidGroup fidg) {
|
||||||
|
if (fid >= fidg.maxFid) {
|
||||||
|
return 0;
|
||||||
|
} else if (fid >= fidg.midFid) {
|
||||||
|
return 1;
|
||||||
|
} else if (fid >= fidg.minFid) {
|
||||||
|
return 2;
|
||||||
|
} else {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int compFGroup(const void *arg1, const void *arg2) {
|
static int compFGroup(const void *arg1, const void *arg2) {
|
||||||
int val1 = ((SFileGroup *)arg1)->fileId;
|
int val1 = ((SFileGroup *)arg1)->fileId;
|
||||||
int val2 = ((SFileGroup *)arg2)->fileId;
|
int val2 = ((SFileGroup *)arg2)->fileId;
|
||||||
|
@ -271,7 +282,7 @@ int tsdbOpenFile(SFile *pFile, int oflag) {
|
||||||
void tsdbCloseFile(SFile *pFile) {
|
void tsdbCloseFile(SFile *pFile) {
|
||||||
if (TSDB_IS_FILE_OPENED(pFile)) {
|
if (TSDB_IS_FILE_OPENED(pFile)) {
|
||||||
tsdbTrace("close file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd);
|
tsdbTrace("close file %s, fd %d", TSDB_FILE_NAME(pFile), pFile->fd);
|
||||||
close(pFile->fd);
|
tfsclose(pFile->fd);
|
||||||
pFile->fd = -1;
|
pFile->fd = -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -406,55 +417,58 @@ void tsdbGetFidGroup(STsdbCfg *pCfg, SFidGroup *pFidGroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
|
int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
// TODO
|
||||||
SFileGroup *pGroup = NULL;
|
|
||||||
SFileGroup nFileGroup = {0};
|
|
||||||
SFileGroup oFileGroup = {0};
|
|
||||||
int level = 0;
|
|
||||||
|
|
||||||
if (tsDnodeTier->nTiers == 1 || (pFidGroup->minFid == pFidGroup->midFid && pFidGroup->midFid == pFidGroup->maxFid)) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int gidx = pFileH->nFGroups - 1; gidx >= 0; gidx--) {
|
|
||||||
pGroup = pFileH->pFGroup + gidx;
|
|
||||||
|
|
||||||
level = tsdbGetFidLevel(pGroup->fileId, pFidGroup);
|
|
||||||
|
|
||||||
if (level == pGroup->level) continue;
|
|
||||||
if (level > pGroup->level && level < tsDnodeTier->nTiers) {
|
|
||||||
SDisk *pODisk = tdGetDisk(tsDnodeTier, pGroup->level, pGroup->did);
|
|
||||||
SDisk *pDisk = tdAssignDisk(tsDnodeTier, level);
|
|
||||||
tsdbCreateVnodeDataDir(pDisk->dir, REPO_ID(pRepo));
|
|
||||||
oFileGroup = *pGroup;
|
|
||||||
nFileGroup = *pGroup;
|
|
||||||
nFileGroup.level = level;
|
|
||||||
nFileGroup.did = pDisk->did;
|
|
||||||
|
|
||||||
char tsdbRootDir[TSDB_FILENAME_LEN];
|
|
||||||
tdGetTsdbRootDir(pDisk->dir, REPO_ID(pRepo), tsdbRootDir);
|
|
||||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
|
||||||
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), pGroup->fileId, type, nFileGroup.files[type].fname);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
|
||||||
if (taosCopy(oFileGroup.files[type].fname, nFileGroup.files[type].fname) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
|
||||||
*pGroup = nFileGroup;
|
|
||||||
pthread_rwlock_unlock(&(pFileH->fhlock));
|
|
||||||
|
|
||||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
|
||||||
(void)remove(oFileGroup.files[type].fname);
|
|
||||||
}
|
|
||||||
|
|
||||||
tdLockTiers(tsDnodeTier);
|
|
||||||
tdDecDiskFiles(tsDnodeTier, pODisk, false);
|
|
||||||
tdIncDiskFiles(tsDnodeTier, pDisk, false);
|
|
||||||
tdUnLockTiers(tsDnodeTier);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
// STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
|
// SFileGroup *pGroup = NULL;
|
||||||
|
// SFileGroup nFileGroup = {0};
|
||||||
|
// SFileGroup oFileGroup = {0};
|
||||||
|
// int level = 0;
|
||||||
|
|
||||||
|
// if (tsDnodeTier->nTiers == 1 || (pFidGroup->minFid == pFidGroup->midFid && pFidGroup->midFid == pFidGroup->maxFid)) {
|
||||||
|
// return 0;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// for (int gidx = pFileH->nFGroups - 1; gidx >= 0; gidx--) {
|
||||||
|
// pGroup = pFileH->pFGroup + gidx;
|
||||||
|
|
||||||
|
// level = tsdbGetFidLevel(pGroup->fileId, pFidGroup);
|
||||||
|
|
||||||
|
// if (level == pGroup->level) continue;
|
||||||
|
// if (level > pGroup->level && level < tsDnodeTier->nTiers) {
|
||||||
|
// SDisk *pODisk = tdGetDisk(tsDnodeTier, pGroup->level, pGroup->did);
|
||||||
|
// SDisk *pDisk = tdAssignDisk(tsDnodeTier, level);
|
||||||
|
// tsdbCreateVnodeDataDir(pDisk->dir, REPO_ID(pRepo));
|
||||||
|
// oFileGroup = *pGroup;
|
||||||
|
// nFileGroup = *pGroup;
|
||||||
|
// nFileGroup.level = level;
|
||||||
|
// nFileGroup.did = pDisk->did;
|
||||||
|
|
||||||
|
// char tsdbRootDir[TSDB_FILENAME_LEN];
|
||||||
|
// tdGetTsdbRootDir(pDisk->dir, REPO_ID(pRepo), tsdbRootDir);
|
||||||
|
// for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
|
// tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), pGroup->fileId, type, nFileGroup.files[type].fname);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
|
// if (taosCopy(oFileGroup.files[type].fname, nFileGroup.files[type].fname) < 0) return -1;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||||
|
// *pGroup = nFileGroup;
|
||||||
|
// pthread_rwlock_unlock(&(pFileH->fhlock));
|
||||||
|
|
||||||
|
// for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
|
// (void)remove(oFileGroup.files[type].fname);
|
||||||
|
// }
|
||||||
|
|
||||||
|
// tdLockTiers(tsDnodeTier);
|
||||||
|
// tdDecDiskFiles(tsDnodeTier, pODisk, false);
|
||||||
|
// tdIncDiskFiles(tsDnodeTier, pDisk, false);
|
||||||
|
// tdUnLockTiers(tsDnodeTier);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// return 0;
|
||||||
}
|
}
|
|
@ -54,8 +54,8 @@ static void tsdbStopStream(STsdbRepo *pRepo);
|
||||||
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
|
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
|
||||||
char tsdbDir[TSDB_FILENAME_LEN] = "\0";
|
char tsdbDir[TSDB_FILENAME_LEN] = "\0";
|
||||||
|
|
||||||
snprintf(tsdbDir, TSDB_FILENAME_LEN, "%s/%s", tfsPrimaryPath(), rootDir);
|
snprintf(tsdbDir, TSDB_FILENAME_LEN, "%s/%s", TFS_PRIMARY_PATH(), rootDir);
|
||||||
DIR *dir = tfs(tsdbDir);
|
DIR *dir = opendir(tsdbDir);
|
||||||
if (dir) {
|
if (dir) {
|
||||||
tsdbDebug("repository %s already exists", rootDir);
|
tsdbDebug("repository %s already exists", rootDir);
|
||||||
closedir(dir);
|
closedir(dir);
|
||||||
|
@ -196,13 +196,15 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
|
||||||
SFileGroup *pFGroup =
|
SFileGroup *pFGroup =
|
||||||
taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE);
|
taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE);
|
||||||
if (pFGroup->fileId == fid) {
|
if (pFGroup->fileId == fid) {
|
||||||
fname = strdup(pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].fname);
|
SFile *pFile = &pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX];
|
||||||
magic = pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].info.magic;
|
fname = strdup(TSDB_FILE_NAME(pFile));
|
||||||
|
magic = pFile->info.magic;
|
||||||
} else {
|
} else {
|
||||||
if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < (int)eindex) {
|
if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < (int)eindex) {
|
||||||
fname = strdup(pFGroup->files[0].fname);
|
SFile *pFile = &pFGroup->files[0];
|
||||||
|
fname = strdup(TSDB_FILE_NAME(pFile));
|
||||||
*index = pFGroup->fileId * TSDB_FILE_TYPE_MAX;
|
*index = pFGroup->fileId * TSDB_FILE_TYPE_MAX;
|
||||||
magic = pFGroup->files[0].info.magic;
|
magic = pFile->info.magic;
|
||||||
} else {
|
} else {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -303,18 +305,18 @@ int tsdbGetState(TSDB_REPO_T *repo) {
|
||||||
|
|
||||||
// ----------------- INTERNAL FUNCTIONS -----------------
|
// ----------------- INTERNAL FUNCTIONS -----------------
|
||||||
char *tsdbGetMetaFileName(char *rootDir) {
|
char *tsdbGetMetaFileName(char *rootDir) {
|
||||||
int tlen = (int)(strlen(tfsPrimaryPath()) + strlen(rootDir) + strlen(TSDB_META_FILE_NAME) + 2);
|
int tlen = (int)(strlen(TFS_PRIMARY_PATH()) + strlen(rootDir) + strlen(TSDB_META_FILE_NAME) + 2);
|
||||||
char *fname = calloc(1, tlen);
|
char *fname = calloc(1, tlen);
|
||||||
if (fname == NULL) {
|
if (fname == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
snprintf(fname, tlen, "%s/%s/%s", tfsPrimaryPath(), rootDir, TSDB_META_FILE_NAME);
|
snprintf(fname, tlen, "%s/%s/%s", TFS_PRIMARY_PATH(), rootDir, TSDB_META_FILE_NAME);
|
||||||
return fname;
|
return fname;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbGetDataFileName(char *rootDir, int vid, int fid, int type, const char *fname) {
|
void tsdbGetDataFileName(char *rootDir, int vid, int fid, int type, char *fname) {
|
||||||
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid, tsdbFileSuffix[type]);
|
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d%s", rootDir, TSDB_DATA_DIR_NAME, vid, fid, tsdbFileSuffix[type]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,7 +482,7 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg) {
|
static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg) {
|
||||||
if (tfsCreateDir(rootDir) < 0) {
|
if (tfsMkdir(rootDir) < 0) {
|
||||||
tsdbError("vgId:%d failed to create rootDir %s since %s", pCfg->tsdbId, rootDir, tstrerror(terrno));
|
tsdbError("vgId:%d failed to create rootDir %s since %s", pCfg->tsdbId, rootDir, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -493,7 +495,7 @@ static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg) {
|
||||||
char *dirName = tsdbGetDataDirName(rootDir);
|
char *dirName = tsdbGetDataDirName(rootDir);
|
||||||
if (dirName == NULL) return -1;
|
if (dirName == NULL) return -1;
|
||||||
|
|
||||||
if (tfsCreateDir(dirName) < 0) {
|
if (tfsMkdir(dirName) < 0) {
|
||||||
tsdbError("vgId:%d failed to create directory %s since %s", pCfg->tsdbId, dirName, strerror(errno));
|
tsdbError("vgId:%d failed to create directory %s since %s", pCfg->tsdbId, dirName, strerror(errno));
|
||||||
free(dirName);
|
free(dirName);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -514,7 +516,7 @@ static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbUnsetRepoEnv(char *rootDir) {
|
static int32_t tsdbUnsetRepoEnv(char *rootDir) {
|
||||||
tfsRemoveDir(rootDir);
|
tfsRmdir(rootDir);
|
||||||
tsdbDebug("repository %s is removed", rootDir);
|
tsdbDebug("repository %s is removed", rootDir);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -610,14 +612,14 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
static char *tsdbGetCfgFname(char *rootDir) {
|
static char *tsdbGetCfgFname(char *rootDir) {
|
||||||
int tlen = (int)(strlen(tfsPrimaryPath()) + strlen(rootDir) + strlen(TSDB_CFG_FILE_NAME) + 3);
|
int tlen = (int)(strlen(TFS_PRIMARY_PATH()) + strlen(rootDir) + strlen(TSDB_CFG_FILE_NAME) + 3);
|
||||||
char *fname = calloc(1, tlen);
|
char *fname = calloc(1, tlen);
|
||||||
if (fname == NULL) {
|
if (fname == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
snprintf(fname, tlen, "%s/%s/%s", tfsPrimaryPath(), rootDir, TSDB_CFG_FILE_NAME);
|
snprintf(fname, tlen, "%s/%s/%s", TFS_PRIMARY_PATH(), rootDir, TSDB_CFG_FILE_NAME);
|
||||||
return fname;
|
return fname;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -105,6 +105,9 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
||||||
ASSERT(pHelper != NULL && pGroup != NULL);
|
ASSERT(pHelper != NULL && pGroup != NULL);
|
||||||
SFile * pFile = NULL;
|
SFile * pFile = NULL;
|
||||||
STsdbRepo *pRepo = pHelper->pRepo;
|
STsdbRepo *pRepo = pHelper->pRepo;
|
||||||
|
char fname[TSDB_FILENAME_LEN] = "\0";
|
||||||
|
int level = pGroup->files[0].file.level;
|
||||||
|
int id = pGroup->files[0].file.id;
|
||||||
|
|
||||||
// Clear the helper object
|
// Clear the helper object
|
||||||
tsdbResetHelper(pHelper);
|
tsdbResetHelper(pHelper);
|
||||||
|
@ -113,17 +116,16 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
||||||
|
|
||||||
// Set the files
|
// Set the files
|
||||||
pHelper->files.fGroup = *pGroup;
|
pHelper->files.fGroup = *pGroup;
|
||||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
// if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||||
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD,
|
// tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD, fname);
|
||||||
helperNewHeadF(pHelper)->file.rname);
|
// helperNewHeadF(pHelper)->file.level = pGroup->files[0].file.level;
|
||||||
helperNewHeadF(pHelper)->file.level = pGroup->files[0].file.level;
|
// helperNewHeadF(pHelper)->file.id = pGroup->files[0].file.id;
|
||||||
helperNewHeadF(pHelper)->file.id = pGroup->files[0].file.id;
|
|
||||||
|
|
||||||
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST,
|
// tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST,
|
||||||
helperNewLastF(pHelper)->file.rname);
|
// helperNewLastF(pHelper)->file.rname);
|
||||||
helperNewLastF(pHelper)->file.level = pGroup->files[0].file.level;
|
// helperNewLastF(pHelper)->file.level = pGroup->files[0].file.level;
|
||||||
helperNewLastF(pHelper)->file.id = pGroup->files[0].file.id;
|
// helperNewLastF(pHelper)->file.id = pGroup->files[0].file.id;
|
||||||
}
|
// }
|
||||||
|
|
||||||
// Open the files
|
// Open the files
|
||||||
if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) return -1;
|
if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) return -1;
|
||||||
|
@ -133,18 +135,25 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
||||||
|
|
||||||
// Create and open .h
|
// Create and open .h
|
||||||
pFile = helperNewHeadF(pHelper);
|
pFile = helperNewHeadF(pHelper);
|
||||||
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
|
pFile->fd = -1;
|
||||||
pFile->info.size = TSDB_FILE_HEAD_SIZE;
|
pFile->info.size = TSDB_FILE_HEAD_SIZE;
|
||||||
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||||
|
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD, fname);
|
||||||
|
tfsInitFile(&(pFile->file), level, id, fname);
|
||||||
|
// TODO: not allow it the increase 1
|
||||||
|
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
|
||||||
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
|
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
|
||||||
|
|
||||||
// Create and open .l file if should
|
// Create and open .l file if should
|
||||||
if (tsdbShouldCreateNewLast(pHelper)) {
|
if (tsdbShouldCreateNewLast(pHelper)) {
|
||||||
pFile = helperNewLastF(pHelper);
|
pFile = helperNewLastF(pHelper);
|
||||||
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
|
pFile->fd = -1;
|
||||||
pFile->info.size = TSDB_FILE_HEAD_SIZE;
|
pFile->info.size = TSDB_FILE_HEAD_SIZE;
|
||||||
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||||
pFile->info.len = 0;
|
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD, fname);
|
||||||
|
tfsInitFile(&(pFile->file), level, id, fname);
|
||||||
|
// TODO: not allow it the increase 1
|
||||||
|
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
|
||||||
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
|
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue