refactor
This commit is contained in:
parent
c2aa460350
commit
1e914e724b
|
@ -397,6 +397,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, 0, 0x2203, "tfs duplic
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_PRIMARY_DISK, 0, 0x2204, "tfs no primary mount")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_PRIMARY_DISK, 0, 0x2204, "tfs no primary mount")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_MOUNT_AT_TIER, 0, 0x2205, "tfs no mount at tier")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_MOUNT_AT_TIER, 0, 0x2205, "tfs no mount at tier")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_FILE_ALREADY_EXISTS, 0, 0x2206, "tfs file already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FS_FILE_ALREADY_EXISTS, 0, 0x2206, "tfs file already exists")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, 0, 0x2207, "tfs invalid level")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, 0, 0x2208, "tfs no valid disk")
|
||||||
|
|
||||||
|
|
||||||
#ifdef TAOS_ERROR_C
|
#ifdef TAOS_ERROR_C
|
||||||
|
|
|
@ -40,6 +40,7 @@ int64_t tfsTotalSize();
|
||||||
int64_t tfsAvailSize();
|
int64_t tfsAvailSize();
|
||||||
void tfsIncDiskFile(int level, int id, int num);
|
void tfsIncDiskFile(int level, int id, int num);
|
||||||
void tfsDecDiskFile(int level, int id, int num);
|
void tfsDecDiskFile(int level, int id, int num);
|
||||||
|
|
||||||
const char *TFS_PRIMARY_PATH();
|
const char *TFS_PRIMARY_PATH();
|
||||||
const char *TFS_DISK_PATH(int level, int id);
|
const char *TFS_DISK_PATH(int level, int id);
|
||||||
|
|
||||||
|
@ -56,9 +57,12 @@ typedef struct {
|
||||||
#define TFILE_NAME(pf) ((pf)->aname)
|
#define TFILE_NAME(pf) ((pf)->aname)
|
||||||
|
|
||||||
void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
|
void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
|
||||||
|
void tfsSetLevel(TFILE *pf, int level);
|
||||||
|
void tfsSetID(TFILE *pf, int id);
|
||||||
int tfsopen(TFILE *pf, int flags);
|
int tfsopen(TFILE *pf, int flags);
|
||||||
int tfsclose(int fd);
|
int tfsclose(int fd);
|
||||||
int tfsremove(TFILE *pf);
|
int tfsremove(TFILE *pf);
|
||||||
|
int tfscopy(TFILE *sf, TFILE *df);
|
||||||
|
|
||||||
// DIR APIs ====================================
|
// DIR APIs ====================================
|
||||||
int tfsMkdir(const char *rname);
|
int tfsMkdir(const char *rname);
|
||||||
|
|
|
@ -160,29 +160,49 @@ const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); }
|
||||||
const char *TFS_DISK_PATH(int level, int id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
|
const char *TFS_DISK_PATH(int level, int id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
|
||||||
|
|
||||||
// TFILE APIs ====================================
|
// TFILE APIs ====================================
|
||||||
void tfsInitFile(TFILE *pf, int level, int id, const char *bname) {
|
static void tfsSetFileAname(TFILE *pf) {
|
||||||
SDisk *pDisk = TFS_DISK_AT(level, id);
|
if (TFS_IS_VALID_DISK(pf->level, pf->id)) {
|
||||||
|
SDisk *pDisk = TFS_DISK_AT(pf->level, pf->level);
|
||||||
|
ASSERT(pDisk != NULL);
|
||||||
|
snprintf(pf->aname, TSDB_FILENAME_LEN, "%s/%s", DISK_DIR(pDisk), pf->rname);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void tfsInitFile(TFILE *pf, int level, int id, const char *bname) {
|
||||||
pf->level = level;
|
pf->level = level;
|
||||||
pf->id = id;
|
pf->id = id;
|
||||||
strncpy(pf->rname, bname, TSDB_FILENAME_LEN);
|
strncpy(pf->rname, bname, TSDB_FILENAME_LEN);
|
||||||
snprintf(pf->aname, TSDB_FILENAME_LEN, "%s/%s", DISK_DIR(pDisk), pf->rname);
|
tfsSetFileAname(pf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void tfsSetLevel(TFILE *pf, int level) {
|
||||||
|
pf->level = level;
|
||||||
|
|
||||||
|
tfsSetFileAname(pf);
|
||||||
|
}
|
||||||
|
|
||||||
|
void tfsSetID(TFILE *pf, int id) {
|
||||||
|
pf->id = id;
|
||||||
|
|
||||||
|
tfsSetFileAname(pf);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tfsopen(TFILE *pf, int flags) {
|
int tfsopen(TFILE *pf, int flags) {
|
||||||
int fd = -1;
|
int fd = -1;
|
||||||
|
|
||||||
if (flags & O_CREAT) {
|
if (flags & O_CREAT) {
|
||||||
if (pf->level > TFS_NLEVEL()) {
|
if (pf->level >= TFS_NLEVEL()) {
|
||||||
pf->level = TFS_NLEVEL();
|
tfsSetLevel(pf, TFS_NLEVEL() - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pf->id == TFS_UNDECIDED_ID) {
|
if (pf->id == TFS_UNDECIDED_ID) {
|
||||||
pf->id = tfsAssignDisk(pf->level);
|
int id = tfsAssignDisk(pf->level);
|
||||||
if (pf->id < 0) {
|
if (id < 0) {
|
||||||
fError("failed to assign disk at level %d", pf->level);
|
fError("failed to assign disk at level %d", pf->level);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tfsSetID(pf, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
tfsIncDiskFile(pf->level, pf->id, 1);
|
tfsIncDiskFile(pf->level, pf->id, 1);
|
||||||
|
@ -219,6 +239,32 @@ int tfsremove(TFILE *pf) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tfscopy(TFILE *sf, TFILE *df) {
|
||||||
|
if (df->level >= TFS_NLEVEL()) {
|
||||||
|
tfsSetLevel(df, TFS_NLEVEL() - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sf->level == df->level) {
|
||||||
|
terrno = TSDB_CODE_FS_INVLD_LEVEL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (df->id == TFS_UNDECIDED_ID) {
|
||||||
|
int id = tfsAssignDisk(df->level);
|
||||||
|
if (id < 0) {
|
||||||
|
terrno = TSDB_CODE_FS_NO_VALID_DISK;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tfsSetID(df, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
tfsIncDiskFile(df->level, df->id, 1);
|
||||||
|
|
||||||
|
taosCopy(sf->aname, df->aname);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// DIR APIs ====================================
|
// DIR APIs ====================================
|
||||||
int tfsMkdir(const char *rname) {
|
int tfsMkdir(const char *rname) {
|
||||||
char aname[TSDB_FILENAME_LEN] = "\0";
|
char aname[TSDB_FILENAME_LEN] = "\0";
|
||||||
|
|
|
@ -259,10 +259,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
|
||||||
|
|
||||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||||
|
|
||||||
|
tfsremove(&(helperHeadF(pHelper)->file));
|
||||||
(void)rename(TSDB_FILE_NAME(helperNewHeadF(pHelper)), TSDB_FILE_NAME(helperHeadF(pHelper)));
|
(void)rename(TSDB_FILE_NAME(helperNewHeadF(pHelper)), TSDB_FILE_NAME(helperHeadF(pHelper)));
|
||||||
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
|
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
|
||||||
|
|
||||||
if (newLast) {
|
if (newLast) {
|
||||||
|
tfsremove(&(helperLastF(pHelper)->file));
|
||||||
(void)rename(TSDB_FILE_NAME(helperNewLastF(pHelper)), TSDB_FILE_NAME(helperLastF(pHelper)));
|
(void)rename(TSDB_FILE_NAME(helperNewLastF(pHelper)), TSDB_FILE_NAME(helperLastF(pHelper)));
|
||||||
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
|
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -417,58 +417,48 @@ void tsdbGetFidGroup(STsdbCfg *pCfg, SFidGroup *pFidGroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
|
int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
|
||||||
// TODO
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
|
|
||||||
|
for (int i = 0; i < pFileH->nFGroups; i++) {
|
||||||
|
SFileGroup ofg = pFileH->pFGroup[i];
|
||||||
|
|
||||||
|
int level = tsdbGetFidLevel(ofg.fileId, *pFidGroup);
|
||||||
|
ASSERT(level >= 0);
|
||||||
|
|
||||||
|
if (level == ofg.files[0].file.level) continue;
|
||||||
|
|
||||||
|
// COPY THE FILE GROUP TO THE RIGHT LEVEL
|
||||||
|
SFileGroup nfg = ofg;
|
||||||
|
int id = TFS_UNDECIDED_ID;
|
||||||
|
int type = 0;
|
||||||
|
for (; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
|
tfsInitFile(&nfg.files[type].file, level, id, nfg.files[type].file.rname);
|
||||||
|
if (tfscopy(&(ofg.files[type].file), &(nfg.files[type].file)) < 0) {
|
||||||
|
if (terrno == TSDB_CODE_FS_INVLD_LEVEL) break;
|
||||||
|
tsdbError("vgId:%d failed to move fid %d from level %d to level %d since %s", REPO_ID(pRepo), ofg.fileId,
|
||||||
|
ofg.files[0].file.level, level, strerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
id = nfg.files[type].file.level;
|
||||||
|
id = nfg.files[type].file.id;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type < TSDB_FILE_TYPE_MAX) continue;
|
||||||
|
|
||||||
|
// Register new file into TSDB
|
||||||
|
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||||
|
pFileH->pFGroup[i] = nfg;
|
||||||
|
pthread_rwlock_unlock(&(pFileH->fhlock));
|
||||||
|
|
||||||
|
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
|
SFile *pFile = &(ofg.files[type]);
|
||||||
|
tfsremove(&(pFile->file));
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbDebug("vgId:%d move file group %d from level %d to level %d", REPO_ID(pRepo), ofg.fileId,
|
||||||
|
ofg.files[0].file.level, level);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
// STsdbFileH *pFileH = pRepo->tsdbFileH;
|
|
||||||
// SFileGroup *pGroup = NULL;
|
|
||||||
// SFileGroup nFileGroup = {0};
|
|
||||||
// SFileGroup oFileGroup = {0};
|
|
||||||
// int level = 0;
|
|
||||||
|
|
||||||
// if (tsDnodeTier->nTiers == 1 || (pFidGroup->minFid == pFidGroup->midFid && pFidGroup->midFid == pFidGroup->maxFid)) {
|
|
||||||
// return 0;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// for (int gidx = pFileH->nFGroups - 1; gidx >= 0; gidx--) {
|
|
||||||
// pGroup = pFileH->pFGroup + gidx;
|
|
||||||
|
|
||||||
// level = tsdbGetFidLevel(pGroup->fileId, pFidGroup);
|
|
||||||
|
|
||||||
// if (level == pGroup->level) continue;
|
|
||||||
// if (level > pGroup->level && level < tsDnodeTier->nTiers) {
|
|
||||||
// SDisk *pODisk = tdGetDisk(tsDnodeTier, pGroup->level, pGroup->did);
|
|
||||||
// SDisk *pDisk = tdAssignDisk(tsDnodeTier, level);
|
|
||||||
// tsdbCreateVnodeDataDir(pDisk->dir, REPO_ID(pRepo));
|
|
||||||
// oFileGroup = *pGroup;
|
|
||||||
// nFileGroup = *pGroup;
|
|
||||||
// nFileGroup.level = level;
|
|
||||||
// nFileGroup.did = pDisk->did;
|
|
||||||
|
|
||||||
// char tsdbRootDir[TSDB_FILENAME_LEN];
|
|
||||||
// tdGetTsdbRootDir(pDisk->dir, REPO_ID(pRepo), tsdbRootDir);
|
|
||||||
// for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
|
||||||
// tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), pGroup->fileId, type, nFileGroup.files[type].fname);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
|
||||||
// if (taosCopy(oFileGroup.files[type].fname, nFileGroup.files[type].fname) < 0) return -1;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// pthread_rwlock_wrlock(&(pFileH->fhlock));
|
|
||||||
// *pGroup = nFileGroup;
|
|
||||||
// pthread_rwlock_unlock(&(pFileH->fhlock));
|
|
||||||
|
|
||||||
// for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
|
||||||
// (void)remove(oFileGroup.files[type].fname);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// tdLockTiers(tsDnodeTier);
|
|
||||||
// tdDecDiskFiles(tsDnodeTier, pODisk, false);
|
|
||||||
// tdIncDiskFiles(tsDnodeTier, pDisk, false);
|
|
||||||
// tdUnLockTiers(tsDnodeTier);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// return 0;
|
|
||||||
}
|
}
|
|
@ -116,16 +116,6 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
||||||
|
|
||||||
// Set the files
|
// Set the files
|
||||||
pHelper->files.fGroup = *pGroup;
|
pHelper->files.fGroup = *pGroup;
|
||||||
// if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
|
||||||
// tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD, fname);
|
|
||||||
// helperNewHeadF(pHelper)->file.level = pGroup->files[0].file.level;
|
|
||||||
// helperNewHeadF(pHelper)->file.id = pGroup->files[0].file.id;
|
|
||||||
|
|
||||||
// tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST,
|
|
||||||
// helperNewLastF(pHelper)->file.rname);
|
|
||||||
// helperNewLastF(pHelper)->file.level = pGroup->files[0].file.level;
|
|
||||||
// helperNewLastF(pHelper)->file.id = pGroup->files[0].file.id;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Open the files
|
// Open the files
|
||||||
if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) return -1;
|
if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) return -1;
|
||||||
|
|
Loading…
Reference in New Issue