more code

This commit is contained in:
Hongze Cheng 2023-05-09 16:24:18 +08:00
parent 023281ae32
commit 61eb991eca
3 changed files with 55 additions and 34 deletions

View File

@ -32,8 +32,8 @@ typedef enum {
/* Exposed APIs */ /* Exposed APIs */
// open/close // open/close
int32_t tsdbOpenFileSystem(STsdb *pTsdb, STFileSystem **ppFS, int8_t rollback); int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **ppFS, int8_t rollback);
int32_t tsdbCloseFileSystem(STFileSystem **ppFS); int32_t tsdbCloseFS(STFileSystem **ppFS);
// txn // txn
int32_t tsdbFSEditBegin(STFileSystem *pFS, const SArray *aFileOp, EFEditT etype); int32_t tsdbFSEditBegin(STFileSystem *pFS, const SArray *aFileOp, EFEditT etype);
int32_t tsdbFSEditCommit(STFileSystem *pFS, EFEditT etype); int32_t tsdbFSEditCommit(STFileSystem *pFS, EFEditT etype);

View File

@ -273,15 +273,20 @@ _exit:
return code; return code;
} }
static int32_t commit_edit(STFileSystem *pFS) { static int32_t apply_commit(STFileSystem *fs) {
// TODO
return 0;
}
static int32_t commit_edit(STFileSystem *fs) {
char current[TSDB_FILENAME_LEN]; char current[TSDB_FILENAME_LEN];
char current_t[TSDB_FILENAME_LEN]; char current_t[TSDB_FILENAME_LEN];
current_fname(pFS->pTsdb, current, TSDB_FCURRENT); current_fname(fs->pTsdb, current, TSDB_FCURRENT);
if (pFS->etype == TSDB_FEDIT_COMMIT) { if (fs->etype == TSDB_FEDIT_COMMIT) {
current_fname(pFS->pTsdb, current, TSDB_FCURRENT_C); current_fname(fs->pTsdb, current, TSDB_FCURRENT_C);
} else if (pFS->etype == TSDB_FEDIT_MERGE) { } else if (fs->etype == TSDB_FEDIT_MERGE) {
current_fname(pFS->pTsdb, current, TSDB_FCURRENT_M); current_fname(fs->pTsdb, current, TSDB_FCURRENT_M);
} else { } else {
ASSERT(0); ASSERT(0);
} }
@ -292,35 +297,54 @@ static int32_t commit_edit(STFileSystem *pFS) {
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit) TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit)
} }
code = apply_commit(fs);
TSDB_CHECK_CODE(code, lino, _exit)
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pFS->pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
static int32_t abort_edit(STFileSystem *pFS) {
char fname[TSDB_FILENAME_LEN];
if (pFS->etype == TSDB_FEDIT_COMMIT) {
current_fname(pFS->pTsdb, fname, TSDB_FCURRENT_C);
} else if (pFS->etype == TSDB_FEDIT_MERGE) {
current_fname(pFS->pTsdb, fname, TSDB_FCURRENT_M);
} else { } else {
ASSERT(0); tsdbInfo("vgId:%d %s success, eid:%" PRId64 " etype:%d", TD_VID(fs->pTsdb->pVnode), __func__, fs->eid, fs->etype);
} }
int32_t code = taosRemoveFile(fname);
if (code) code = TAOS_SYSTEM_ERROR(code);
return code; return code;
} }
static int32_t scan_file_system(STFileSystem *pFS) { // static int32_t
static int32_t apply_abort(STFileSystem *fs) {
// TODO // TODO
return 0; return 0;
} }
static int32_t scan_and_schedule_merge(STFileSystem *pFS) { static int32_t abort_edit(STFileSystem *fs) {
char fname[TSDB_FILENAME_LEN];
if (fs->etype == TSDB_FEDIT_COMMIT) {
current_fname(fs->pTsdb, fname, TSDB_FCURRENT_C);
} else if (fs->etype == TSDB_FEDIT_MERGE) {
current_fname(fs->pTsdb, fname, TSDB_FCURRENT_M);
} else {
ASSERT(0);
}
int32_t code;
int32_t lino;
if ((code = taosRemoveFile(fname))) {
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit)
}
code = apply_abort(fs);
TSDB_CHECK_CODE(code, lino, _exit)
_exit:
if (code) {
tsdbError("vgId:%d %s failed since %s", TD_VID(fs->pTsdb->pVnode), __func__, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s success, eid:%" PRId64 " etype:%d", TD_VID(fs->pTsdb->pVnode), __func__, fs->eid, fs->etype);
}
return code;
}
static int32_t scan_and_fix_fs(STFileSystem *pFS) {
// TODO // TODO
return 0; return 0;
} }
@ -371,10 +395,7 @@ static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
code = scan_file_system(fs); code = scan_and_fix_fs(fs);
TSDB_CHECK_CODE(code, lino, _exit);
code = scan_and_schedule_merge(fs);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} else { } else {
code = save_fs(0, fs->nstate, fCurrent); code = save_fs(0, fs->nstate, fCurrent);
@ -463,7 +484,7 @@ _exit:
return 0; return 0;
} }
int32_t tsdbOpenFileSystem(STsdb *pTsdb, STFileSystem **ppFS, int8_t rollback) { int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **ppFS, int8_t rollback) {
int32_t code; int32_t code;
int32_t lino; int32_t lino;
@ -483,7 +504,7 @@ _exit:
return 0; return 0;
} }
int32_t tsdbCloseFileSystem(STFileSystem **ppFS) { int32_t tsdbCloseFS(STFileSystem **ppFS) {
if (ppFS[0] == NULL) return 0; if (ppFS[0] == NULL) return 0;
close_file_system(ppFS[0]); close_file_system(ppFS[0]);
destroy_fs(ppFS); destroy_fs(ppFS);

View File

@ -70,7 +70,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
// open tsdb // open tsdb
#ifdef USE_DEV_CODE #ifdef USE_DEV_CODE
if (tsdbOpenFileSystem(pTsdb, &pTsdb->pFS, rollback) < 0) { if (tsdbOpenFS(pTsdb, &pTsdb->pFS, rollback) < 0) {
goto _err; goto _err;
} }
#else #else
@ -106,7 +106,7 @@ int tsdbClose(STsdb **pTsdb) {
#ifndef USE_DEV_CODE #ifndef USE_DEV_CODE
tsdbFSClose(*pTsdb); tsdbFSClose(*pTsdb);
#else #else
tsdbCloseFileSystem(&(*pTsdb)->pFS); tsdbCloseFS(&(*pTsdb)->pFS);
#endif #endif
tsdbCloseCache(*pTsdb); tsdbCloseCache(*pTsdb);
taosMemoryFreeClear(*pTsdb); taosMemoryFreeClear(*pTsdb);