more code
This commit is contained in:
parent
10e6e52670
commit
cf0228f87e
|
@ -36,13 +36,13 @@ typedef enum {
|
||||||
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback);
|
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback);
|
||||||
int32_t tsdbCloseFS(STFileSystem **fs);
|
int32_t tsdbCloseFS(STFileSystem **fs);
|
||||||
// snapshot
|
// snapshot
|
||||||
int32_t tsdbFSCopySnapshot(STFileSystem *fs, TFileSetArray *fsetArr);
|
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr);
|
||||||
int32_t tsdbFSClearSnapshot(TFileSetArray *fsetArr);
|
int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr);
|
||||||
// txn
|
// txn
|
||||||
int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid);
|
int32_t tsdbFSAllocEid(STFileSystem *fs, int64_t *eid);
|
||||||
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype);
|
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype);
|
||||||
int32_t tsdbFSEditCommit(STFileSystem *pFS);
|
int32_t tsdbFSEditCommit(STFileSystem *fs);
|
||||||
int32_t tsdbFSEditAbort(STFileSystem *pFS);
|
int32_t tsdbFSEditAbort(STFileSystem *fs);
|
||||||
// other
|
// other
|
||||||
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
|
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
|
||||||
|
|
||||||
|
@ -53,8 +53,8 @@ struct STFileSystem {
|
||||||
int32_t state;
|
int32_t state;
|
||||||
int64_t neid;
|
int64_t neid;
|
||||||
EFEditT etype;
|
EFEditT etype;
|
||||||
TFileSetArray cstate;
|
TFileSetArray fSetArr[1];
|
||||||
TFileSetArray nstate;
|
TFileSetArray fSetArrTmp[1];
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -45,16 +45,16 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
|
||||||
tsem_init(&fs[0]->canEdit, 0, 1);
|
tsem_init(&fs[0]->canEdit, 0, 1);
|
||||||
fs[0]->state = TSDB_FS_STATE_NONE;
|
fs[0]->state = TSDB_FS_STATE_NONE;
|
||||||
fs[0]->neid = 0;
|
fs[0]->neid = 0;
|
||||||
TARRAY2_INIT(&fs[0]->cstate);
|
TARRAY2_INIT(fs[0]->fSetArr);
|
||||||
TARRAY2_INIT(&fs[0]->nstate);
|
TARRAY2_INIT(fs[0]->fSetArrTmp);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t destroy_fs(STFileSystem **fs) {
|
static int32_t destroy_fs(STFileSystem **fs) {
|
||||||
if (fs[0] == NULL) return 0;
|
if (fs[0] == NULL) return 0;
|
||||||
TARRAY2_FREE(&fs[0]->cstate);
|
TARRAY2_FREE(fs[0]->fSetArr);
|
||||||
TARRAY2_FREE(&fs[0]->nstate);
|
TARRAY2_FREE(fs[0]->fSetArrTmp);
|
||||||
tsem_destroy(&fs[0]->canEdit);
|
tsem_destroy(&fs[0]->canEdit);
|
||||||
taosMemoryFree(fs[0]);
|
taosMemoryFree(fs[0]);
|
||||||
fs[0] = NULL;
|
fs[0] = NULL;
|
||||||
|
@ -242,8 +242,8 @@ static bool is_same_file(const STFile *f1, const STFile f2) {
|
||||||
|
|
||||||
static int32_t apply_commit(STFileSystem *fs) {
|
static int32_t apply_commit(STFileSystem *fs) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
TFileSetArray *fsetArray1 = &fs->cstate;
|
TFileSetArray *fsetArray1 = fs->fSetArr;
|
||||||
TFileSetArray *fsetArray2 = &fs->nstate;
|
TFileSetArray *fsetArray2 = fs->fSetArrTmp;
|
||||||
int32_t i1 = 0, i2 = 0;
|
int32_t i1 = 0, i2 = 0;
|
||||||
|
|
||||||
while (i1 < TARRAY2_SIZE(fsetArray1) || i2 < TARRAY2_SIZE(fsetArray2)) {
|
while (i1 < TARRAY2_SIZE(fsetArray1) || i2 < TARRAY2_SIZE(fsetArray2)) {
|
||||||
|
@ -357,7 +357,7 @@ static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
|
||||||
|
|
||||||
// get max commit id
|
// get max commit id
|
||||||
const STFileSet *fset;
|
const STFileSet *fset;
|
||||||
TARRAY2_FOREACH(&fs->cstate, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); }
|
TARRAY2_FOREACH(fs->fSetArr, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); }
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -371,8 +371,8 @@ static int32_t update_fs_if_needed(STFileSystem *pFS) {
|
||||||
static int32_t tsdbFSDupState(STFileSystem *fs) {
|
static int32_t tsdbFSDupState(STFileSystem *fs) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
||||||
const TFileSetArray *src = &fs->cstate;
|
const TFileSetArray *src = fs->fSetArr;
|
||||||
TFileSetArray *dst = &fs->nstate;
|
TFileSetArray *dst = fs->fSetArrTmp;
|
||||||
|
|
||||||
TARRAY2_CLEAR(dst, tsdbTFileSetClear);
|
TARRAY2_CLEAR(dst, tsdbTFileSetClear);
|
||||||
|
|
||||||
|
@ -405,7 +405,7 @@ static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
|
||||||
current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
|
current_fname(pTsdb, mCurrent, TSDB_FCURRENT_M);
|
||||||
|
|
||||||
if (taosCheckExistFile(fCurrent)) { // current.json exists
|
if (taosCheckExistFile(fCurrent)) { // current.json exists
|
||||||
code = load_fs(pTsdb, fCurrent, &fs->cstate);
|
code = load_fs(pTsdb, fCurrent, fs->fSetArr);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (taosCheckExistFile(cCurrent)) {
|
if (taosCheckExistFile(cCurrent)) {
|
||||||
|
@ -416,7 +416,7 @@ static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
|
||||||
code = abort_edit(fs);
|
code = abort_edit(fs);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {
|
||||||
code = load_fs(pTsdb, cCurrent, &fs->nstate);
|
code = load_fs(pTsdb, cCurrent, fs->fSetArrTmp);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = commit_edit(fs);
|
code = commit_edit(fs);
|
||||||
|
@ -435,7 +435,7 @@ static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
|
||||||
code = tsdbFSScanAndFix(fs);
|
code = tsdbFSScanAndFix(fs);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {
|
||||||
code = save_fs(&fs->cstate, fCurrent);
|
code = save_fs(fs->fSetArr, fCurrent);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -449,8 +449,8 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t close_file_system(STFileSystem *fs) {
|
static int32_t close_file_system(STFileSystem *fs) {
|
||||||
TARRAY2_CLEAR(&fs->cstate, tsdbTFileSetClear);
|
TARRAY2_CLEAR(fs->fSetArr, tsdbTFileSetClear);
|
||||||
TARRAY2_CLEAR(&fs->nstate, tsdbTFileSetClear);
|
TARRAY2_CLEAR(fs->fSetArrTmp, tsdbTFileSetClear);
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -473,7 +473,7 @@ static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSe
|
||||||
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
|
static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
TFileSetArray *fsetArray = &fs->nstate;
|
TFileSetArray *fsetArray = fs->fSetArrTmp;
|
||||||
|
|
||||||
STFileSet *fset = NULL;
|
STFileSet *fset = NULL;
|
||||||
const STFileOp *op;
|
const STFileOp *op;
|
||||||
|
@ -568,7 +568,7 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// save fs
|
// save fs
|
||||||
code = save_fs(&fs->nstate, current_t);
|
code = save_fs(fs->fSetArrTmp, current_t);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -596,34 +596,42 @@ int32_t tsdbFSEditAbort(STFileSystem *fs) {
|
||||||
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
|
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
|
||||||
STFileSet tfset = {.fid = fid};
|
STFileSet tfset = {.fid = fid};
|
||||||
STFileSet *pset = &tfset;
|
STFileSet *pset = &tfset;
|
||||||
fset[0] = TARRAY2_SEARCH_EX(&fs->cstate, &pset, tsdbTFileSetCmprFn, TD_EQ);
|
fset[0] = TARRAY2_SEARCH_EX(fs->fSetArr, &pset, tsdbTFileSetCmprFn, TD_EQ);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFSCopySnapshot(STFileSystem *fs, TFileSetArray *fsetArr) {
|
int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STFileSet *fset;
|
STFileSet *fset;
|
||||||
STFileSet *fset1;
|
STFileSet *fset1;
|
||||||
|
|
||||||
ASSERT(TARRAY2_SIZE(fsetArr) == 0);
|
fsetArr[0] = taosMemoryMalloc(sizeof(TFileSetArray));
|
||||||
|
if (fsetArr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
|
TARRAY2_INIT(fsetArr[0]);
|
||||||
|
|
||||||
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
||||||
TARRAY2_FOREACH(&fs->cstate, fset) {
|
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||||
code = tsdbTFileSetInitEx(fs->tsdb, fset, &fset1);
|
code = tsdbTFileSetInitEx(fs->tsdb, fset, &fset1);
|
||||||
if (code) break;
|
if (code) break;
|
||||||
|
|
||||||
code = TARRAY2_APPEND(fsetArr, fset1);
|
code = TARRAY2_APPEND(fsetArr[0], fset1);
|
||||||
if (code) break;
|
if (code) break;
|
||||||
}
|
}
|
||||||
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
|
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
TARRAY2_CLEAR(fsetArr, tsdbTFileSetClear);
|
TARRAY2_CLEAR_FREE(fsetArr[0], tsdbTFileSetClear);
|
||||||
|
taosMemoryFree(fsetArr[0]);
|
||||||
|
fsetArr[0] = NULL;
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFSClearSnapshot(TFileSetArray *fsetArr) {
|
int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
|
||||||
TARRAY2_CLEAR(fsetArr, tsdbTFileSetClear);
|
if (fsetArr[0]) {
|
||||||
|
TARRAY2_CLEAR_FREE(fsetArr[0], tsdbTFileSetClear);
|
||||||
|
fsetArr[0] = NULL;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -16,16 +16,16 @@
|
||||||
#include "inc/tsdbMerge.h"
|
#include "inc/tsdbMerge.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STsdb *tsdb;
|
STsdb *tsdb;
|
||||||
TFileSetArray fsetArr[1];
|
TFileSetArray *fsetArr;
|
||||||
int32_t sttTrigger;
|
int32_t sttTrigger;
|
||||||
int32_t maxRow;
|
int32_t maxRow;
|
||||||
int32_t minRow;
|
int32_t minRow;
|
||||||
int32_t szPage;
|
int32_t szPage;
|
||||||
int8_t cmprAlg;
|
int8_t cmprAlg;
|
||||||
int64_t compactVersion;
|
int64_t compactVersion;
|
||||||
int64_t cid;
|
int64_t cid;
|
||||||
SSkmInfo skmTb[1];
|
SSkmInfo skmTb[1];
|
||||||
|
|
||||||
// context
|
// context
|
||||||
struct {
|
struct {
|
||||||
|
@ -549,18 +549,16 @@ int32_t tsdbMerge(STsdb *tsdb) {
|
||||||
|
|
||||||
SMerger merger[1] = {{
|
SMerger merger[1] = {{
|
||||||
.tsdb = tsdb,
|
.tsdb = tsdb,
|
||||||
.fsetArr = {TARRAY2_INITIALIZER},
|
|
||||||
.sttTrigger = tsdb->pVnode->config.sttTrigger,
|
.sttTrigger = tsdb->pVnode->config.sttTrigger,
|
||||||
}};
|
}};
|
||||||
|
|
||||||
code = tsdbFSCopySnapshot(tsdb->pFS, merger->fsetArr);
|
code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = tsdbDoMerge(merger);
|
code = tsdbDoMerge(merger);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
tsdbFSClearSnapshot(merger->fsetArr);
|
tsdbFSDestroyCopySnapshot(&merger->fsetArr);
|
||||||
TARRAY2_FREE(merger->fsetArr);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
Loading…
Reference in New Issue