more code
This commit is contained in:
parent
e731a08f2d
commit
8b83c85b0c
|
@ -51,7 +51,7 @@ static FORCE_INLINE int32_t tarray2_make_room(void *arg, // array
|
||||||
int32_t sz // size of element
|
int32_t sz // size of element
|
||||||
) {
|
) {
|
||||||
TARRAY2(void) *a = arg;
|
TARRAY2(void) *a = arg;
|
||||||
int32_t capacity = a->capacity ? (a->capacity << 1) : TARRAY2_MIN_SIZE;
|
int32_t capacity = (a->capacity > 0) ? (a->capacity << 1) : TARRAY2_MIN_SIZE;
|
||||||
while (capacity < es) {
|
while (capacity < es) {
|
||||||
capacity <<= 1;
|
capacity <<= 1;
|
||||||
}
|
}
|
||||||
|
@ -78,7 +78,7 @@ static FORCE_INLINE int32_t tarray2_make_room(void *arg, // array
|
||||||
|
|
||||||
#define TARRAY2_CLEAR(a, cb) \
|
#define TARRAY2_CLEAR(a, cb) \
|
||||||
do { \
|
do { \
|
||||||
if (cb) { \
|
if ((cb) && (a)->size > 0) { \
|
||||||
TArray2Cb cb_ = (TArray2Cb)(cb); \
|
TArray2Cb cb_ = (TArray2Cb)(cb); \
|
||||||
for (int32_t i = 0; i < (a)->size; ++i) { \
|
for (int32_t i = 0; i < (a)->size; ++i) { \
|
||||||
cb_((a)->data + i); \
|
cb_((a)->data + i); \
|
||||||
|
@ -103,7 +103,7 @@ static FORCE_INLINE int32_t tarray2_make_room(void *arg, // array
|
||||||
if ((a)->size > (idx)) { \
|
if ((a)->size > (idx)) { \
|
||||||
memmove((a)->data + (idx) + 1, (a)->data + (idx), sizeof(typeof((a)->data[0])) * ((a)->size - (idx))); \
|
memmove((a)->data + (idx) + 1, (a)->data + (idx), sizeof(typeof((a)->data[0])) * ((a)->size - (idx))); \
|
||||||
} \
|
} \
|
||||||
(a)->data[(idx)] = e; \
|
(a)->data[(idx)] = (e); \
|
||||||
(a)->size++; \
|
(a)->size++; \
|
||||||
} \
|
} \
|
||||||
__ret; \
|
__ret; \
|
||||||
|
|
|
@ -24,6 +24,7 @@ extern "C" {
|
||||||
|
|
||||||
/* Exposed Handle */
|
/* Exposed Handle */
|
||||||
typedef struct STFileSystem STFileSystem;
|
typedef struct STFileSystem STFileSystem;
|
||||||
|
typedef TARRAY2(STFileSet *) TFileSetArray;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_FEDIT_COMMIT = 1, //
|
TSDB_FEDIT_COMMIT = 1, //
|
||||||
|
|
|
@ -25,7 +25,7 @@ extern "C" {
|
||||||
typedef struct STFileSet STFileSet;
|
typedef struct STFileSet STFileSet;
|
||||||
typedef struct STFileOp STFileOp;
|
typedef struct STFileOp STFileOp;
|
||||||
typedef struct SSttLvl SSttLvl;
|
typedef struct SSttLvl SSttLvl;
|
||||||
typedef TARRAY2(STFileSet *) TFileSetArray;
|
typedef TARRAY2(STFileObj *) TFileObjArray;
|
||||||
typedef TARRAY2(SSttLvl *) TSttLvlArray;
|
typedef TARRAY2(SSttLvl *) TSttLvlArray;
|
||||||
typedef TARRAY2(STFileOp) TFileOpArray;
|
typedef TARRAY2(STFileOp) TFileOpArray;
|
||||||
|
|
||||||
|
@ -37,17 +37,20 @@ typedef enum {
|
||||||
TSDB_FOP_TRUNCATE,
|
TSDB_FOP_TRUNCATE,
|
||||||
} tsdb_fop_t;
|
} tsdb_fop_t;
|
||||||
|
|
||||||
|
// init/clear
|
||||||
int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset);
|
int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset);
|
||||||
int32_t tsdbTFileSetInitEx(const STFileSet *fset1, STFileSet **fset2);
|
int32_t tsdbTFileSetInitEx(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
|
||||||
int32_t tsdbTFileSetClear(STFileSet **fset);
|
int32_t tsdbTFileSetClear(STFileSet **fset);
|
||||||
|
// to/from json
|
||||||
int32_t tsdbTFileSetToJson(const STFileSet *fset, cJSON *json);
|
int32_t tsdbTFileSetToJson(const STFileSet *fset, cJSON *json);
|
||||||
int32_t tsdbJsonToTFileSet(const cJSON *json, STFileSet **fset);
|
int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset);
|
||||||
|
// cmpr
|
||||||
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2);
|
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2);
|
||||||
|
// edit
|
||||||
int32_t tsdbTFileSetEdit(STFileSet *fset, const STFileOp *op);
|
int32_t tsdbTFileSetEdit(STFileSet *fset, const STFileOp *op);
|
||||||
int32_t tsdbTFileSetEditEx(const STFileSet *fset1, STFileSet *fset);
|
int32_t tsdbTFileSetEditEx(const STFileSet *fset1, STFileSet *fset);
|
||||||
|
// max commit id
|
||||||
|
int64_t tsdbTFileSetMaxCid(const STFileSet *fset);
|
||||||
|
|
||||||
const SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level);
|
const SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level);
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,6 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct STFile STFile;
|
typedef struct STFile STFile;
|
||||||
typedef struct STFileObj STFileObj;
|
typedef struct STFileObj STFileObj;
|
||||||
typedef TARRAY2(STFileObj *) TFileObjArray;
|
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_FTYPE_HEAD = 0, // .head
|
TSDB_FTYPE_HEAD = 0, // .head
|
||||||
|
@ -34,15 +33,21 @@ typedef enum {
|
||||||
TSDB_FTYPE_STT = TSDB_FTYPE_TOMB + 2, // .stt
|
TSDB_FTYPE_STT = TSDB_FTYPE_TOMB + 2, // .stt
|
||||||
} tsdb_ftype_t;
|
} tsdb_ftype_t;
|
||||||
|
|
||||||
|
enum {
|
||||||
|
TSDB_FSTATE_EXIST = 1,
|
||||||
|
TSDB_FSTATE_REMOVED,
|
||||||
|
};
|
||||||
|
|
||||||
#define TSDB_FTYPE_MIN TSDB_FTYPE_HEAD
|
#define TSDB_FTYPE_MIN TSDB_FTYPE_HEAD
|
||||||
#define TSDB_FTYPE_MAX (TSDB_FTYPE_TOMB + 1)
|
#define TSDB_FTYPE_MAX (TSDB_FTYPE_TOMB + 1)
|
||||||
|
|
||||||
// STFile
|
// STFile
|
||||||
int32_t tsdbTFileToJson(const STFile *f, cJSON *json);
|
int32_t tsdbTFileToJson(const STFile *f, cJSON *json);
|
||||||
int32_t tsdbJsonToTFile(const cJSON *json, tsdb_ftype_t ftype, STFile *f);
|
int32_t tsdbJsonToTFile(const cJSON *json, tsdb_ftype_t ftype, STFile *f);
|
||||||
|
int32_t tsdbTFileName(STsdb *pTsdb, const STFile *f, char fname[]);
|
||||||
|
|
||||||
// STFileObj
|
// STFileObj
|
||||||
int32_t tsdbTFileObjInit(const STFile *f, STFileObj **fobj);
|
int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj);
|
||||||
int32_t tsdbTFileObjRef(STFileObj *fobj);
|
int32_t tsdbTFileObjRef(STFileObj *fobj);
|
||||||
int32_t tsdbTFileObjUnref(STFileObj *fobj);
|
int32_t tsdbTFileObjUnref(STFileObj *fobj);
|
||||||
int32_t tsdbTFileRemove(STFileObj *fobj);
|
int32_t tsdbTFileRemove(STFileObj *fobj);
|
||||||
|
@ -61,11 +66,6 @@ struct STFile {
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
|
||||||
TSDB_FSTATE_EXIST = 1,
|
|
||||||
TSDB_FSTATE_REMOVED,
|
|
||||||
};
|
|
||||||
|
|
||||||
struct STFileObj {
|
struct STFileObj {
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
STFile f;
|
STFile f;
|
||||||
|
|
|
@ -27,9 +27,9 @@ typedef struct {
|
||||||
int8_t cmprAlg;
|
int8_t cmprAlg;
|
||||||
int8_t sttTrigger;
|
int8_t sttTrigger;
|
||||||
|
|
||||||
SArray *aTbDataP; // SArray<STbData *>
|
SArray *aTbDataP; // SArray<STbData *>
|
||||||
SArray *aFileOp; // SArray<STFileOp>
|
TFileOpArray opArray;
|
||||||
int64_t eid; // edit id
|
int64_t eid; // edit id
|
||||||
|
|
||||||
// context
|
// context
|
||||||
TSKEY nextKey;
|
TSKEY nextKey;
|
||||||
|
@ -275,14 +275,15 @@ static int32_t commit_fset_end(SCommitter *pCommitter) {
|
||||||
|
|
||||||
if (pCommitter->pWriter == NULL) return 0;
|
if (pCommitter->pWriter == NULL) return 0;
|
||||||
|
|
||||||
struct STFileOp *pFileOp = taosArrayReserve(pCommitter->aFileOp, 1);
|
// TODO
|
||||||
if (pFileOp == NULL) {
|
// struct STFileOp *pFileOp = taosArrayReserve(pCommitter->aFileOp, 1);
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
// if (pFileOp == NULL) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
// code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
// }
|
||||||
|
|
||||||
code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, pFileOp);
|
// code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, pFileOp);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -337,12 +338,11 @@ static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCom
|
||||||
pCommitter->sttTrigger = 1; // TODO
|
pCommitter->sttTrigger = 1; // TODO
|
||||||
|
|
||||||
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
|
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
|
||||||
pCommitter->aFileOp = taosArrayInit(16, sizeof(STFileOp));
|
if (pCommitter->aTbDataP == NULL) {
|
||||||
if (pCommitter->aTbDataP == NULL || pCommitter->aFileOp == NULL) {
|
|
||||||
taosArrayDestroy(pCommitter->aTbDataP);
|
taosArrayDestroy(pCommitter->aTbDataP);
|
||||||
taosArrayDestroy(pCommitter->aFileOp);
|
|
||||||
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
}
|
}
|
||||||
|
TARRAY2_INIT(&pCommitter->opArray);
|
||||||
tsdbFSAllocEid(pTsdb->pFS, &pCommitter->eid);
|
tsdbFSAllocEid(pTsdb->pFS, &pCommitter->eid);
|
||||||
|
|
||||||
// start loop
|
// start loop
|
||||||
|
@ -363,7 +363,7 @@ static int32_t close_committer(SCommitter *pCommiter, int32_t eno) {
|
||||||
int32_t vid = TD_VID(pCommiter->pTsdb->pVnode);
|
int32_t vid = TD_VID(pCommiter->pTsdb->pVnode);
|
||||||
|
|
||||||
if (eno == 0) {
|
if (eno == 0) {
|
||||||
code = tsdbFSEditBegin(pCommiter->pTsdb->pFS, pCommiter->aFileOp, TSDB_FEDIT_COMMIT);
|
code = tsdbFSEditBegin(pCommiter->pTsdb->pFS, NULL /* TODO */, TSDB_FEDIT_COMMIT);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {
|
||||||
// TODO
|
// TODO
|
||||||
|
@ -372,7 +372,7 @@ static int32_t close_committer(SCommitter *pCommiter, int32_t eno) {
|
||||||
|
|
||||||
ASSERT(pCommiter->pWriter == NULL);
|
ASSERT(pCommiter->pWriter == NULL);
|
||||||
taosArrayDestroy(pCommiter->aTbDataP);
|
taosArrayDestroy(pCommiter->aTbDataP);
|
||||||
taosArrayDestroy(pCommiter->aFileOp);
|
TARRAY2_CLEAR_FREE(&pCommiter->opArray, NULL);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
|
@ -186,7 +186,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t load_fs(const char *fname, TFileSetArray *arr) {
|
static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
@ -198,23 +198,23 @@ static int32_t load_fs(const char *fname, TFileSetArray *arr) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// parse json
|
// parse json
|
||||||
const cJSON *item;
|
const cJSON *item1;
|
||||||
|
|
||||||
/* fmtv */
|
/* fmtv */
|
||||||
item = cJSON_GetObjectItem(json, "fmtv");
|
item1 = cJSON_GetObjectItem(json, "fmtv");
|
||||||
if (cJSON_IsNumber(item)) {
|
if (cJSON_IsNumber(item1)) {
|
||||||
ASSERT(item->valuedouble == 1);
|
ASSERT(item1->valuedouble == 1);
|
||||||
} else {
|
} else {
|
||||||
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
|
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* fset */
|
/* fset */
|
||||||
item = cJSON_GetObjectItem(json, "fset");
|
item1 = cJSON_GetObjectItem(json, "fset");
|
||||||
if (cJSON_IsArray(item)) {
|
if (cJSON_IsArray(item1)) {
|
||||||
const cJSON *titem;
|
const cJSON *item2;
|
||||||
cJSON_ArrayForEach(titem, item) {
|
cJSON_ArrayForEach(item2, item1) {
|
||||||
STFileSet *fset;
|
STFileSet *fset;
|
||||||
code = tsdbJsonToTFileSet(titem, &fset);
|
code = tsdbJsonToTFileSet(pTsdb, item2, &fset);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = TARRAY2_APPEND(arr, fset);
|
code = TARRAY2_APPEND(arr, fset);
|
||||||
|
@ -257,7 +257,7 @@ static int32_t apply_commit(STFileSystem *fs) {
|
||||||
n1 = TARRAY2_SIZE(&fs->cstate);
|
n1 = TARRAY2_SIZE(&fs->cstate);
|
||||||
} else if (fset1->fid > fset2->fid) {
|
} else if (fset1->fid > fset2->fid) {
|
||||||
// create new file set with fid of fset2->fid
|
// create new file set with fid of fset2->fid
|
||||||
code = tsdbTFileSetInitEx(fset2, &fset1);
|
code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
code = TARRAY2_SORT_INSERT(&fs->cstate, fset1, tsdbTFileSetCmprFn);
|
code = TARRAY2_SORT_INSERT(&fs->cstate, fset1, tsdbTFileSetCmprFn);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
|
@ -277,7 +277,7 @@ static int32_t apply_commit(STFileSystem *fs) {
|
||||||
n1 = TARRAY2_SIZE(&fs->cstate);
|
n1 = TARRAY2_SIZE(&fs->cstate);
|
||||||
} else {
|
} else {
|
||||||
// create new file set with fid of fset2->fid
|
// create new file set with fid of fset2->fid
|
||||||
code = tsdbTFileSetInitEx(fset2, &fset1);
|
code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
code = TARRAY2_SORT_INSERT(&fs->cstate, fset1, tsdbTFileSetCmprFn);
|
code = TARRAY2_SORT_INSERT(&fs->cstate, fset1, tsdbTFileSetCmprFn);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
|
@ -356,7 +356,13 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t scan_and_fix_fs(STFileSystem *pFS) {
|
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
|
||||||
|
fs->neid = 0;
|
||||||
|
|
||||||
|
// get max commit id
|
||||||
|
const STFileSet *fset;
|
||||||
|
TARRAY2_FOREACH(&fs->cstate, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); }
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -366,17 +372,20 @@ static int32_t update_fs_if_needed(STFileSystem *pFS) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbFSDupState(const TFileSetArray *src, TFileSetArray *dst) {
|
static int32_t tsdbFSDupState(STFileSystem *fs) {
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
const TFileSetArray *src = &fs->cstate;
|
||||||
|
TFileSetArray *dst = &fs->nstate;
|
||||||
|
|
||||||
TARRAY2_CLEAR(dst, tsdbTFileSetClear);
|
TARRAY2_CLEAR(dst, tsdbTFileSetClear);
|
||||||
|
|
||||||
const STFileSet *fset1;
|
const STFileSet *fset1;
|
||||||
TARRAY2_FOREACH(src, fset1) {
|
TARRAY2_FOREACH(src, fset1) {
|
||||||
STFileSet *fset;
|
STFileSet *fset2;
|
||||||
|
code = tsdbTFileSetInitEx(fs->pTsdb, fset1, &fset2);
|
||||||
int32_t code = tsdbTFileSetInitEx(fset1, &fset);
|
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
|
code = TARRAY2_APPEND(dst, fset2);
|
||||||
code = TARRAY2_APPEND(dst, fset);
|
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -400,7 +409,7 @@ static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
|
||||||
current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
|
current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
|
||||||
|
|
||||||
if (taosCheckExistFile(fCurrent)) { // current.json exists
|
if (taosCheckExistFile(fCurrent)) { // current.json exists
|
||||||
code = load_fs(fCurrent, &fs->cstate);
|
code = load_fs(pTsdb, fCurrent, &fs->cstate);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (taosCheckExistFile(cCurrent)) {
|
if (taosCheckExistFile(cCurrent)) {
|
||||||
|
@ -411,7 +420,7 @@ static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
|
||||||
code = abort_edit(fs);
|
code = abort_edit(fs);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {
|
||||||
code = load_fs(cCurrent, &fs->nstate);
|
code = load_fs(pTsdb, cCurrent, &fs->nstate);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = commit_edit(fs);
|
code = commit_edit(fs);
|
||||||
|
@ -424,10 +433,10 @@ static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbFSDupState(&fs->cstate, &fs->nstate);
|
code = tsdbFSDupState(fs);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = scan_and_fix_fs(fs);
|
code = tsdbFSScanAndFix(fs);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {
|
||||||
code = save_fs(&fs->cstate, fCurrent);
|
code = save_fs(&fs->cstate, fCurrent);
|
||||||
|
@ -443,8 +452,10 @@ _exit:
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t close_file_system(STFileSystem *pFS) {
|
static int32_t close_file_system(STFileSystem *fs) {
|
||||||
ASSERTS(0, "TODO: Not implemented yet");
|
TARRAY2_CLEAR(&fs->cstate, tsdbTFileSetClear);
|
||||||
|
TARRAY2_CLEAR(&fs->nstate, tsdbTFileSetClear);
|
||||||
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -528,7 +539,6 @@ int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: remove eid
|
|
||||||
int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype) {
|
int32_t tsdbFSEditBegin(STFileSystem *fs, const SArray *aFileOp, EFEditT etype) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
|
@ -579,8 +589,9 @@ int32_t tsdbFSEditAbort(STFileSystem *fs) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, const STFileSet **ppFSet) {
|
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, const STFileSet **fset) {
|
||||||
// STFileSet fset = {.fid = fid};
|
STFileSet tfset = {.fid = fid};
|
||||||
// ppFSet[0] = taosArraySearch(fs->cstate, &fset, (__compar_fn_t)tsdbFSetCmprFn, TD_EQ);
|
fset[0] = &tfset;
|
||||||
|
fset[0] = TARRAY2_SEARCH(&fs->cstate, fset, tsdbTFileSetCmprFn, TD_EQ);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -16,9 +16,7 @@
|
||||||
#include "inc/tsdbFSet.h"
|
#include "inc/tsdbFSet.h"
|
||||||
|
|
||||||
static int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
|
static int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
|
||||||
lvl[0] = taosMemoryMalloc(sizeof(SSttLvl));
|
if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
if (lvl[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
|
|
||||||
lvl[0]->level = level;
|
lvl[0]->level = level;
|
||||||
TARRAY2_INIT(&lvl[0]->farr);
|
TARRAY2_INIT(&lvl[0]->farr);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -30,14 +28,14 @@ static int32_t tsdbSttLvlClear(SSttLvl **lvl) {
|
||||||
lvl[0] = NULL;
|
lvl[0] = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
static int32_t tsdbSttLvlInitEx(const SSttLvl *lvl1, SSttLvl **lvl) {
|
static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl) {
|
||||||
int32_t code = tsdbSttLvlInit(lvl1->level, lvl);
|
int32_t code = tsdbSttLvlInit(lvl1->level, lvl);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
|
|
||||||
const STFileObj *fobj1;
|
const STFileObj *fobj1;
|
||||||
TARRAY2_FOREACH(&lvl1->farr, fobj1) {
|
TARRAY2_FOREACH(&lvl1->farr, fobj1) {
|
||||||
STFileObj *fobj;
|
STFileObj *fobj;
|
||||||
code = tsdbTFileObjInit(&fobj1->f, &fobj);
|
code = tsdbTFileObjInit(pTsdb, &fobj1->f, &fobj);
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbSttLvlClear(lvl);
|
tsdbSttLvlClear(lvl);
|
||||||
return code;
|
return code;
|
||||||
|
@ -48,6 +46,12 @@ static int32_t tsdbSttLvlInitEx(const SSttLvl *lvl1, SSttLvl **lvl) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbSttLvlCmprFn(const SSttLvl *lvl1, const SSttLvl *lvl2) {
|
||||||
|
if (lvl1->level < lvl2->level) return -1;
|
||||||
|
if (lvl1->level > lvl2->level) return 1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbSttLvlToJson(const SSttLvl *lvl, cJSON *json) {
|
static int32_t tsdbSttLvlToJson(const SSttLvl *lvl, cJSON *json) {
|
||||||
if (cJSON_AddNumberToObject(json, "level", lvl->level) == NULL) {
|
if (cJSON_AddNumberToObject(json, "level", lvl->level) == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -68,7 +72,7 @@ static int32_t tsdbSttLvlToJson(const SSttLvl *lvl, cJSON *json) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbJsonToSttLvl(const cJSON *json, SSttLvl **lvl) {
|
static int32_t tsdbJsonToSttLvl(STsdb *pTsdb, const cJSON *json, SSttLvl **lvl) {
|
||||||
const cJSON *item1, *item2;
|
const cJSON *item1, *item2;
|
||||||
int32_t level;
|
int32_t level;
|
||||||
|
|
||||||
|
@ -97,7 +101,7 @@ static int32_t tsdbJsonToSttLvl(const cJSON *json, SSttLvl **lvl) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STFileObj *fobj;
|
STFileObj *fobj;
|
||||||
code = tsdbTFileObjInit(&tf, &fobj);
|
code = tsdbTFileObjInit(pTsdb, &tf, &fobj);
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbSttLvlClear(lvl);
|
tsdbSttLvlClear(lvl);
|
||||||
return code;
|
return code;
|
||||||
|
@ -140,7 +144,7 @@ int32_t tsdbTFileSetToJson(const STFileSet *fset, cJSON *json) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbJsonToTFileSet(const cJSON *json, STFileSet **fset) {
|
int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
const cJSON *item1, *item2;
|
const cJSON *item1, *item2;
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
|
@ -164,6 +168,9 @@ int32_t tsdbJsonToTFileSet(const cJSON *json, STFileSet **fset) {
|
||||||
} else if (code) {
|
} else if (code) {
|
||||||
tsdbTFileSetClear(fset);
|
tsdbTFileSetClear(fset);
|
||||||
return code;
|
return code;
|
||||||
|
} else {
|
||||||
|
code = tsdbTFileObjInit(pTsdb, &tf, &(*fset)->farr[ftype]);
|
||||||
|
if (code) return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,7 +179,7 @@ int32_t tsdbJsonToTFileSet(const cJSON *json, STFileSet **fset) {
|
||||||
if (cJSON_IsArray(item1)) {
|
if (cJSON_IsArray(item1)) {
|
||||||
cJSON_ArrayForEach(item2, item1) {
|
cJSON_ArrayForEach(item2, item1) {
|
||||||
SSttLvl *lvl;
|
SSttLvl *lvl;
|
||||||
code = tsdbJsonToSttLvl(item2, &lvl);
|
code = tsdbJsonToSttLvl(pTsdb, item2, &lvl);
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbTFileSetClear(fset);
|
tsdbTFileSetClear(fset);
|
||||||
return code;
|
return code;
|
||||||
|
@ -223,14 +230,14 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTFileSetInitEx(const STFileSet *fset1, STFileSet **fset) {
|
int32_t tsdbTFileSetInitEx(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) {
|
||||||
int32_t code = tsdbTFileSetInit(fset1->fid, fset);
|
int32_t code = tsdbTFileSetInit(fset1->fid, fset);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
|
|
||||||
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||||
if (fset1->farr[ftype] == NULL) continue;
|
if (fset1->farr[ftype] == NULL) continue;
|
||||||
|
|
||||||
code = tsdbTFileObjInit(&fset1->farr[ftype]->f, &fset[0]->farr[ftype]);
|
code = tsdbTFileObjInit(pTsdb, &fset1->farr[ftype]->f, &fset[0]->farr[ftype]);
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbTFileSetClear(fset);
|
tsdbTFileSetClear(fset);
|
||||||
return code;
|
return code;
|
||||||
|
@ -240,7 +247,7 @@ int32_t tsdbTFileSetInitEx(const STFileSet *fset1, STFileSet **fset) {
|
||||||
const SSttLvl *lvl1;
|
const SSttLvl *lvl1;
|
||||||
TARRAY2_FOREACH(&fset1->lvlArr, lvl1) {
|
TARRAY2_FOREACH(&fset1->lvlArr, lvl1) {
|
||||||
SSttLvl *lvl;
|
SSttLvl *lvl;
|
||||||
code = tsdbSttLvlInitEx(lvl1, &lvl);
|
code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl);
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbTFileSetClear(fset);
|
tsdbTFileSetClear(fset);
|
||||||
return code;
|
return code;
|
||||||
|
@ -269,15 +276,27 @@ int32_t tsdbTFileSetClear(STFileSet **fset) {
|
||||||
}
|
}
|
||||||
|
|
||||||
const SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level) {
|
const SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level) {
|
||||||
// SSttLvl tlvl = {.level = level};
|
SSttLvl tlvl = {.level = level};
|
||||||
// SRBTreeNode *node = tRBTreeGet(&fset->lvlTree, &tlvl.rbtn);
|
const SSttLvl *lvl = &tlvl;
|
||||||
// return node ? TCONTAINER_OF(node, SSttLvl, rbtn) : NULL;
|
return TARRAY2_SEARCH(&fset->lvlArr, &lvl, tsdbSttLvlCmprFn, TD_EQ);
|
||||||
// TODO
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2) {
|
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2) {
|
||||||
if (fset1[0]->fid < fset2[0]->fid) return -1;
|
if (fset1[0]->fid < fset2[0]->fid) return -1;
|
||||||
if (fset1[0]->fid > fset2[0]->fid) return 1;
|
if (fset1[0]->fid > fset2[0]->fid) return 1;
|
||||||
return 0;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t tsdbTFileSetMaxCid(const STFileSet *fset) {
|
||||||
|
int64_t maxCid = 0;
|
||||||
|
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||||
|
if (fset->farr[ftype] == NULL) continue;
|
||||||
|
maxCid = TMAX(maxCid, fset->farr[ftype]->f.cid);
|
||||||
|
}
|
||||||
|
const SSttLvl *lvl;
|
||||||
|
const STFileObj *fobj;
|
||||||
|
TARRAY2_FOREACH(&fset->lvlArr, lvl) {
|
||||||
|
TARRAY2_FOREACH(&lvl->farr, fobj) { maxCid = TMAX(maxCid, fobj->f.cid); }
|
||||||
|
}
|
||||||
|
return maxCid;
|
||||||
}
|
}
|
|
@ -166,36 +166,6 @@ static int32_t stt_from_json(const cJSON *json, STFile *file) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// int32_t tsdbTFileInit(STsdb *pTsdb, STFile *pFile) {
|
|
||||||
// SVnode *pVnode = pTsdb->pVnode;
|
|
||||||
// STfs *pTfs = pVnode->pTfs;
|
|
||||||
|
|
||||||
// if (pTfs) {
|
|
||||||
// // snprintf(pFile->fname, //
|
|
||||||
// // TSDB_FILENAME_LEN, //
|
|
||||||
// // "%s%s%s%sv%df%dver%" PRId64 ".%s", //
|
|
||||||
// // tfsGetDiskPath(pTfs, pFile->did), //
|
|
||||||
// // TD_DIRSEP, //
|
|
||||||
// // pTsdb->path, //
|
|
||||||
// // TD_DIRSEP, //
|
|
||||||
// // TD_VID(pVnode), //
|
|
||||||
// // pFile->fid, //
|
|
||||||
// // pFile->cid, //
|
|
||||||
// // g_tfile_info[pFile->type].suffix);
|
|
||||||
// } else {
|
|
||||||
// // snprintf(pFile->fname, //
|
|
||||||
// // TSDB_FILENAME_LEN, //
|
|
||||||
// // "%s%sv%df%dver%" PRId64 ".%s", //
|
|
||||||
// // pTsdb->path, //
|
|
||||||
// // TD_DIRSEP, //
|
|
||||||
// // TD_VID(pVnode), //
|
|
||||||
// // pFile->fid, //
|
|
||||||
// // pFile->cid, //
|
|
||||||
// // g_tfile_info[pFile->type].suffix);
|
|
||||||
// }
|
|
||||||
// return 0;
|
|
||||||
// }
|
|
||||||
|
|
||||||
int32_t tsdbTFileToJson(const STFile *file, cJSON *json) {
|
int32_t tsdbTFileToJson(const STFile *file, cJSON *json) {
|
||||||
if (file->type == TSDB_FTYPE_STT) {
|
if (file->type == TSDB_FTYPE_STT) {
|
||||||
return g_tfile_info[file->type].to_json(file, json);
|
return g_tfile_info[file->type].to_json(file, json);
|
||||||
|
@ -225,7 +195,7 @@ int32_t tsdbJsonToTFile(const cJSON *json, tsdb_ftype_t ftype, STFile *f) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTFileObjInit(const STFile *f, STFileObj **fobj) {
|
int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) {
|
||||||
fobj[0] = taosMemoryMalloc(sizeof(*fobj[0]));
|
fobj[0] = taosMemoryMalloc(sizeof(*fobj[0]));
|
||||||
if (!fobj[0]) return TSDB_CODE_OUT_OF_MEMORY;
|
if (!fobj[0]) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
|
@ -233,7 +203,7 @@ int32_t tsdbTFileObjInit(const STFile *f, STFileObj **fobj) {
|
||||||
fobj[0]->f = *f;
|
fobj[0]->f = *f;
|
||||||
fobj[0]->state = TSDB_FSTATE_EXIST;
|
fobj[0]->state = TSDB_FSTATE_EXIST;
|
||||||
fobj[0]->ref = 1;
|
fobj[0]->ref = 1;
|
||||||
// TODO: generate the file name
|
tsdbTFileName(pTsdb, f, fobj[0]->fname);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -248,12 +218,11 @@ int32_t tsdbTFileObjRef(STFileObj *fobj) {
|
||||||
|
|
||||||
int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
||||||
int32_t nRef;
|
int32_t nRef;
|
||||||
|
|
||||||
taosThreadMutexLock(&fobj->mutex);
|
taosThreadMutexLock(&fobj->mutex);
|
||||||
nRef = --fobj->ref;
|
nRef = --fobj->ref;
|
||||||
taosThreadMutexUnlock(&fobj->mutex);
|
taosThreadMutexUnlock(&fobj->mutex);
|
||||||
|
|
||||||
ASSERT(nRef >= 0);
|
ASSERT(nRef >= 0);
|
||||||
|
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
if (fobj->state == TSDB_FSTATE_REMOVED) {
|
if (fobj->state == TSDB_FSTATE_REMOVED) {
|
||||||
// TODO: add the file name
|
// TODO: add the file name
|
||||||
|
@ -261,6 +230,7 @@ int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
||||||
}
|
}
|
||||||
taosMemoryFree(fobj);
|
taosMemoryFree(fobj);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -275,3 +245,33 @@ int32_t tsdbTFileRemove(STFileObj *fobj) {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbTFileName(STsdb *pTsdb, const STFile *f, char fname[]) {
|
||||||
|
SVnode *pVnode = pTsdb->pVnode;
|
||||||
|
STfs *pTfs = pVnode->pTfs;
|
||||||
|
|
||||||
|
if (pTfs) {
|
||||||
|
snprintf(fname, //
|
||||||
|
TSDB_FILENAME_LEN, //
|
||||||
|
"%s%s%s%sv%df%dver%" PRId64 ".%s", //
|
||||||
|
tfsGetDiskPath(pTfs, f->did), //
|
||||||
|
TD_DIRSEP, //
|
||||||
|
pTsdb->path, //
|
||||||
|
TD_DIRSEP, //
|
||||||
|
TD_VID(pVnode), //
|
||||||
|
f->fid, //
|
||||||
|
f->cid, //
|
||||||
|
g_tfile_info[f->type].suffix);
|
||||||
|
} else {
|
||||||
|
snprintf(fname, //
|
||||||
|
TSDB_FILENAME_LEN, //
|
||||||
|
"%s%sv%df%dver%" PRId64 ".%s", //
|
||||||
|
pTsdb->path, //
|
||||||
|
TD_DIRSEP, //
|
||||||
|
TD_VID(pVnode), //
|
||||||
|
f->fid, //
|
||||||
|
f->cid, //
|
||||||
|
g_tfile_info[f->type].suffix);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue