refactor
This commit is contained in:
parent
5f6acdbf79
commit
5b75528872
|
@ -259,13 +259,14 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) {
|
|||
|
||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||
|
||||
tfsremove(&(helperHeadF(pHelper)->file));
|
||||
// tfsremove(&(helperHeadF(pHelper)->file));
|
||||
(void)rename(TSDB_FILE_NAME(helperNewHeadF(pHelper)), TSDB_FILE_NAME(helperHeadF(pHelper)));
|
||||
tfsDecDiskFile(helperNewHeadF(pHelper)->file.level, helperNewHeadF(pHelper)->file.id, 1);
|
||||
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
|
||||
|
||||
if (newLast) {
|
||||
tfsremove(&(helperLastF(pHelper)->file));
|
||||
(void)rename(TSDB_FILE_NAME(helperNewLastF(pHelper)), TSDB_FILE_NAME(helperLastF(pHelper)));
|
||||
tfsDecDiskFile(helperNewLastF(pHelper)->file.level, helperNewLastF(pHelper)->file.id, 1);
|
||||
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
|
||||
} else {
|
||||
pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
|
||||
|
|
|
@ -21,13 +21,16 @@
|
|||
#include "tsdbMain.h"
|
||||
#include "tutil.h"
|
||||
#include "tfs.h"
|
||||
#include "tarray.h"
|
||||
|
||||
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
|
||||
|
||||
static int compFGroup(const void *arg1, const void *arg2);
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
||||
static void tsdbScanAllFiles(STsdbRepo *pRepo, TFILE **pfArray, int *nfiles);
|
||||
static int tsdbCompareFile(void *arg1, void *arg2);
|
||||
static void *tsdbScanAllFiles(STsdbRepo *pRepo);
|
||||
static int tsdbCompareFile(const void *arg1, const void *arg2);
|
||||
static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile);
|
||||
static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix);
|
||||
|
||||
// STsdbFileH ===========================================
|
||||
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
|
||||
|
@ -70,24 +73,53 @@ void tsdbFreeFileH(STsdbFileH *pFileH) {
|
|||
int tsdbOpenFileH(STsdbRepo *pRepo) {
|
||||
ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL);
|
||||
|
||||
TFILE *pfArray = NULL;
|
||||
int nfiles = 0;
|
||||
void *pfArray = NULL;
|
||||
|
||||
// Scan the whole directory and get data
|
||||
tsdbScanAllFiles(pRepo, &pfArray, &nfiles);
|
||||
pfArray = tsdbScanAllFiles(pRepo);
|
||||
if (pfArray == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (nfiles == 0) return 0;
|
||||
if (taosArrayGetSize(pfArray) == 0) {
|
||||
taosArrayDestroy(pfArray);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Sort the files
|
||||
qsort((void *)pfArray, nfiles, sizeof(TFILE), tsdbCompareFile);
|
||||
taosArraySort(pfArray, tsdbCompareFile);
|
||||
|
||||
// Loop to recover the files
|
||||
int iter = 0;
|
||||
while (true) {
|
||||
if (iter >= nfiles) break;
|
||||
// TODO
|
||||
if (iter >= taosArrayGetSize(pfArray)) break;
|
||||
|
||||
int vid, fid;
|
||||
char bname[TSDB_FILENAME_LEN] = "\0";
|
||||
char suffix[TSDB_FILENAME_LEN] = "\0";
|
||||
int count = 0;
|
||||
|
||||
TFILE *pf = taosArrayGet(pfArray, iter);
|
||||
tsdbParseFname(pf, bname);
|
||||
tsdbParseFname(bname, &vid, &fid, suffix);
|
||||
count++;
|
||||
iter++;
|
||||
|
||||
while (true) {
|
||||
int nfid = 0;
|
||||
TFILE *npf = taosArrayGet(pfArray, iter);
|
||||
tsdbParseFname(npf, bname);
|
||||
tsdbParseFname(bname, &vid, &nfid, suffix);
|
||||
|
||||
if (nfid != fid) break;
|
||||
count++;
|
||||
iter++;
|
||||
}
|
||||
|
||||
tsdbRestoreFile(pRepo, pf, count);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pfArray);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -481,17 +513,22 @@ int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbScanAllFiles(STsdbRepo *pRepo, TFILE **pfArray, int *nfiles) {
|
||||
static void *tsdbScanAllFiles(STsdbRepo *pRepo) {
|
||||
void * farray = NULL;
|
||||
TDIR * tdir = NULL;
|
||||
char dirName[TSDB_FILENAME_LEN] = "\0";
|
||||
char bname[TSDB_FILENAME_LEN] = "\0";
|
||||
int arraySize = 0;
|
||||
regex_t regex1 = {0};
|
||||
regex_t regex2 = {0};
|
||||
const TFILE *pf = NULL;
|
||||
|
||||
regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat)$", REG_EXTENDED);
|
||||
regcomp(®ex2, "^v[0-9]+f[0-9]+\\.(h|d|l|s)$", REG_EXTENDED);
|
||||
farray = taosArrayInit(256, sizeof(TFILE));
|
||||
if (farray == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat|l|d|h|s)$", REG_EXTENDED);
|
||||
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", REPO_ID(pRepo));
|
||||
|
||||
|
@ -503,41 +540,114 @@ static void tsdbScanAllFiles(STsdbRepo *pRepo, TFILE **pfArray, int *nfiles) {
|
|||
int code = regexec(®ex1, bname, 0, NULL, 0);
|
||||
if (code != 0) {
|
||||
tsdbWarn("vgId:%d file %s exists, ignore it", REPO_ID(pRepo), pf->aname);
|
||||
rename(pf->aname);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nfiles + 1 >= arraySize) {
|
||||
if (arraySize = 0) {
|
||||
arraySize = 1024;
|
||||
} else {
|
||||
arraySize = arraySize * 2;
|
||||
}
|
||||
|
||||
*pfArray = realloc(*pfArray, sizeof(TFILE) * arraySize);
|
||||
}
|
||||
|
||||
(*pfArray)[nfiles++] = *pf;
|
||||
taosArrayPush(farray, pf);
|
||||
}
|
||||
|
||||
regfree(®ex1);
|
||||
tfsClosedir(tdir);
|
||||
|
||||
return farray;
|
||||
}
|
||||
|
||||
static int tsdbCompareFile(void *arg1, void *arg2) {
|
||||
static int tsdbCompareFile(const void *arg1, const void *arg2) {
|
||||
char bname1[TSDB_FILENAME_LEN] = "\0";
|
||||
char bname2[TSDB_FILENAME_LEN] = "\0";
|
||||
TFILE *pf1 = (TFILE *)arg1;
|
||||
TFILE *pf2 = (TFILE *)arg2;
|
||||
int vid1, fid1, vid2, fid2;
|
||||
|
||||
tfsbasename(pf1, bname1);
|
||||
tfsbasename(pf2, bname2);
|
||||
// TODO
|
||||
|
||||
sscanf(bname1, "v%df%d", &vid1, &fid1);
|
||||
sscanf(bname2, "v%df%d", &vid2, &fid2);
|
||||
|
||||
ASSERT(vid1 == vid2);
|
||||
if (fid1 < fid2) {
|
||||
return -1;
|
||||
} else if (fid1 == fid2) {
|
||||
return 0
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
static int tsdbGetTFileFid(TFILE *pf) {
|
||||
static int tsdbRestoreFile(STsdbRepo *pRepo, TFILE *pfiles, int nfile) {
|
||||
char backname[TSDB_FILENAME_LEN] = "\0";
|
||||
char bname[TSDB_FILENAME_LEN] = "\0";
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
TFILE * pfArray[TSDB_FILE_TYPE_MAX] = {0};
|
||||
TFILE * pHf = NULL;
|
||||
TFILE * pLf = NULL;
|
||||
SFileGroup fg = {0};
|
||||
int vid = 0;
|
||||
int fid = 0;
|
||||
char suffix[TSDB_FILENAME_LEN] = "\0";
|
||||
|
||||
for (int i = 0; i < nfile; i++) {
|
||||
TFILE *pf = pfiles + i;
|
||||
|
||||
tfsbasename(pf, bname);
|
||||
tsdbParseFname(bname, &vid, &fid, suffix);
|
||||
|
||||
if (strcmp(suffix, ".head") == 0) {
|
||||
pfArray[TSDB_FILE_TYPE_HEAD] = pf;
|
||||
} else if (strcmp(suffix, ".data") == 0) {
|
||||
pfArray[TSDB_FILE_TYPE_DATA] = pf;
|
||||
} else if (strcmp(suffix, ".last") == 0) {
|
||||
pfArray[TSDB_FILE_TYPE_LAST] = pf;
|
||||
} else if (strcmp(suffix, ".l") == 0) {
|
||||
pLf = pf;
|
||||
} else if (strcmp(suffix, ".h") == 0) {
|
||||
pHf = pf;
|
||||
} else {
|
||||
tsdbWarn("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), pf->aname);
|
||||
}
|
||||
}
|
||||
|
||||
if (pfArray[TSDB_FILE_TYPE_HEAD] == NULL || pfArray[TSDB_FILE_TYPE_DATA] == NULL || pfArray[TSDB_FILE_TYPE_LAST] == NULL) {
|
||||
for (int i = 0; i < nfile; i++) {
|
||||
snprintf(backname, TSDB_FILENAME_LEN, "%s_bak", (pfiles + i)->aname);
|
||||
rename((pfiles + i)->aname, backname);
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pHf == NULL) {
|
||||
if (pLf != NULL) {
|
||||
rename(pLf->aname, pLastf->aname);
|
||||
}
|
||||
} else {
|
||||
if (pLf != NULL) {
|
||||
remove(pLf->aname);
|
||||
}
|
||||
remove(pHf->aname);
|
||||
}
|
||||
|
||||
// Register file
|
||||
fg.fileId = fid;
|
||||
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
SFile *pFile = fg.files + type;
|
||||
|
||||
pFile->fd = -1;
|
||||
pFile->file = *pfArray[type]; // TODO
|
||||
tsdbOpenFile(pFile, O_RDONLY);
|
||||
tsdbLoadFileHeader(pFile);
|
||||
tsdbCloseFile(pFile);
|
||||
}
|
||||
|
||||
pFileH->pFGroup[pFileH->nFGroups++] = fg;
|
||||
|
||||
tfsIncDiskFile(pHeadf->level, pHeadf->id, TSDB_FILE_TYPE_MAX);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbParseFname(const char *bname, int *vid, int *fid, char *suffix) {
|
||||
sscanf(bname, "v%df%d%s", vid, fid, suffix);
|
||||
}
|
Loading…
Reference in New Issue