commit
fe6f96b1ce
|
@ -186,6 +186,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int fileId;
|
||||
int state; // 0 for health, 1 for problem
|
||||
SFile files[TSDB_FILE_TYPE_MAX];
|
||||
} SFileGroup;
|
||||
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#define _DEFAULT_SOURCE
|
||||
#include <regex.h>
|
||||
|
||||
#include "os.h"
|
||||
#include "talgo.h"
|
||||
#include "tchecksum.h"
|
||||
|
@ -23,10 +25,11 @@
|
|||
|
||||
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
|
||||
|
||||
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
|
||||
static void tsdbDestroyFile(SFile *pFile);
|
||||
static int compFGroup(const void *arg1, const void *arg2);
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
||||
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
|
||||
static void tsdbDestroyFile(SFile *pFile);
|
||||
static int compFGroup(const void *arg1, const void *arg2);
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
||||
static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo);
|
||||
|
||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
|
||||
|
@ -69,12 +72,14 @@ void tsdbFreeFileH(STsdbFileH *pFileH) {
|
|||
int tsdbOpenFileH(STsdbRepo *pRepo) {
|
||||
ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL);
|
||||
|
||||
char *tDataDir = NULL;
|
||||
DIR * dir = NULL;
|
||||
int fid = 0;
|
||||
int vid = 0;
|
||||
char * tDataDir = NULL;
|
||||
DIR * dir = NULL;
|
||||
int fid = 0;
|
||||
int vid = 0;
|
||||
regex_t regex1, regex2;
|
||||
int code = 0;
|
||||
|
||||
SFileGroup fileGroup = {0};
|
||||
SFileGroup fileGroup = {0};
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
|
||||
tDataDir = tsdbGetDataDirName(pRepo->rootDir);
|
||||
|
@ -90,28 +95,56 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
code = regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat)$", REG_EXTENDED);
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = regcomp(®ex2, "^v[0-9]+f[0-9]+\\.(h|d|l|s)$", REG_EXTENDED);
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
struct dirent *dp = NULL;
|
||||
while ((dp = readdir(dir)) != NULL) {
|
||||
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue;
|
||||
sscanf(dp->d_name, "v%df%d", &vid, &fid);
|
||||
if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue;
|
||||
|
||||
if (tsdbSearchFGroup(pRepo->tsdbFileH, fid, TD_EQ) != NULL) continue;
|
||||
code = regexec(®ex1, dp->d_name, 0, NULL, 0);
|
||||
if (code == 0) {
|
||||
sscanf(dp->d_name, "v%df%d", &vid, &fid);
|
||||
if (vid != REPO_ID(pRepo)) {
|
||||
tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
memset((void *)(&fileGroup), 0, sizeof(SFileGroup));
|
||||
fileGroup.fileId = fid;
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) {
|
||||
tsdbError("vgId:%d failed to init file fid %d type %d", REPO_ID(pRepo), fid, type);
|
||||
if (tsdbSearchFGroup(pFileH, fid, TD_EQ) != NULL) continue;
|
||||
memset((void *)(&fileGroup), 0, sizeof(SFileGroup));
|
||||
fileGroup.fileId = fid;
|
||||
|
||||
tsdbInitFileGroup(&fileGroup, pRepo);
|
||||
} else if (code == REG_NOMATCH) {
|
||||
code = regexec(®ex2, dp->d_name, 0, NULL, 0);
|
||||
if (code == 0) {
|
||||
tsdbDebug("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), dp->d_name);
|
||||
remove(dp->d_name);
|
||||
} else if (code == REG_NOMATCH) {
|
||||
tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name);
|
||||
continue;
|
||||
} else {
|
||||
goto _err;
|
||||
}
|
||||
} else {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tsdbDebug("vgId:%d file group %d init", REPO_ID(pRepo), fid);
|
||||
|
||||
pFileH->pFGroup[pFileH->nFGroups++] = fileGroup;
|
||||
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
|
||||
}
|
||||
|
||||
regfree(®ex1);
|
||||
regfree(®ex2);
|
||||
taosTFree(tDataDir);
|
||||
closedir(dir);
|
||||
return 0;
|
||||
|
@ -119,6 +152,9 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
|
|||
_err:
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]);
|
||||
|
||||
regfree(®ex1);
|
||||
regfree(®ex2);
|
||||
|
||||
taosTFree(tDataDir);
|
||||
if (dir != NULL) closedir(dir);
|
||||
tsdbCloseFileH(pRepo);
|
||||
|
@ -450,3 +486,14 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) {
|
|||
return fid > pFGroup->fileId ? 1 : -1;
|
||||
}
|
||||
}
|
||||
|
||||
static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo) {
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
if (tsdbInitFile(&pFGroup->files[type], pRepo, pFGroup->fileId, type) < 0) {
|
||||
memset(&pFGroup->files[type].info, 0, sizeof(STsdbFileInfo));
|
||||
pFGroup->files[type].info.magic = TSDB_FILE_INIT_MAGIC;
|
||||
pFGroup->state = 1;
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -71,6 +71,8 @@ static void tsdbStopStream(STsdbRepo *pRepo);
|
|||
|
||||
// Function declaration
|
||||
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
|
||||
taosRemoveDir(rootDir);
|
||||
|
||||
if (mkdir(rootDir, 0755) < 0) {
|
||||
tsdbError("vgId:%d failed to create rootDir %s since %s", pCfg->tsdbId, rootDir, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -95,6 +97,8 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
|
|||
STsdbCfg config = {0};
|
||||
STsdbRepo *pRepo = NULL;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
if (tsdbLoadConfig(rootDir, &config) < 0) {
|
||||
tsdbError("failed to open repo in rootDir %s since %s", rootDir, tstrerror(terrno));
|
||||
return NULL;
|
||||
|
@ -799,6 +803,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
|
||||
tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC);
|
||||
while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) {
|
||||
if (pFGroup->state) continue;
|
||||
if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err;
|
||||
if (tsdbLoadCompIdx(&rhelper, NULL) < 0) goto _err;
|
||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||
|
|
|
@ -257,7 +257,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
if (pVnode->tsdb == NULL) {
|
||||
vnodeCleanUp(pVnode);
|
||||
return terrno;
|
||||
} else if (terrno != 0 && pVnode->syncCfg.replica <= 1) {
|
||||
} else if (terrno != TSDB_CODE_SUCCESS && pVnode->syncCfg.replica <= 1) {
|
||||
vError("vgId:%d, failed to open tsdb, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
|
||||
tstrerror(terrno));
|
||||
vnodeCleanUp(pVnode);
|
||||
|
|
Loading…
Reference in New Issue