more code

This commit is contained in:
Hongze Cheng 2023-05-31 10:59:47 +08:00
parent aef8450b61
commit 616a675682
8 changed files with 178 additions and 118 deletions

View File

@ -75,6 +75,9 @@ static FORCE_INLINE int32_t tarray2_make_room( //
#define TARRAY2_INIT(a) TARRAY2_INIT_EX(a, 0, 0, NULL)
#define TARRAY2_INITIALIZER \
{ 0, 0, NULL }
#define TARRAY2_FREE(a) \
do { \
if ((a)->data) { \

View File

@ -35,6 +35,9 @@ typedef enum {
// open/close
int32_t tsdbOpenFS(STsdb *pTsdb, STFileSystem **fs, int8_t rollback);
int32_t tsdbCloseFS(STFileSystem **fs);
// snapshot
int32_t tsdbFSCopySnapshot(STFileSystem *fs, TFileSetArray *fsetArr);
int32_t tsdbFSClearSnapshot(TFileSetArray *fsetArr);
// txn
int32_t tsdbFSAllocEid(STFileSystem *pFS, int64_t *eid);
int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype);
@ -45,7 +48,7 @@ int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
/* Exposed Structs */
struct STFileSystem {
STsdb *pTsdb;
STsdb *tsdb;
tsem_t canEdit;
int32_t state;
int64_t neid;

View File

@ -65,13 +65,13 @@ struct STFileOp {
struct SSttLvl {
int32_t level;
TFileObjArray farr;
TFileObjArray fobjArr[1];
};
struct STFileSet {
int32_t fid;
STFileObj *farr[TSDB_FTYPE_MAX]; // file array
TSttLvlArray lvlArr; // level array
TSttLvlArray lvlArr[1]; // level array
};
#ifdef __cplusplus

View File

@ -31,6 +31,7 @@ typedef TARRAY2(STbStatisBlk) TStatisBlkArray;
typedef struct SSttFileReader SSttFileReader;
typedef struct SSttFileReaderConfig SSttFileReaderConfig;
typedef struct SSttSegReader SSttSegReader;
typedef TARRAY2(SSttFileReader *) TSttFileReaderArray;
typedef TARRAY2(SSttSegReader *) TSttSegReaderArray;
// SSttFileReader

View File

@ -128,8 +128,8 @@ static int32_t tsdbCommitOpenWriter(SCommitter2 *committer) {
return tsdbCommitOpenNewSttWriter(committer);
}
ASSERT(TARRAY2_SIZE(&lvl0->farr) > 0);
STFileObj *fobj = TARRAY2_LAST(&lvl0->farr);
ASSERT(TARRAY2_SIZE(lvl0->fobjArr) > 0);
STFileObj *fobj = TARRAY2_LAST(lvl0->fobjArr);
if (fobj->f->stt->nseg >= committer->sttTrigger) {
return tsdbCommitOpenNewSttWriter(committer);
} else {

View File

@ -41,7 +41,7 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
fs[0] = taosMemoryCalloc(1, sizeof(*fs[0]));
if (fs[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
fs[0]->pTsdb = pTsdb;
fs[0]->tsdb = pTsdb;
tsem_init(&fs[0]->canEdit, 0, 1);
fs[0]->state = TSDB_FS_STATE_NONE;
fs[0]->neid = 0;
@ -256,7 +256,7 @@ static int32_t apply_commit(STFileSystem *fs) {
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove);
} else if (fset1->fid > fset2->fid) {
// create new file set with fid of fset2->fid
code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1);
code = tsdbTFileSetInitEx(fs->tsdb, fset2, &fset1);
if (code) return code;
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
if (code) return code;
@ -264,7 +264,7 @@ static int32_t apply_commit(STFileSystem *fs) {
i2++;
} else {
// edit
code = tsdbTFileSetApplyEdit(fs->pTsdb, fset2, fset1);
code = tsdbTFileSetApplyEdit(fs->tsdb, fset2, fset1);
if (code) return code;
i1++;
i2++;
@ -274,7 +274,7 @@ static int32_t apply_commit(STFileSystem *fs) {
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove);
} else {
// create new file set with fid of fset2->fid
code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1);
code = tsdbTFileSetInitEx(fs->tsdb, fset2, &fset1);
if (code) return code;
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
if (code) return code;
@ -290,11 +290,11 @@ static int32_t commit_edit(STFileSystem *fs) {
char current[TSDB_FILENAME_LEN];
char current_t[TSDB_FILENAME_LEN];
current_fname(fs->pTsdb, current, TSDB_FCURRENT);
current_fname(fs->tsdb, current, TSDB_FCURRENT);
if (fs->etype == TSDB_FEDIT_COMMIT) {
current_fname(fs->pTsdb, current_t, TSDB_FCURRENT_C);
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
} else if (fs->etype == TSDB_FEDIT_MERGE) {
current_fname(fs->pTsdb, current_t, TSDB_FCURRENT_M);
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
} else {
ASSERT(0);
}
@ -310,9 +310,9 @@ static int32_t commit_edit(STFileSystem *fs) {
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->pTsdb->pVnode), __func__, fs->etype);
tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
}
return code;
}
@ -327,9 +327,9 @@ static int32_t abort_edit(STFileSystem *fs) {
char fname[TSDB_FILENAME_LEN];
if (fs->etype == TSDB_FEDIT_COMMIT) {
current_fname(fs->pTsdb, fname, TSDB_FCURRENT_C);
current_fname(fs->tsdb, fname, TSDB_FCURRENT_C);
} else if (fs->etype == TSDB_FEDIT_MERGE) {
current_fname(fs->pTsdb, fname, TSDB_FCURRENT_M);
current_fname(fs->tsdb, fname, TSDB_FCURRENT_M);
} else {
ASSERT(0);
}
@ -345,9 +345,9 @@ static int32_t abort_edit(STFileSystem *fs) {
_exit:
if (code) {
tsdbError("vgId:%d %s failed since %s", TD_VID(fs->pTsdb->pVnode), __func__, tstrerror(code));
tsdbError("vgId:%d %s failed since %s", TD_VID(fs->tsdb->pVnode), __func__, tstrerror(code));
} else {
tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->pTsdb->pVnode), __func__, fs->etype);
tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
}
return code;
}
@ -379,7 +379,7 @@ static int32_t tsdbFSDupState(STFileSystem *fs) {
const STFileSet *fset1;
TARRAY2_FOREACH(src, fset1) {
STFileSet *fset2;
code = tsdbTFileSetInitEx(fs->pTsdb, fset1, &fset2);
code = tsdbTFileSetInitEx(fs->tsdb, fset1, &fset2);
if (code) return code;
code = TARRAY2_APPEND(dst, fset2);
if (code) return code;
@ -391,7 +391,7 @@ static int32_t tsdbFSDupState(STFileSystem *fs) {
static int32_t open_fs(STFileSystem *fs, int8_t rollback) {
int32_t code = 0;
int32_t lino = 0;
STsdb *pTsdb = fs->pTsdb;
STsdb *pTsdb = fs->tsdb;
code = update_fs_if_needed(fs);
TSDB_CHECK_CODE(code, lino, _exit);
@ -492,7 +492,7 @@ static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
}
}
code = tsdbTFileSetEdit(fs->pTsdb, fset, op);
code = tsdbTFileSetEdit(fs->tsdb, fset, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -550,10 +550,10 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
switch (etype) {
case TSDB_FEDIT_COMMIT:
current_fname(fs->pTsdb, current_t, TSDB_FCURRENT_C);
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_C);
break;
case TSDB_FEDIT_MERGE:
current_fname(fs->pTsdb, current_t, TSDB_FCURRENT_M);
current_fname(fs->tsdb, current_t, TSDB_FCURRENT_M);
break;
default:
ASSERT(0);
@ -573,10 +573,10 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->pTsdb->pVnode), __func__, lino,
tsdbError("vgId:%d %s failed at line %d since %s, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, lino,
tstrerror(code), etype);
} else {
tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->pTsdb->pVnode), __func__, etype);
tsdbInfo("vgId:%d %s done, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, etype);
}
return code;
}
@ -598,4 +598,32 @@ int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset) {
STFileSet *pset = &tfset;
fset[0] = TARRAY2_SEARCH_EX(&fs->cstate, &pset, tsdbTFileSetCmprFn, TD_EQ);
return 0;
}
int32_t tsdbFSCopySnapshot(STFileSystem *fs, TFileSetArray *fsetArr) {
int32_t code = 0;
STFileSet *fset;
STFileSet *fset1;
ASSERT(TARRAY2_SIZE(fsetArr) == 0);
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
TARRAY2_FOREACH(&fs->cstate, fset) {
code = tsdbTFileSetInitEx(fs->tsdb, fset, &fset1);
if (code) break;
code = TARRAY2_APPEND(fsetArr, fset1);
if (code) break;
}
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
if (code) {
TARRAY2_CLEAR(fsetArr, tsdbTFileSetClear);
}
return code;
}
int32_t tsdbFSClearSnapshot(TFileSetArray *fsetArr) {
TARRAY2_CLEAR(fsetArr, tsdbTFileSetClear);
return 0;
}

View File

@ -18,13 +18,13 @@
static int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
if (!(lvl[0] = taosMemoryMalloc(sizeof(SSttLvl)))) return TSDB_CODE_OUT_OF_MEMORY;
lvl[0]->level = level;
TARRAY2_INIT(&lvl[0]->farr);
TARRAY2_INIT(lvl[0]->fobjArr);
return 0;
}
static void tsdbSttLvlClearFObj(void *data) { tsdbTFileObjUnref(*(STFileObj **)data); }
static int32_t tsdbSttLvlClear(SSttLvl **lvl) {
TARRAY2_CLEAR_FREE(&lvl[0]->farr, tsdbSttLvlClearFObj);
TARRAY2_CLEAR_FREE(lvl[0]->fobjArr, tsdbSttLvlClearFObj);
taosMemoryFree(lvl[0]);
lvl[0] = NULL;
return 0;
@ -35,7 +35,7 @@ static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl
if (code) return code;
const STFileObj *fobj1;
TARRAY2_FOREACH(&lvl1->farr, fobj1) {
TARRAY2_FOREACH(lvl1->fobjArr, fobj1) {
STFileObj *fobj;
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj);
if (code) {
@ -43,14 +43,14 @@ static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl
return code;
}
TARRAY2_APPEND(&lvl[0]->farr, fobj);
TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
}
return 0;
}
static void tsdbSttLvlRemoveFObj(void *data) { tsdbTFileObjRemove(*(STFileObj **)data); }
static void tsdbSttLvlRemove(SSttLvl **lvl) {
TARRAY2_CLEAR_FREE(&lvl[0]->farr, tsdbSttLvlRemoveFObj);
TARRAY2_CLEAR_FREE(lvl[0]->fobjArr, tsdbSttLvlRemoveFObj);
taosMemoryFree(lvl[0]);
lvl[0] = NULL;
}
@ -61,32 +61,32 @@ static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *l
ASSERT(lvl1->level == lvl2->level);
int32_t i1 = 0, i2 = 0;
while (i1 < TARRAY2_SIZE(&lvl1->farr) || i2 < TARRAY2_SIZE(&lvl2->farr)) {
STFileObj *fobj1 = i1 < TARRAY2_SIZE(&lvl1->farr) ? TARRAY2_GET(&lvl1->farr, i1) : NULL;
STFileObj *fobj2 = i2 < TARRAY2_SIZE(&lvl2->farr) ? TARRAY2_GET(&lvl2->farr, i2) : NULL;
while (i1 < TARRAY2_SIZE(lvl1->fobjArr) || i2 < TARRAY2_SIZE(lvl2->fobjArr)) {
STFileObj *fobj1 = i1 < TARRAY2_SIZE(lvl1->fobjArr) ? TARRAY2_GET(lvl1->fobjArr, i1) : NULL;
STFileObj *fobj2 = i2 < TARRAY2_SIZE(lvl2->fobjArr) ? TARRAY2_GET(lvl2->fobjArr, i2) : NULL;
if (fobj1 && fobj2) {
if (fobj1->f->cid < fobj2->f->cid) {
// create a file obj
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2);
if (code) return code;
code = TARRAY2_APPEND(&lvl2->farr, fobj2);
code = TARRAY2_APPEND(lvl2->fobjArr, fobj2);
if (code) return code;
i1++;
i2++;
} else if (fobj1->f->cid > fobj2->f->cid) {
// remove a file obj
TARRAY2_REMOVE(&lvl2->farr, i2, tsdbSttLvlRemoveFObj);
TARRAY2_REMOVE(lvl2->fobjArr, i2, tsdbSttLvlRemoveFObj);
} else {
if (tsdbIsSameTFile(fobj1->f, fobj2->f)) {
if (tsdbIsTFileChanged(fobj1->f, fobj2->f)) {
fobj2->f[0] = fobj1->f[0];
}
} else {
TARRAY2_REMOVE(&lvl2->farr, i2, tsdbSttLvlRemoveFObj);
TARRAY2_REMOVE(lvl2->fobjArr, i2, tsdbSttLvlRemoveFObj);
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2);
if (code) return code;
code = TARRAY2_SORT_INSERT(&lvl2->farr, fobj2, tsdbTFileObjCmpr);
code = TARRAY2_SORT_INSERT(lvl2->fobjArr, fobj2, tsdbTFileObjCmpr);
if (code) return code;
}
i1++;
@ -96,13 +96,13 @@ static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *l
// create a file obj
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2);
if (code) return code;
code = TARRAY2_APPEND(&lvl2->farr, fobj2);
code = TARRAY2_APPEND(lvl2->fobjArr, fobj2);
if (code) return code;
i1++;
i2++;
} else {
// remove a file obj
TARRAY2_REMOVE(&lvl2->farr, i2, tsdbSttLvlRemoveFObj);
TARRAY2_REMOVE(lvl2->fobjArr, i2, tsdbSttLvlRemoveFObj);
}
}
return 0;
@ -122,7 +122,7 @@ static int32_t tsdbSttLvlToJson(const SSttLvl *lvl, cJSON *json) {
cJSON *ajson = cJSON_AddArrayToObject(json, "files");
if (ajson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
const STFileObj *fobj;
TARRAY2_FOREACH(&lvl->farr, fobj) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
cJSON *item = cJSON_CreateObject();
if (item == NULL) return TSDB_CODE_OUT_OF_MEMORY;
cJSON_AddItemToArray(ajson, item);
@ -169,7 +169,7 @@ static int32_t tsdbJsonToSttLvl(STsdb *pTsdb, const cJSON *json, SSttLvl **lvl)
return code;
}
TARRAY2_APPEND(&lvl[0]->farr, fobj);
TARRAY2_APPEND(lvl[0]->fobjArr, fobj);
}
return 0;
}
@ -194,7 +194,7 @@ int32_t tsdbTFileSetToJson(const STFileSet *fset, cJSON *json) {
item1 = cJSON_AddArrayToObject(json, "stt lvl");
if (item1 == NULL) return TSDB_CODE_OUT_OF_MEMORY;
const SSttLvl *lvl;
TARRAY2_FOREACH(&fset->lvlArr, lvl) {
TARRAY2_FOREACH(fset->lvlArr, lvl) {
item2 = cJSON_CreateObject();
if (!item2) return TSDB_CODE_OUT_OF_MEMORY;
cJSON_AddItemToArray(item1, item2);
@ -247,7 +247,7 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset) {
return code;
}
TARRAY2_APPEND(&(*fset)->lvlArr, lvl);
TARRAY2_APPEND((*fset)->lvlArr, lvl);
}
} else {
return TSDB_CODE_FILE_CORRUPTED;
@ -272,11 +272,11 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
code = tsdbSttLvlInit(fobj->f->stt->level, &lvl);
if (code) return code;
code = TARRAY2_SORT_INSERT(&fset->lvlArr, lvl, tsdbSttLvlCmprFn);
code = TARRAY2_SORT_INSERT(fset->lvlArr, lvl, tsdbSttLvlCmprFn);
if (code) return code;
}
code = TARRAY2_SORT_INSERT(&lvl->farr, fobj, tsdbTFileObjCmpr);
code = TARRAY2_SORT_INSERT(lvl->fobjArr, fobj, tsdbTFileObjCmpr);
if (code) return code;
} else {
ASSERT(fset->farr[fobj->f->type] == NULL);
@ -290,11 +290,11 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
STFileObj tfobj = {.f[0] = {.cid = op->of.cid}};
STFileObj *tfobjp = &tfobj;
int32_t idx = TARRAY2_SEARCH_IDX(&lvl->farr, &tfobjp, tsdbTFileObjCmpr, TD_EQ);
int32_t idx = TARRAY2_SEARCH_IDX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ);
ASSERT(idx >= 0);
TARRAY2_REMOVE(&lvl->farr, idx, tsdbSttLvlRemoveFObj);
TARRAY2_REMOVE(lvl->fobjArr, idx, tsdbSttLvlRemoveFObj);
if (TARRAY2_SIZE(&lvl->farr) == 0) {
if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
// TODO: remove the stt level if no file exists anymore
// TARRAY2_REMOVE(&fset->lvlArr, lvl - fset->lvlArr.data, tsdbSttLvlClear);
}
@ -309,7 +309,7 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
ASSERT(lvl);
STFileObj tfobj = {.f[0] = {.cid = op->of.cid}}, *tfobjp = &tfobj;
tfobjp = TARRAY2_SEARCH_EX(&lvl->farr, &tfobjp, tsdbTFileObjCmpr, TD_EQ);
tfobjp = TARRAY2_SEARCH_EX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ);
ASSERT(tfobjp);
@ -356,22 +356,22 @@ int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *f
// stt part
int32_t i1 = 0, i2 = 0;
while (i1 < TARRAY2_SIZE(&fset1->lvlArr) || i2 < TARRAY2_SIZE(&fset2->lvlArr)) {
SSttLvl *lvl1 = i1 < TARRAY2_SIZE(&fset1->lvlArr) ? TARRAY2_GET(&fset1->lvlArr, i1) : NULL;
SSttLvl *lvl2 = i2 < TARRAY2_SIZE(&fset2->lvlArr) ? TARRAY2_GET(&fset2->lvlArr, i2) : NULL;
while (i1 < TARRAY2_SIZE(fset1->lvlArr) || i2 < TARRAY2_SIZE(fset2->lvlArr)) {
SSttLvl *lvl1 = i1 < TARRAY2_SIZE(fset1->lvlArr) ? TARRAY2_GET(fset1->lvlArr, i1) : NULL;
SSttLvl *lvl2 = i2 < TARRAY2_SIZE(fset2->lvlArr) ? TARRAY2_GET(fset2->lvlArr, i2) : NULL;
if (lvl1 && lvl2) {
if (lvl1->level < lvl2->level) {
// add a new stt level
code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl2);
if (code) return code;
code = TARRAY2_SORT_INSERT(&fset2->lvlArr, lvl2, tsdbSttLvlCmprFn);
code = TARRAY2_SORT_INSERT(fset2->lvlArr, lvl2, tsdbSttLvlCmprFn);
if (code) return code;
i1++;
i2++;
} else if (lvl1->level > lvl2->level) {
// remove the stt level
TARRAY2_REMOVE(&fset2->lvlArr, i2, tsdbSttLvlRemove);
TARRAY2_REMOVE(fset2->lvlArr, i2, tsdbSttLvlRemove);
} else {
// apply edit on stt level
code = tsdbSttLvlApplyEdit(pTsdb, lvl1, lvl2);
@ -383,13 +383,13 @@ int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *f
// add a new stt level
code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl2);
if (code) return code;
code = TARRAY2_SORT_INSERT(&fset2->lvlArr, lvl2, tsdbSttLvlCmprFn);
code = TARRAY2_SORT_INSERT(fset2->lvlArr, lvl2, tsdbSttLvlCmprFn);
if (code) return code;
i1++;
i2++;
} else {
// remove the stt level
TARRAY2_REMOVE(&fset2->lvlArr, i2, tsdbSttLvlRemove);
TARRAY2_REMOVE(fset2->lvlArr, i2, tsdbSttLvlRemove);
}
}
@ -401,7 +401,7 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
if (fset[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
fset[0]->fid = fid;
TARRAY2_INIT(&fset[0]->lvlArr);
TARRAY2_INIT(fset[0]->lvlArr);
return 0;
}
@ -420,7 +420,7 @@ int32_t tsdbTFileSetInitEx(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fse
}
const SSttLvl *lvl1;
TARRAY2_FOREACH(&fset1->lvlArr, lvl1) {
TARRAY2_FOREACH(fset1->lvlArr, lvl1) {
SSttLvl *lvl;
code = tsdbSttLvlInitEx(pTsdb, lvl1, &lvl);
if (code) {
@ -428,7 +428,7 @@ int32_t tsdbTFileSetInitEx(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fse
return code;
}
code = TARRAY2_APPEND(&fset[0]->lvlArr, lvl);
code = TARRAY2_APPEND(fset[0]->lvlArr, lvl);
if (code) return code;
}
@ -443,7 +443,7 @@ int32_t tsdbTFileSetClear(STFileSet **fset) {
tsdbTFileObjUnref(fset[0]->farr[ftype]);
}
TARRAY2_CLEAR_FREE(&fset[0]->lvlArr, tsdbSttLvlClear);
TARRAY2_CLEAR_FREE(fset[0]->lvlArr, tsdbSttLvlClear);
taosMemoryFree(fset[0]);
fset[0] = NULL;
@ -457,7 +457,7 @@ int32_t tsdbTFileSetRemove(STFileSet **fset) {
tsdbTFileObjRemove(fset[0]->farr[ftype]);
}
TARRAY2_CLEAR_FREE(&fset[0]->lvlArr, tsdbSttLvlRemove);
TARRAY2_CLEAR_FREE(fset[0]->lvlArr, tsdbSttLvlRemove);
taosMemoryFree(fset[0]);
fset[0] = NULL;
return 0;
@ -466,7 +466,7 @@ int32_t tsdbTFileSetRemove(STFileSet **fset) {
SSttLvl *tsdbTFileSetGetLvl(STFileSet *fset, int32_t level) {
SSttLvl tlvl = {.level = level};
SSttLvl *lvl = &tlvl;
return TARRAY2_SEARCH_EX(&fset->lvlArr, &lvl, tsdbSttLvlCmprFn, TD_EQ);
return TARRAY2_SEARCH_EX(fset->lvlArr, &lvl, tsdbSttLvlCmprFn, TD_EQ);
}
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2) {
@ -483,8 +483,8 @@ int64_t tsdbTFileSetMaxCid(const STFileSet *fset) {
}
const SSttLvl *lvl;
const STFileObj *fobj;
TARRAY2_FOREACH(&fset->lvlArr, lvl) {
TARRAY2_FOREACH(&lvl->farr, fobj) { maxCid = TMAX(maxCid, fobj->f->cid); }
TARRAY2_FOREACH(fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) { maxCid = TMAX(maxCid, fobj->f->cid); }
}
return maxCid;
}
@ -493,5 +493,5 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset) {
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
if (fset->farr[ftype] != NULL) return false;
}
return TARRAY2_SIZE(&fset->lvlArr) == 0;
return TARRAY2_SIZE(fset->lvlArr) == 0;
}

View File

@ -16,40 +16,49 @@
#include "inc/tsdbMerge.h"
typedef struct {
STsdb *tsdb;
int32_t maxRow;
int32_t minRow;
int32_t szPage;
int8_t cmprAlg;
int64_t compactVersion;
int64_t cid;
SSkmInfo skmTb;
SSkmInfo skmRow;
uint8_t *aBuf[5];
STsdb *tsdb;
TFileSetArray fsetArr[1];
int32_t sttTrigger;
int32_t maxRow;
int32_t minRow;
int32_t szPage;
int8_t cmprAlg;
int64_t compactVersion;
int64_t cid;
SSkmInfo skmTb;
SSkmInfo skmRow;
uint8_t *aBuf[5];
// context
struct {
bool opened;
bool opened;
STFileSet *fset;
bool toData;
int32_t level;
STFileSet *fset;
SRowInfo *row;
SBlockData bData;
} ctx[1];
// reader
TARRAY2(SSttFileReader *) sttReaderArr[1];
SDataFileReader *dataReader;
TTsdbIterArray iterArr[1];
SIterMerger *iterMerger;
TSttFileReaderArray sttReaderArr[1];
SDataFileReader *dataReader;
TTsdbIterArray iterArr[1];
SIterMerger *iterMerger;
TFileOpArray fopArr[1];
// writer
SSttFileWriter *sttWriter;
SDataFileWriter *dataWriter;
// operations
TFileOpArray fopArr;
} SMerger;
static int32_t tsdbMergerOpen(SMerger *merger) {
merger->maxRow = merger->tsdb->pVnode->config.tsdbCfg.maxRows;
merger->minRow = merger->tsdb->pVnode->config.tsdbCfg.minRows;
merger->szPage = merger->tsdb->pVnode->config.tsdbPageSize;
merger->cmprAlg = merger->tsdb->pVnode->config.tsdbCfg.compression;
merger->compactVersion = INT64_MAX;
tsdbFSAllocEid(merger->tsdb->pFS, &merger->cid);
merger->ctx->opened = true;
TARRAY2_INIT(&merger->fopArr);
return 0;
}
@ -62,14 +71,14 @@ static int32_t tsdbMergerClose(SMerger *merger) {
STFileSystem *fs = merger->tsdb->pFS;
// edit file system
code = tsdbFSEditBegin(fs, &merger->fopArr, TSDB_FEDIT_MERGE);
code = tsdbFSEditBegin(fs, merger->fopArr, TSDB_FEDIT_MERGE);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbFSEditCommit(fs);
TSDB_CHECK_CODE(code, lino, _exit);
// clear the merge
TARRAY2_FREE(&merger->fopArr);
TARRAY2_FREE(merger->fopArr);
_exit:
if (code) {
@ -182,18 +191,18 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
merger->ctx->toData = true;
merger->ctx->level = 0;
TARRAY2_FOREACH(&fset->lvlArr, lvl) {
TARRAY2_FOREACH(fset->lvlArr, lvl) {
if (lvl->level != merger->ctx->level) {
lvl = NULL;
break;
}
fobj = TARRAY2_GET(&lvl->farr, 0);
fobj = TARRAY2_GET(lvl->fobjArr, 0);
if (fobj->f->stt->nseg < merger->tsdb->pVnode->config.sttTrigger) {
merger->ctx->toData = false;
break;
} else {
ASSERT(lvl->level == 0 || TARRAY2_SIZE(&lvl->farr) == 1);
ASSERT(lvl->level == 0 || TARRAY2_SIZE(lvl->fobjArr) == 1);
merger->ctx->level++;
// open the reader
@ -214,7 +223,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) {
.optype = TSDB_FOP_REMOVE,
.of = fobj->f[0],
};
code = TARRAY2_APPEND(&merger->fopArr, op);
code = TARRAY2_APPEND(merger->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -282,7 +291,7 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
TSDB_CHECK_CODE(code, lino, _exit);
if (op.optype != TSDB_FOP_NONE) {
code = TARRAY2_APPEND(&merger->fopArr, op);
code = TARRAY2_APPEND(merger->fopArr, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -301,7 +310,7 @@ static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) {
int32_t code = 0;
int32_t lino = 0;
if (merger->ctx->opened == false) {
if (!merger->ctx->opened) {
code = tsdbMergerOpen(merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -332,43 +341,59 @@ _exit:
return 0;
}
int32_t tsdbMerge(STsdb *tsdb) {
static int32_t tsdbDoMerge(SMerger *merger) {
int32_t code = 0;
int32_t lino;
int32_t lino = 0;
int32_t vid = TD_VID(merger->tsdb->pVnode);
int32_t vid = TD_VID(tsdb->pVnode);
STFileSystem *fs = tsdb->pFS;
STFileSet *fset;
STFileObj *fobj;
int32_t sttTrigger = tsdb->pVnode->config.sttTrigger;
STFileSet *fset;
SSttLvl *lvl;
STFileObj *fobj;
TARRAY2_FOREACH(merger->fsetArr, fset) {
lvl = TARRAY2_SIZE(fset->lvlArr) ? TARRAY2_FIRST(fset->lvlArr) : NULL;
if (!lvl || lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) == 0) continue;
SMerger merger[1];
merger->tsdb = tsdb;
merger->ctx->opened = false;
fobj = TARRAY2_FIRST(lvl->fobjArr);
if (fobj->f->stt->nseg < merger->sttTrigger) continue;
// loop to merge each file set
TARRAY2_FOREACH(&fs->cstate, fset) {
SSttLvl *lvl0 = tsdbTFileSetGetLvl(fset, 0);
if (lvl0 == NULL) {
continue;
}
ASSERT(TARRAY2_SIZE(&lvl0->farr) > 0);
fobj = TARRAY2_GET(&lvl0->farr, 0);
if (fobj->f->stt->nseg >= sttTrigger) {
code = tsdbMergeFileSet(merger, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbMergeFileSet(merger, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
// end the merge
if (merger->ctx->opened) {
code = tsdbMergerClose(merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
if (code) {
TSDB_ERROR_LOG(vid, lino, code);
} else {
tsdbDebug("vgId:%d %s done", vid, __func__);
}
return code;
}
int32_t tsdbMerge(STsdb *tsdb) {
int32_t code = 0;
int32_t lino = 0;
int32_t vid = TD_VID(tsdb->pVnode);
SMerger merger[1] = {{
.tsdb = tsdb,
.fsetArr = {TARRAY2_INITIALIZER},
.sttTrigger = tsdb->pVnode->config.sttTrigger,
}};
code = tsdbFSCopySnapshot(tsdb->pFS, merger->fsetArr);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDoMerge(merger);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbFSClearSnapshot(merger->fsetArr);
TARRAY2_FREE(merger->fsetArr);
_exit:
if (code) {
TSDB_ERROR_LOG(vid, lino, code);