more code
This commit is contained in:
parent
dae0cc50b3
commit
afbaeb0d2d
|
@ -28,6 +28,8 @@ extern "C" {
|
||||||
|
|
||||||
#include "tsdbFileOp.h"
|
#include "tsdbFileOp.h"
|
||||||
|
|
||||||
|
#include "tsdbFSet.h"
|
||||||
|
|
||||||
#include "tsdbFS.h"
|
#include "tsdbFS.h"
|
||||||
|
|
||||||
#include "tsdbSttFWriter.h"
|
#include "tsdbSttFWriter.h"
|
||||||
|
|
|
@ -16,28 +16,36 @@
|
||||||
#include "dev.h"
|
#include "dev.h"
|
||||||
|
|
||||||
static int32_t create_file_system(STsdb *pTsdb, struct STFileSystem **ppFS) {
|
static int32_t create_file_system(STsdb *pTsdb, struct STFileSystem **ppFS) {
|
||||||
ppFS[0] = taosMemoryCalloc(1, sizeof(*ppFS[0]));
|
if ((ppFS[0] = taosMemoryCalloc(1, sizeof(*ppFS[0]))) == NULL) {
|
||||||
if (ppFS[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((ppFS[0]->aFileSet = taosArrayInit(16, sizeof(struct SFileSet))) == NULL) {
|
||||||
|
taosMemoryFree(ppFS[0]);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
ppFS[0]->pTsdb = pTsdb;
|
ppFS[0]->pTsdb = pTsdb;
|
||||||
|
tsem_init(&ppFS[0]->canEdit, 0, 1);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t destroy_file_system(struct STFileSystem **ppFS) {
|
static int32_t destroy_file_system(struct STFileSystem **ppFS) {
|
||||||
if (ppFS[0]) {
|
if (ppFS[0]) {
|
||||||
taosMemoryFree(ppFS[0]->aFileSet);
|
taosArrayDestroy(ppFS[0]->aFileSet);
|
||||||
|
tsem_destroy(&ppFS[0]->canEdit);
|
||||||
taosMemoryFree(ppFS[0]);
|
taosMemoryFree(ppFS[0]);
|
||||||
ppFS[0] = NULL;
|
ppFS[0] = NULL;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t get_current_file_name(STsdb *pTsdb, char fname[]) {
|
static int32_t get_current_json(STsdb *pTsdb, char fname[]) {
|
||||||
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", pTsdb->path, TD_DIRSEP, "current.json");
|
return snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", pTsdb->path, TD_DIRSEP, "current.json"), 0;
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t get_temp_current_file_name(STsdb *pTsdb, char fname[], EFsEditType etype) {
|
static int32_t get_current_temp(STsdb *pTsdb, char fname[], EFsEditType etype) {
|
||||||
switch (etype) {
|
switch (etype) {
|
||||||
case TSDB_FS_EDIT_COMMIT:
|
case TSDB_FS_EDIT_COMMIT:
|
||||||
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", pTsdb->path, TD_DIRSEP, "current.json.commit");
|
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", pTsdb->path, TD_DIRSEP, "current.json.commit");
|
||||||
|
@ -50,23 +58,97 @@ static int32_t get_temp_current_file_name(STsdb *pTsdb, char fname[], EFsEditTyp
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t save_fs_to_file(struct STFileSystem *pFS, const char *fname) {
|
||||||
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t load_fs_from_file(const char *fname, struct STFileSystem *pFS) {
|
||||||
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t commit_edit(struct STFileSystem *pFS, EFsEditType etype) {
|
||||||
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t abort_edit(struct STFileSystem *pFS, EFsEditType etype) {
|
||||||
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t open_file_system(struct STFileSystem *pFS, int8_t rollback) {
|
static int32_t open_file_system(struct STFileSystem *pFS, int8_t rollback) {
|
||||||
// TODO
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
STsdb *pTsdb = pFS->pTsdb;
|
||||||
|
|
||||||
|
if (0) {
|
||||||
|
ASSERTS(0, "Not implemented yet");
|
||||||
|
} else {
|
||||||
|
char current_json[TSDB_FILENAME_LEN];
|
||||||
|
char current_json_commit[TSDB_FILENAME_LEN];
|
||||||
|
char current_json_t[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
|
get_current_json(pTsdb, current_json);
|
||||||
|
get_current_temp(pTsdb, current_json_commit, TSDB_FS_EDIT_COMMIT);
|
||||||
|
get_current_temp(pTsdb, current_json_t, TSDB_FS_EDIT_MERGE);
|
||||||
|
|
||||||
|
if (taosCheckExistFile(current_json)) { // current.json exists
|
||||||
|
code = load_fs_from_file(current_json, pFS);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
// check current.json.commit existence
|
||||||
|
if (taosCheckExistFile(current_json_commit)) {
|
||||||
|
if (rollback) {
|
||||||
|
code = commit_edit(pFS, TSDB_FS_EDIT_COMMIT);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
} else {
|
||||||
|
code = abort_edit(pFS, TSDB_FS_EDIT_COMMIT);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERTS(1, "Do nothing");
|
||||||
|
}
|
||||||
|
|
||||||
|
// check current.json.t existence
|
||||||
|
if (taosCheckExistFile(current_json_t)) {
|
||||||
|
code = abort_edit(pFS, TSDB_FS_EDIT_MERGE);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
code = save_fs_to_file(pFS, current_json);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d %s failed at line %d since %s", //
|
||||||
|
TD_VID(pTsdb->pVnode), //
|
||||||
|
__func__, //
|
||||||
|
lino, //
|
||||||
|
tstrerror(code));
|
||||||
|
} else {
|
||||||
|
tsdbInfo("vgId:%d %s success", //
|
||||||
|
TD_VID(pTsdb->pVnode), //
|
||||||
|
__func__);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t close_file_system(struct STFileSystem *pFS) {
|
static int32_t close_file_system(struct STFileSystem *pFS) {
|
||||||
// TODO
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t write_fs_to_file(struct STFileSystem *pFS, const char *fname) {
|
static int32_t write_fs_to_file(struct STFileSystem *pFS, const char *fname) {
|
||||||
// TODO
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t read_fs_from_file(struct STFileSystem *pFS, const char *fname) {
|
static int32_t read_fs_from_file(struct STFileSystem *pFS, const char *fname) {
|
||||||
// TODO
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,14 +160,22 @@ int32_t tsdbOpenFileSystem(STsdb *pTsdb, struct STFileSystem **ppFS, int8_t roll
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = open_file_system(ppFS[0], rollback);
|
code = open_file_system(ppFS[0], rollback);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
if (code) {
|
||||||
|
destroy_file_system(ppFS);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
tsdbError("vgId:%d %s failed at line %d since %s", //
|
||||||
if (ppFS[0]) destroy_file_system(ppFS);
|
TD_VID(pTsdb->pVnode), //
|
||||||
|
__func__, //
|
||||||
|
lino, //
|
||||||
|
tstrerror(code));
|
||||||
} else {
|
} else {
|
||||||
tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
|
tsdbInfo("vgId:%d %s success", //
|
||||||
|
TD_VID(pTsdb->pVnode), //
|
||||||
|
__func__);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -103,7 +193,7 @@ int32_t tsdbFileSystemEditBegin(struct STFileSystem *pFS, const SArray *aFileOp,
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
get_temp_current_file_name(pFS->pTsdb, fname, etype);
|
get_current_temp(pFS->pTsdb, fname, etype);
|
||||||
|
|
||||||
tsem_wait(&pFS->canEdit);
|
tsem_wait(&pFS->canEdit);
|
||||||
|
|
||||||
|
@ -127,8 +217,8 @@ int32_t tsdbFileSystemEditCommit(struct STFileSystem *pFS, EFsEditType etype) {
|
||||||
char ofname[TSDB_FILENAME_LEN];
|
char ofname[TSDB_FILENAME_LEN];
|
||||||
char nfname[TSDB_FILENAME_LEN];
|
char nfname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
get_current_file_name(pFS->pTsdb, nfname);
|
get_current_json(pFS->pTsdb, nfname);
|
||||||
get_temp_current_file_name(pFS->pTsdb, ofname, etype);
|
get_current_temp(pFS->pTsdb, ofname, etype);
|
||||||
|
|
||||||
code = taosRenameFile(ofname, nfname);
|
code = taosRenameFile(ofname, nfname);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -153,7 +243,7 @@ int32_t tsdbFileSystemEditAbort(struct STFileSystem *pFS, EFsEditType etype) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
get_temp_current_file_name(pFS->pTsdb, fname, etype);
|
get_current_temp(pFS->pTsdb, fname, etype);
|
||||||
|
|
||||||
code = taosRemoveFile(fname);
|
code = taosRemoveFile(fname);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
|
@ -41,10 +41,9 @@ int32_t tsdbFileSystemEditAbort(struct STFileSystem *pFS, EFsEditType etype);
|
||||||
|
|
||||||
/* Exposed Structs */
|
/* Exposed Structs */
|
||||||
struct STFileSystem {
|
struct STFileSystem {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
tsem_t canEdit;
|
tsem_t canEdit;
|
||||||
int32_t nFileSet;
|
SArray *aFileSet; // SArray<struct SFileSet>
|
||||||
struct SFileSet *aFileSet;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -290,7 +290,7 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbLoadFSFromFile(const char *fname, STsdbFS *pFS) {
|
static int32_t load_fs_from_file(const char *fname, STsdbFS *pFS) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
uint8_t *pData = NULL;
|
uint8_t *pData = NULL;
|
||||||
|
@ -724,7 +724,7 @@ int32_t tsdbFSCommit(STsdb *pTsdb) {
|
||||||
code = tsdbFSCreate(&fs);
|
code = tsdbFSCreate(&fs);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = tsdbLoadFSFromFile(current, &fs);
|
code = load_fs_from_file(current, &fs);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// apply file change
|
// apply file change
|
||||||
|
@ -769,7 +769,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
|
||||||
tsdbGetCurrentFName(pTsdb, current, current_t);
|
tsdbGetCurrentFName(pTsdb, current, current_t);
|
||||||
|
|
||||||
if (taosCheckExistFile(current)) {
|
if (taosCheckExistFile(current)) {
|
||||||
code = tsdbLoadFSFromFile(current, &pTsdb->fs);
|
code = load_fs_from_file(current, &pTsdb->fs);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (taosCheckExistFile(current_t)) {
|
if (taosCheckExistFile(current_t)) {
|
||||||
|
|
Loading…
Reference in New Issue