diff --git a/cmake/cmake.options b/cmake/cmake.options index 7c208cd5d2..06754433ac 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -178,7 +178,6 @@ option( "If use dev code" ON ) - if (${USE_DEV_CODE}) add_definitions(-DUSE_DEV_CODE) endif(USE_DEV_CODE) \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h index 34fdc4bf24..d6ba9e61b6 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFS.h @@ -26,26 +26,28 @@ extern "C" { typedef struct STFileSystem STFileSystem; typedef enum { - TSDB_FS_EDIT_COMMIT = 1, // - TSDB_FS_EDIT_MERGE -} tsdb_fs_edit_t; + TSDB_FEDIT_COMMIT = 1, // + TSDB_FEDIT_MERGE +} EFEditT; /* Exposed APIs */ // open/close int32_t tsdbOpenFileSystem(STsdb *pTsdb, STFileSystem **ppFS, int8_t rollback); int32_t tsdbCloseFileSystem(STFileSystem **ppFS); // txn -int32_t tsdbFileSystemEditBegin(STFileSystem *pFS, const SArray *aFileOp, tsdb_fs_edit_t etype); -int32_t tsdbFileSystemEditCommit(STFileSystem *pFS, tsdb_fs_edit_t etype); -int32_t tsdbFileSystemEditAbort(STFileSystem *pFS, tsdb_fs_edit_t etype); +int32_t tsdbFSEditBegin(STFileSystem *pFS, const SArray *aFileOp, EFEditT etype); +int32_t tsdbFSEditCommit(STFileSystem *pFS, EFEditT etype); +int32_t tsdbFSEditAbort(STFileSystem *pFS, EFEditT etype); /* Exposed Structs */ struct STFileSystem { STsdb *pTsdb; int32_t state; tsem_t canEdit; - int64_t nextEditId; + int64_t neid; SArray *cstate; // current state, SArray + EFEditT etype; + int64_t eid; SArray *nstate; // next state, SArray }; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 0a627b394c..cbde3bae4a 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -357,11 +357,11 @@ static int32_t close_committer(SCommitter *pCommiter, int32_t eno) { int32_t lino; if (eno == 0) { - TSDB_CHECK_CODE( // - code = tsdbFileSystemEditBegin( // - pCommiter->pTsdb->pFS, // - pCommiter->aFileOp, // - TSDB_FS_EDIT_COMMIT), + TSDB_CHECK_CODE( // + code = tsdbFSEditBegin( // + pCommiter->pTsdb->pFS, // + pCommiter->aFileOp, // + TSDB_FEDIT_COMMIT), lino, // _exit); } else { @@ -448,8 +448,8 @@ int32_t tsdbCommitCommit(STsdb *pTsdb) { // lock taosThreadRwlockWrlock(&pTsdb->rwLock); - code = tsdbFileSystemEditCommit(pTsdb->pFS, // - TSDB_FS_EDIT_COMMIT); + code = tsdbFSEditCommit(pTsdb->pFS, // + TSDB_FEDIT_COMMIT); if (code) { taosThreadRwlockUnlock(&pTsdb->rwLock); TSDB_CHECK_CODE(code, lino, _exit); @@ -481,8 +481,8 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) { int32_t code = 0; int32_t lino = 0; - code = tsdbFileSystemEditAbort(pTsdb->pFS, // - TSDB_FS_EDIT_COMMIT); + code = tsdbFSEditAbort(pTsdb->pFS, // + TSDB_FEDIT_COMMIT); TSDB_CHECK_CODE(code, lino, _exit); _exit: diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index f1ee05ce48..381c6465bc 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -15,8 +15,8 @@ #include "inc/tsdbFS.h" -#define TSDB_FS_EDIT_MIN TSDB_FS_EDIT_COMMIT -#define TSDB_FS_EDIT_MAX (TSDB_FS_EDIT_MERGE + 1) +#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT +#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1) enum { TSDB_FS_STATE_NONE = 0, @@ -33,8 +33,8 @@ typedef enum { static const char *gCurrentFname[] = { [TSDB_FCURRENT] = "current.json", - [TSDB_FCURRENT_C] = "current.json.0", - [TSDB_FCURRENT_M] = "current.json.1", + [TSDB_FCURRENT_C] = "current.c.json", + [TSDB_FCURRENT_M] = "current.m.json", }; static int32_t create_fs(STsdb *pTsdb, STFileSystem **ppFS) { @@ -55,7 +55,7 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **ppFS) { ppFS[0]->pTsdb = pTsdb; ppFS[0]->state = TSDB_FS_STATE_NONE; tsem_init(&ppFS[0]->canEdit, 0, 1); - ppFS[0]->nextEditId = 0; + ppFS[0]->neid = 0; return 0; } @@ -194,17 +194,17 @@ static int32_t save_fs(int64_t eid, SArray *aTFileSet, const char *fname) { } for (int32_t i = 0; i < taosArrayGetSize(aTFileSet); i++) { STFileSet *pFileSet = (STFileSet *)taosArrayGet(aTFileSet, i); + cJSON *item; - cJSON *tjson = cJSON_CreateObject(); - if (tjson == NULL) { + if ((item = cJSON_CreateObject()) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit) } - code = tsdbFileSetToJson(pFileSet, tjson); + code = tsdbFileSetToJson(pFileSet, item); TSDB_CHECK_CODE(code, lino, _exit); - cJSON_AddItemToArray(ajson, tjson); + cJSON_AddItemToArray(ajson, item); } code = save_json(json, fname); @@ -237,8 +237,7 @@ static int32_t load_fs(const char *fname, SArray *aTFileSet, int64_t *eid) { if (cJSON_IsNumber(item)) { ASSERT(item->valuedouble == 1); } else { - code = TSDB_CODE_FILE_CORRUPTED; - TSDB_CHECK_CODE(code, lino, _exit) + TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit) } /* eid */ @@ -246,8 +245,7 @@ static int32_t load_fs(const char *fname, SArray *aTFileSet, int64_t *eid) { if (cJSON_IsNumber(item)) { eid[0] = item->valuedouble; } else { - code = TSDB_CODE_FILE_CORRUPTED; - TSDB_CHECK_CODE(code, lino, _exit) + TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit) } /* fset */ @@ -255,18 +253,16 @@ static int32_t load_fs(const char *fname, SArray *aTFileSet, int64_t *eid) { if (cJSON_IsArray(item)) { const cJSON *titem; cJSON_ArrayForEach(titem, item) { - STFileSet *pFileSet = taosArrayReserve(aTFileSet, 1); - if (pFileSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); + STFileSet *pFileSet; + if ((pFileSet = taosArrayReserve(aTFileSet, 1)) == NULL) { + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); } code = tsdbFileSetFromJson(titem, pFileSet); TSDB_CHECK_CODE(code, lino, _exit) } } else { - code = TSDB_CODE_FILE_CORRUPTED; - TSDB_CHECK_CODE(code, lino, _exit) + TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit) } _exit: @@ -277,44 +273,55 @@ _exit: return code; } -static int32_t commit_edit(STFileSystem *pFS, tsdb_fs_edit_t etype) { - int32_t code; - char ofname[TSDB_FILENAME_LEN]; - char nfname[TSDB_FILENAME_LEN]; +static int32_t commit_edit(STFileSystem *pFS) { + char current[TSDB_FILENAME_LEN]; + char current_t[TSDB_FILENAME_LEN]; - current_fname(pFS->pTsdb, nfname, TSDB_FCURRENT); - current_fname(pFS->pTsdb, ofname, etype == TSDB_FS_EDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M); - - code = taosRenameFile(ofname, nfname); - if (code) { - code = TAOS_SYSTEM_ERROR(code); - return code; + current_fname(pFS->pTsdb, current, TSDB_FCURRENT); + if (pFS->etype == TSDB_FEDIT_COMMIT) { + current_fname(pFS->pTsdb, current, TSDB_FCURRENT_C); + } else if (pFS->etype == TSDB_FEDIT_MERGE) { + current_fname(pFS->pTsdb, current, TSDB_FCURRENT_M); + } else { + ASSERT(0); } - ASSERTS(0, "TODO: Do changes to pFS"); + int32_t code; + int32_t lino; + if ((code = taosRenameFile(current_t, current))) { + TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit) + } - return 0; +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pFS->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; } -static int32_t abort_edit(STFileSystem *pFS, tsdb_fs_edit_t etype) { - int32_t code; - char fname[TSDB_FILENAME_LEN]; +static int32_t abort_edit(STFileSystem *pFS) { + char fname[TSDB_FILENAME_LEN]; - current_fname(pFS->pTsdb, fname, etype == TSDB_FS_EDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M); + if (pFS->etype == TSDB_FEDIT_COMMIT) { + current_fname(pFS->pTsdb, fname, TSDB_FCURRENT_C); + } else if (pFS->etype == TSDB_FEDIT_MERGE) { + current_fname(pFS->pTsdb, fname, TSDB_FCURRENT_M); + } else { + ASSERT(0); + } - code = taosRemoveFile(fname); + int32_t code = taosRemoveFile(fname); if (code) code = TAOS_SYSTEM_ERROR(code); - return code; } static int32_t scan_file_system(STFileSystem *pFS) { - // ASSERTS(0, "TODO: Not implemented yet"); + // TODO return 0; } static int32_t scan_and_schedule_merge(STFileSystem *pFS) { - // ASSERTS(0, "TODO: Not implemented yet"); + // TODO return 0; } @@ -323,12 +330,12 @@ static int32_t update_fs_if_needed(STFileSystem *pFS) { return 0; } -static int32_t open_fs(STFileSystem *pFS, int8_t rollback) { +static int32_t open_fs(STFileSystem *fs, int8_t rollback) { int32_t code = 0; int32_t lino = 0; - STsdb *pTsdb = pFS->pTsdb; + STsdb *pTsdb = fs->pTsdb; - code = update_fs_if_needed(pFS); + code = update_fs_if_needed(fs); TSDB_CHECK_CODE(code, lino, _exit) char fCurrent[TSDB_FILENAME_LEN]; @@ -340,33 +347,37 @@ static int32_t open_fs(STFileSystem *pFS, int8_t rollback) { current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M); if (taosCheckExistFile(fCurrent)) { // current.json exists - code = load_fs(fCurrent, pFS->cstate, &pFS->nextEditId); + code = load_fs(fCurrent, fs->cstate, &fs->neid); TSDB_CHECK_CODE(code, lino, _exit); - // check current.json.commit existence if (taosCheckExistFile(cCurrent)) { + // current.c.json exists + + fs->etype = TSDB_FEDIT_COMMIT; if (rollback) { - code = commit_edit(pFS, TSDB_FS_EDIT_COMMIT); + code = abort_edit(fs); TSDB_CHECK_CODE(code, lino, _exit); } else { - code = abort_edit(pFS, TSDB_FS_EDIT_COMMIT); + code = load_fs(cCurrent, fs->nstate, &fs->eid); + TSDB_CHECK_CODE(code, lino, _exit) + + code = commit_edit(fs); TSDB_CHECK_CODE(code, lino, _exit); } - } - - // check current.json.t existence - if (taosCheckExistFile(mCurrent)) { - code = abort_edit(pFS, TSDB_FS_EDIT_MERGE); + } else if (taosCheckExistFile(mCurrent)) { + // current.m.json exists + fs->etype = TSDB_FEDIT_MERGE; + code = abort_edit(fs); TSDB_CHECK_CODE(code, lino, _exit); } - code = scan_file_system(pFS); + code = scan_file_system(fs); TSDB_CHECK_CODE(code, lino, _exit); - code = scan_and_schedule_merge(pFS); + code = scan_and_schedule_merge(fs); TSDB_CHECK_CODE(code, lino, _exit); } else { - code = save_fs(0, pFS->nstate, fCurrent); + code = save_fs(0, fs->nstate, fCurrent); TSDB_CHECK_CODE(code, lino, _exit); } @@ -479,12 +490,12 @@ int32_t tsdbCloseFileSystem(STFileSystem **ppFS) { return 0; } -int32_t tsdbFileSystemEditBegin(STFileSystem *pFS, const SArray *aFileOp, tsdb_fs_edit_t etype) { +int32_t tsdbFSEditBegin(STFileSystem *pFS, const SArray *aFileOp, EFEditT etype) { int32_t code = 0; int32_t lino; char fname[TSDB_FILENAME_LEN]; - // current_fname(pFS->pTsdb, fname, etype == TSDB_FS_EDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M); + // current_fname(pFS->pTsdb, fname, etype == TSDB_FEDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M); // tsem_wait(&pFS->canEdit); @@ -514,8 +525,8 @@ _exit: return code; } -int32_t tsdbFileSystemEditCommit(STFileSystem *pFS, tsdb_fs_edit_t etype) { - int32_t code = commit_edit(pFS, etype); +int32_t tsdbFSEditCommit(STFileSystem *pFS, EFEditT etype) { + int32_t code = commit_edit(pFS); tsem_post(&pFS->canEdit); if (code) { tsdbError("vgId:%d %s failed since %s", // @@ -531,8 +542,8 @@ int32_t tsdbFileSystemEditCommit(STFileSystem *pFS, tsdb_fs_edit_t etype) { return code; } -int32_t tsdbFileSystemEditAbort(STFileSystem *pFS, tsdb_fs_edit_t etype) { - int32_t code = abort_edit(pFS, etype); +int32_t tsdbFSEditAbort(STFileSystem *pFS, EFEditT etype) { + int32_t code = abort_edit(pFS); if (code) { tsdbError("vgId:%d %s failed since %s, etype:%d", // TD_VID(pFS->pTsdb->pVnode), // diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 8bc892ef12..03c44cbcc6 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -69,14 +69,14 @@ static int32_t tsdbCloseMerger(SMerger *pMerger) { STsdb *pTsdb = pMerger->pTsdb; - code = tsdbFileSystemEditBegin(pTsdb->pFS, pMerger->aFileOp, TSDB_FS_EDIT_MERGE); + code = tsdbFSEditBegin(pTsdb->pFS, pMerger->aFileOp, TSDB_FEDIT_MERGE); TSDB_CHECK_CODE(code, lino, _exit) _exit: if (code) { - tsdbFileSystemEditAbort(pTsdb->pFS, TSDB_FS_EDIT_MERGE); + tsdbFSEditAbort(pTsdb->pFS, TSDB_FEDIT_MERGE); } else { - tsdbFileSystemEditCommit(pTsdb->pFS, TSDB_FS_EDIT_MERGE); + tsdbFSEditCommit(pTsdb->pFS, TSDB_FEDIT_MERGE); } tsdbDestroyMerger(pMerger); return code;