more code

This commit is contained in:
Hongze Cheng 2023-05-08 13:20:15 +08:00
parent bc98d60138
commit 198fe28967
2 changed files with 65 additions and 96 deletions

View File

@ -45,8 +45,8 @@ struct STFileSystem {
int32_t state; int32_t state;
tsem_t canEdit; tsem_t canEdit;
int64_t nextEditId; int64_t nextEditId;
SArray *cstate; // current state SArray *cstate; // current state, SArray<STFileSet>
SArray *nstate; // next state SArray *nstate; // next state, SArray<STFileSet>
}; };
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -25,7 +25,19 @@ enum {
TSDB_FS_STATE_CLOSE, TSDB_FS_STATE_CLOSE,
}; };
static int32_t create_file_system(STsdb *pTsdb, STFileSystem **ppFS) { typedef enum {
TSDB_FCURRENT = 1,
TSDB_FCURRENT_C,
TSDB_FCURRENT_M,
} EFCurrentT;
static const char *gCurrentFname[] = {
[TSDB_FCURRENT] = "current.json",
[TSDB_FCURRENT_C] = "current.json.0",
[TSDB_FCURRENT_M] = "current.json.1",
};
static int32_t create_fs(STsdb *pTsdb, STFileSystem **ppFS) {
ppFS[0] = taosMemoryCalloc(1, sizeof(*ppFS[0])); ppFS[0] = taosMemoryCalloc(1, sizeof(*ppFS[0]));
if (ppFS[0] == NULL) { if (ppFS[0] == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -48,18 +60,17 @@ static int32_t create_file_system(STsdb *pTsdb, STFileSystem **ppFS) {
return 0; return 0;
} }
static int32_t destroy_file_system(STFileSystem **ppFS) { static int32_t destroy_fs(STFileSystem **ppFS) {
if (ppFS[0]) { if (ppFS[0] == NULL) return 0;
taosArrayDestroy(ppFS[0]->nstate); taosArrayDestroy(ppFS[0]->nstate);
taosArrayDestroy(ppFS[0]->cstate); taosArrayDestroy(ppFS[0]->cstate);
tsem_destroy(&ppFS[0]->canEdit); tsem_destroy(&ppFS[0]->canEdit);
taosMemoryFree(ppFS[0]); taosMemoryFree(ppFS[0]);
ppFS[0] = NULL; ppFS[0] = NULL;
}
return 0; return 0;
} }
static int32_t get_current_json(STsdb *pTsdb, char fname[]) { static int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
if (pTsdb->pVnode->pTfs) { if (pTsdb->pVnode->pTfs) {
snprintf(fname, // snprintf(fname, //
TSDB_FILENAME_LEN, // TSDB_FILENAME_LEN, //
@ -68,63 +79,18 @@ static int32_t get_current_json(STsdb *pTsdb, char fname[]) {
TD_DIRSEP, // TD_DIRSEP, //
pTsdb->path, // pTsdb->path, //
TD_DIRSEP, // TD_DIRSEP, //
"current.json"); gCurrentFname[ftype]);
} else { } else {
snprintf(fname, // snprintf(fname, //
TSDB_FILENAME_LEN, // TSDB_FILENAME_LEN, //
"%s%s%s", // "%s%s%s", //
pTsdb->path, // pTsdb->path, //
TD_DIRSEP, // TD_DIRSEP, //
"current.json"); gCurrentFname[ftype]);
} }
return 0; return 0;
} }
static int32_t get_current_temp(STsdb *pTsdb, char fname[], tsdb_fs_edit_t etype) {
switch (etype) {
case TSDB_FS_EDIT_COMMIT:
if (pTsdb->pVnode->pTfs) {
snprintf(fname, //
TSDB_FILENAME_LEN, //
"%s%s%s%s%s", //
tfsGetPrimaryPath(pTsdb->pVnode->pTfs), //
TD_DIRSEP, //
pTsdb->path, //
TD_DIRSEP, //
"current.json.commit");
} else {
snprintf(fname, //
TSDB_FILENAME_LEN, //
"%s%s%s", //
pTsdb->path, //
TD_DIRSEP, //
"current.json.commit");
}
break;
default:
if (pTsdb->pVnode->pTfs) {
snprintf(fname, //
TSDB_FILENAME_LEN, //
"%s%s%s%s%s", //
tfsGetPrimaryPath(pTsdb->pVnode->pTfs), //
TD_DIRSEP, //
pTsdb->path, //
TD_DIRSEP, //
"current.json.t");
} else {
snprintf(fname, //
TSDB_FILENAME_LEN, //
"%s%s%s", //
pTsdb->path, //
TD_DIRSEP, //
"current.json.t");
}
break;
}
return 0;
}
static int32_t fs_to_json_str(STFileSystem *pFS, char **ppData) { static int32_t fs_to_json_str(STFileSystem *pFS, char **ppData) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -259,8 +225,8 @@ static int32_t commit_edit(STFileSystem *pFS, tsdb_fs_edit_t etype) {
char ofname[TSDB_FILENAME_LEN]; char ofname[TSDB_FILENAME_LEN];
char nfname[TSDB_FILENAME_LEN]; char nfname[TSDB_FILENAME_LEN];
get_current_json(pFS->pTsdb, nfname); current_fname(pFS->pTsdb, nfname, TSDB_FCURRENT);
get_current_temp(pFS->pTsdb, ofname, etype); current_fname(pFS->pTsdb, ofname, etype == TSDB_FS_EDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);
code = taosRenameFile(ofname, nfname); code = taosRenameFile(ofname, nfname);
if (code) { if (code) {
@ -277,7 +243,7 @@ static int32_t abort_edit(STFileSystem *pFS, tsdb_fs_edit_t etype) {
int32_t code; int32_t code;
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
get_current_temp(pFS->pTsdb, fname, etype); current_fname(pFS->pTsdb, fname, etype == TSDB_FS_EDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);
code = taosRemoveFile(fname); code = taosRemoveFile(fname);
if (code) code = TAOS_SYSTEM_ERROR(code); if (code) code = TAOS_SYSTEM_ERROR(code);
@ -295,47 +261,50 @@ static int32_t scan_and_schedule_merge(STFileSystem *pFS) {
return 0; return 0;
} }
static int32_t open_file_system(STFileSystem *pFS, int8_t rollback) { static int32_t update_fs_if_needed(STFileSystem *pFS) {
// TODO
return 0;
}
static int32_t open_fs(STFileSystem *pFS, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
STsdb *pTsdb = pFS->pTsdb; STsdb *pTsdb = pFS->pTsdb;
bool update = false; // TODO code = update_fs_if_needed(pFS);
if (update) { TSDB_CHECK_CODE(code, lino, _exit)
// TODO
} else {
char current_json[TSDB_FILENAME_LEN];
char current_json_commit[TSDB_FILENAME_LEN];
char current_json_t[TSDB_FILENAME_LEN];
get_current_json(pTsdb, current_json); char fCurrent[TSDB_FILENAME_LEN];
get_current_temp(pTsdb, current_json_commit, TSDB_FS_EDIT_COMMIT); char cCurrent[TSDB_FILENAME_LEN];
get_current_temp(pTsdb, current_json_t, TSDB_FS_EDIT_MERGE); char mCurrent[TSDB_FILENAME_LEN];
if (taosCheckExistFile(current_json)) { // current.json exists current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
code = load_fs_from_file(current_json, pFS); current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
TSDB_CHECK_CODE(code, lino, _exit); current_fname(pTsdb, mCurrent, TSDB_FCURRENT_C);
// check current.json.commit existence if (taosCheckExistFile(fCurrent)) { // current.json exists
if (taosCheckExistFile(current_json_commit)) { code = load_fs_from_file(fCurrent, pFS);
if (rollback) { TSDB_CHECK_CODE(code, lino, _exit);
code = commit_edit(pFS, TSDB_FS_EDIT_COMMIT);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = abort_edit(pFS, TSDB_FS_EDIT_COMMIT);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// check current.json.t existence // check current.json.commit existence
if (taosCheckExistFile(current_json_t)) { if (taosCheckExistFile(cCurrent)) {
code = abort_edit(pFS, TSDB_FS_EDIT_MERGE); if (rollback) {
code = commit_edit(pFS, TSDB_FS_EDIT_COMMIT);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = abort_edit(pFS, TSDB_FS_EDIT_COMMIT);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} else { }
code = save_fs_to_file(pFS, current_json);
// check current.json.t existence
if (taosCheckExistFile(mCurrent)) {
code = abort_edit(pFS, TSDB_FS_EDIT_MERGE);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} else {
code = save_fs_to_file(pFS, fCurrent);
TSDB_CHECK_CODE(code, lino, _exit);
} }
code = scan_file_system(pFS); code = scan_file_system(pFS);
@ -430,16 +399,16 @@ int32_t tsdbOpenFileSystem(STsdb *pTsdb, STFileSystem **ppFS, int8_t rollback) {
int32_t code; int32_t code;
int32_t lino; int32_t lino;
code = create_file_system(pTsdb, ppFS); code = create_fs(pTsdb, ppFS);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = open_file_system(ppFS[0], rollback); code = open_fs(ppFS[0], rollback);
TSDB_CHECK_CODE(code, lino, _exit) TSDB_CHECK_CODE(code, lino, _exit)
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
destroy_file_system(ppFS); destroy_fs(ppFS);
} else { } else {
tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__); tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
} }
@ -449,7 +418,7 @@ _exit:
int32_t tsdbCloseFileSystem(STFileSystem **ppFS) { int32_t tsdbCloseFileSystem(STFileSystem **ppFS) {
if (ppFS[0] == NULL) return 0; if (ppFS[0] == NULL) return 0;
close_file_system(ppFS[0]); close_file_system(ppFS[0]);
destroy_file_system(ppFS); destroy_fs(ppFS);
return 0; return 0;
} }
@ -458,7 +427,7 @@ int32_t tsdbFileSystemEditBegin(STFileSystem *pFS, const SArray *aFileOp, tsdb_f
int32_t lino; int32_t lino;
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
get_current_temp(pFS->pTsdb, fname, etype); current_fname(pFS->pTsdb, fname, etype == TSDB_FS_EDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);
tsem_wait(&pFS->canEdit); tsem_wait(&pFS->canEdit);