more code

This commit is contained in:
Hongze Cheng 2023-05-08 16:16:45 +08:00
parent 198fe28967
commit ebfd8b03a3
6 changed files with 243 additions and 162 deletions

View File

@ -33,9 +33,10 @@ typedef enum {
TSDB_FOP_TRUNCATE,
} tsdb_fop_t;
int32_t tsdbFileSetToJson(const STFileSet *fset, cJSON *json);
int32_t tsdbFileSetCreate(int32_t fid, STFileSet **ppSet);
int32_t tsdbFileSetEdit(STFileSet *pSet, SFileOp *pOp);
int32_t tsdbFileSetToJson(SJson *pJson, const STFileSet *pSet);
int32_t tsdbEditFileSet(STFileSet *pFileSet, const SFileOp *pOp);
struct SFileOp {
@ -48,8 +49,8 @@ struct SFileOp {
typedef struct SSttLvl {
LISTD(struct SSttLvl) listNode;
int32_t lvl; // level
int32_t nStt; // number of .stt files on this level
STFile *fStt; // .stt files
int32_t nstt; // number of .stt files on this level
STFile *fstt; // .stt files
} SSttLvl;
struct STFileSet {

View File

@ -32,22 +32,25 @@ typedef enum {
TSDB_FTYPE_STT = TSDB_FTYPE_TOMB + 2, // .stt
} tsdb_ftype_t;
#define TSDB_FTYPE_MIN TSDB_FTYPE_HEAD
#define TSDB_FTYPE_MAX (TSDB_FTYPE_TOMB + 1)
int32_t tsdbTFileToJson(const STFile *f, cJSON *json);
int32_t tsdbTFileInit(STsdb *pTsdb, STFile *pFile);
int32_t tsdbTFileClear(STFile *pFile);
struct STFile {
LISTD(STFile) listNode;
char fname[TSDB_FILENAME_LEN];
int32_t ref;
int32_t state;
char fname[TSDB_FILENAME_LEN];
int32_t ref;
int32_t state;
tsdb_ftype_t type;
SDiskID did;
int64_t size;
int64_t cid;
int32_t fid;
int64_t cid;
int64_t size;
union {
struct {
int32_t lvl;

View File

@ -27,8 +27,8 @@ enum {
typedef enum {
TSDB_FCURRENT = 1,
TSDB_FCURRENT_C,
TSDB_FCURRENT_M,
TSDB_FCURRENT_C, // for commit
TSDB_FCURRENT_M, // for merge
} EFCurrentT;
static const char *gCurrentFname[] = {
@ -91,69 +91,6 @@ static int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
return 0;
}
static int32_t fs_to_json_str(STFileSystem *pFS, char **ppData) {
int32_t code = 0;
int32_t lino;
cJSON *pJson = cJSON_CreateObject();
if (pJson == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
/* format version */
TSDB_CHECK_NULL( //
cJSON_AddNumberToObject(pJson, //
"version", //
1 /* TODO */),
code, //
lino, //
_exit, //
TSDB_CODE_OUT_OF_MEMORY);
/* next edit id */
TSDB_CHECK_NULL( //
cJSON_AddNumberToObject(pJson, //
"edit id", //
pFS->nextEditId),
code, //
lino, //
_exit, //
TSDB_CODE_OUT_OF_MEMORY);
/* file sets */
cJSON *aFileSetJson;
TSDB_CHECK_NULL( //
aFileSetJson = cJSON_AddArrayToObject(pJson, "file sets"), //
code, //
lino, //
_exit, //
TSDB_CODE_OUT_OF_MEMORY);
for (int32_t i = 0; i < taosArrayGetSize(pFS->nstate); i++) {
struct STFileSet *pFileSet = taosArrayGet(pFS->nstate, i);
code = tsdbFileSetToJson(aFileSetJson, pFileSet);
TSDB_CHECK_CODE(code, lino, _exit);
}
ppData[0] = cJSON_Print(pJson);
if (ppData[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
cJSON_Delete(pJson);
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", //
TD_VID(pFS->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
}
return code;
}
static int32_t fs_from_json_str(const char *pData, STFileSystem *pFS) {
int32_t code = 0;
int32_t lino;
@ -164,58 +101,87 @@ _exit:
return code;
}
static int32_t save_fs_to_file(STFileSystem *pFS, const char *fname) {
int32_t code = 0;
int32_t lino;
char *pData = NULL;
static int32_t save_json(const cJSON *json, const char *fname) {
int32_t code;
// to json string
code = fs_to_json_str(pFS, &pData);
TSDB_CHECK_CODE(code, lino, _exit);
char *data = cJSON_Print(json);
if (data == NULL) return TSDB_CODE_OUT_OF_MEMORY;
TdFilePtr fd = taosOpenFile(fname, //
TD_FILE_WRITE //
| TD_FILE_CREATE //
| TD_FILE_TRUNC);
if (fd == NULL) {
TdFilePtr fp = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
if (fp == NULL) {
code = TAOS_SYSTEM_ERROR(code);
TSDB_CHECK_CODE(code, lino, _exit);
goto _exit;
}
int64_t n = taosWriteFile(fd, pData, strlen(pData) + 1);
if (n < 0) {
if (taosWriteFile(fp, data, strlen(data) + 1) < 0) {
code = TAOS_SYSTEM_ERROR(code);
taosCloseFile(&fd);
TSDB_CHECK_CODE(code, lino, _exit);
goto _exit;
}
if (taosFsyncFile(fd) < 0) {
if (taosFsyncFile(fp) < 0) {
code = TAOS_SYSTEM_ERROR(code);
taosCloseFile(&fd);
TSDB_CHECK_CODE(code, lino, _exit);
goto _exit;
}
taosCloseFile(&fd);
taosCloseFile(&fp);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", //
TD_VID(pFS->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
} else {
tsdbDebug("vgId:%d %s success", //
TD_VID(pFS->pTsdb->pVnode), //
__func__);
}
if (pData) {
taosMemoryFree(pData);
}
taosMemoryFree(data);
return code;
}
static int32_t load_fs_from_file(const char *fname, STFileSystem *pFS) {
static int32_t save_fs(int64_t eid, SArray *aTFileSet, const char *fname) {
int32_t code = 0;
int32_t lino = 0;
cJSON *json = cJSON_CreateObject();
if (json == NULL) return TSDB_CODE_OUT_OF_MEMORY;
// fmtv
if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit)
}
// eid
if (cJSON_AddNumberToObject(json, "eid", eid) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit)
}
// fset
cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
if (ajson == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit)
}
for (int32_t i = 0; i < taosArrayGetSize(aTFileSet); i++) {
STFileSet *pFileSet = (STFileSet *)taosArrayGet(aTFileSet, i);
cJSON *tjson = cJSON_CreateObject();
if (tjson == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit)
}
code = tsdbFileSetToJson(pFileSet, tjson);
TSDB_CHECK_CODE(code, lino, _exit);
cJSON_AddItemToArray(ajson, tjson);
}
code = save_json(json, fname);
TSDB_CHECK_CODE(code, lino, _exit)
_exit:
if (code) {
tsdbError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
cJSON_Delete(json);
return code;
}
static int32_t load_fs(const char *fname, STFileSystem *pFS) {
ASSERTS(0, "TODO: Not implemented yet");
return 0;
}
@ -280,10 +246,10 @@ static int32_t open_fs(STFileSystem *pFS, int8_t rollback) {
current_fname(pTsdb, fCurrent, TSDB_FCURRENT);
current_fname(pTsdb, cCurrent, TSDB_FCURRENT_C);
current_fname(pTsdb, mCurrent, TSDB_FCURRENT_C);
current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
if (taosCheckExistFile(fCurrent)) { // current.json exists
code = load_fs_from_file(fCurrent, pFS);
code = load_fs(fCurrent, pFS);
TSDB_CHECK_CODE(code, lino, _exit);
// check current.json.commit existence
@ -302,17 +268,17 @@ static int32_t open_fs(STFileSystem *pFS, int8_t rollback) {
code = abort_edit(pFS, TSDB_FS_EDIT_MERGE);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = scan_file_system(pFS);
TSDB_CHECK_CODE(code, lino, _exit);
code = scan_and_schedule_merge(pFS);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = save_fs_to_file(pFS, fCurrent);
code = save_fs(0, pFS->nstate, fCurrent);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = scan_file_system(pFS);
TSDB_CHECK_CODE(code, lino, _exit);
code = scan_and_schedule_merge(pFS);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
@ -427,19 +393,19 @@ int32_t tsdbFileSystemEditBegin(STFileSystem *pFS, const SArray *aFileOp, tsdb_f
int32_t lino;
char fname[TSDB_FILENAME_LEN];
current_fname(pFS->pTsdb, fname, etype == TSDB_FS_EDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);
// current_fname(pFS->pTsdb, fname, etype == TSDB_FS_EDIT_COMMIT ? TSDB_FCURRENT_C : TSDB_FCURRENT_M);
tsem_wait(&pFS->canEdit);
// tsem_wait(&pFS->canEdit);
TSDB_CHECK_CODE( //
code = edit_fs(pFS, aFileOp), //
lino, //
_exit);
// TSDB_CHECK_CODE( //
// code = edit_fs(pFS, aFileOp), //
// lino, //
// _exit);
TSDB_CHECK_CODE( //
code = save_fs_to_file(pFS, fname), //
lino, //
_exit);
// TSDB_CHECK_CODE( //
// code = save_fs(pFS, fname), //
// lino, //
// _exit);
_exit:
if (code) {

View File

@ -15,6 +15,29 @@
#include "inc/tsdbFSet.h"
static int32_t stt_lvl_to_json(const SSttLvl *lvl, cJSON *json) {
if (cJSON_AddNumberToObject(json, "lvl", lvl->lvl) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
cJSON *arr = cJSON_AddArrayToObject(json, "stt");
if (arr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
// TODO: .stt files
// STFile *f;
// LISTD_FOREACH(&lvl->fstt, f, listNode) {
// cJSON *item = cJSON_CreateObject();
// if (item == NULL) return TSDB_CODE_OUT_OF_MEMORY;
// int32_t code = tsdbTFileToJson(f, item);
// if (code) return code;
// cJSON_AddItemToArray(arr, item);
// }
return 0;
}
int32_t tsdbFileSetCreate(int32_t fid, struct STFileSet **ppSet) {
int32_t code = 0;
@ -36,13 +59,39 @@ int32_t tsdbFileSetEdit(struct STFileSet *pSet, struct SFileOp *pOp) {
return code;
}
int32_t tsdbFileSetToJson(SJson *pJson, const struct STFileSet *pSet) {
int32_t tsdbFileSetToJson(const STFileSet *fset, cJSON *json) {
int32_t code = 0;
ASSERTS(0, "TODO: Not implemented yet");
// fid
if (cJSON_AddNumberToObject(json, "fid", fset->fid) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
_exit:
return code;
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
if (fset->farr[ftype] == NULL) {
continue;
}
code = tsdbTFileToJson(fset->farr[ftype], json);
if (code) return code;
}
// each level
cJSON *ajson = cJSON_AddArrayToObject(json, "stt");
if (ajson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
SSttLvl *sttLvl;
LISTD_FOREACH(&fset->lvl0, sttLvl, listNode) {
cJSON *ljson = cJSON_CreateObject();
if (ljson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
code = stt_lvl_to_json(sttLvl, ljson);
if (code) return code;
cJSON_AddItemToArray(ajson, ljson);
}
return 0;
}
int32_t tsdbEditFileSet(struct STFileSet *pFileSet, const struct SFileOp *pOp) {

View File

@ -15,41 +15,96 @@
#include "inc/tsdbFile.h"
const char *tsdb_ftype_suffix[] = {
[TSDB_FTYPE_HEAD] = ".head", //
[TSDB_FTYPE_DATA] = ".data", //
[TSDB_FTYPE_SMA] = ".sma", //
[TSDB_FTYPE_TOMB] = ".tomb", //
[TSDB_FTYPE_MAX] = NULL, //
[TSDB_FTYPE_STT] = ".stt",
static int32_t head_to_json(const STFile *file, cJSON *json);
static int32_t data_to_json(const STFile *file, cJSON *json);
static int32_t sma_to_json(const STFile *file, cJSON *json);
static int32_t tomb_to_json(const STFile *file, cJSON *json);
static int32_t stt_to_json(const STFile *file, cJSON *json);
static const struct {
const char *suffix;
int32_t (*to_json)(const STFile *file, cJSON *json);
} g_tfile_info[] = {
[TSDB_FTYPE_HEAD] = {"head", head_to_json}, //
[TSDB_FTYPE_DATA] = {"data", data_to_json}, //
[TSDB_FTYPE_SMA] = {"sma", sma_to_json}, //
[TSDB_FTYPE_TOMB] = {"tomb", tomb_to_json}, //
[TSDB_FTYPE_STT] = {"stt", stt_to_json},
};
static int32_t tfile_to_json(const STFile *file, cJSON *json) {
if (cJSON_AddNumberToObject(json, "did.level", file->did.level) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (cJSON_AddNumberToObject(json, "did.id", file->did.id) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (cJSON_AddNumberToObject(json, "fid", file->fid) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (cJSON_AddNumberToObject(json, "cid", file->cid) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (cJSON_AddNumberToObject(json, "size", file->size) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return 0;
}
static int32_t head_to_json(const STFile *file, cJSON *json) { return tfile_to_json(file, json); }
static int32_t data_to_json(const STFile *file, cJSON *json) { return tfile_to_json(file, json); }
static int32_t sma_to_json(const STFile *file, cJSON *json) { return tfile_to_json(file, json); }
static int32_t tomb_to_json(const STFile *file, cJSON *json) { return tfile_to_json(file, json); }
static int32_t stt_to_json(const STFile *file, cJSON *json) {
int32_t code = tfile_to_json(file, json);
if (code) return code;
if (cJSON_AddNumberToObject(json, "lvl", file->stt.lvl) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (cJSON_AddNumberToObject(json, "nseg", file->stt.nseg) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return 0;
}
int32_t tsdbTFileInit(STsdb *pTsdb, STFile *pFile) {
SVnode *pVnode = pTsdb->pVnode;
STfs *pTfs = pVnode->pTfs;
if (pTfs) {
snprintf(pFile->fname, //
TSDB_FILENAME_LEN, //
"%s%s%s%sv%df%dver%" PRId64 "%s", //
tfsGetDiskPath(pTfs, pFile->did), //
TD_DIRSEP, //
pTsdb->path, //
TD_DIRSEP, //
TD_VID(pVnode), //
pFile->fid, //
pFile->cid, //
tsdb_ftype_suffix[pFile->type]);
snprintf(pFile->fname, //
TSDB_FILENAME_LEN, //
"%s%s%s%sv%df%dver%" PRId64 ".%s", //
tfsGetDiskPath(pTfs, pFile->did), //
TD_DIRSEP, //
pTsdb->path, //
TD_DIRSEP, //
TD_VID(pVnode), //
pFile->fid, //
pFile->cid, //
g_tfile_info[pFile->type].suffix);
} else {
snprintf(pFile->fname, //
TSDB_FILENAME_LEN, //
"%s%sv%df%dver%" PRId64 "%s", //
pTsdb->path, //
TD_DIRSEP, //
TD_VID(pVnode), //
pFile->fid, //
pFile->cid, //
tsdb_ftype_suffix[pFile->type]);
snprintf(pFile->fname, //
TSDB_FILENAME_LEN, //
"%s%sv%df%dver%" PRId64 ".%s", //
pTsdb->path, //
TD_DIRSEP, //
TD_VID(pVnode), //
pFile->fid, //
pFile->cid, //
g_tfile_info[pFile->type].suffix);
}
pFile->ref = 1;
return 0;
@ -59,3 +114,10 @@ int32_t tsdbTFileClear(STFile *pFile) {
// TODO
return 0;
}
int32_t tsdbTFileToJson(const STFile *file, cJSON *json) {
cJSON *tjson = cJSON_AddObjectToObject(json, g_tfile_info[file->type].suffix);
if (tjson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
return g_tfile_info[file->type].to_json(file, tjson);
}

View File

@ -290,7 +290,7 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
}
}
static int32_t load_fs_from_file(const char *fname, STsdbFS *pFS) {
static int32_t load_fs(const char *fname, STsdbFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
uint8_t *pData = NULL;
@ -724,7 +724,7 @@ int32_t tsdbFSCommit(STsdb *pTsdb) {
code = tsdbFSCreate(&fs);
TSDB_CHECK_CODE(code, lino, _exit);
code = load_fs_from_file(current, &fs);
code = load_fs(current, &fs);
TSDB_CHECK_CODE(code, lino, _exit);
// apply file change
@ -769,7 +769,7 @@ int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
tsdbGetCurrentFName(pTsdb, current, current_t);
if (taosCheckExistFile(current)) {
code = load_fs_from_file(current, &pTsdb->fs);
code = load_fs(current, &pTsdb->fs);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosCheckExistFile(current_t)) {