more code
This commit is contained in:
parent
89eafa18cb
commit
ca28c99f80
|
@ -114,9 +114,18 @@ static FORCE_INLINE int32_t tarray2_make_room( //
|
||||||
#define TARRAY2_APPEND(a, e) TARRAY2_INSERT(a, (a)->size, e)
|
#define TARRAY2_APPEND(a, e) TARRAY2_INSERT(a, (a)->size, e)
|
||||||
#define TARRAY2_APPEND_P(a, ep) TARRAY2_APPEND(a, *(ep))
|
#define TARRAY2_APPEND_P(a, ep) TARRAY2_APPEND(a, *(ep))
|
||||||
|
|
||||||
|
// return (TYPE *)
|
||||||
#define TARRAY2_SEARCH(a, ep, cmp, flag) \
|
#define TARRAY2_SEARCH(a, ep, cmp, flag) \
|
||||||
(((a)->size == 0) ? NULL \
|
((typeof((a)->data))(((a)->size == 0) ? NULL \
|
||||||
: taosbsearch(ep, (a)->data, (a)->size, sizeof(typeof((a)->data[0])), (__compar_fn_t)cmp, flag))
|
: taosbsearch(ep, (a)->data, (a)->size, sizeof(typeof((a)->data[0])), \
|
||||||
|
(__compar_fn_t)cmp, flag)))
|
||||||
|
|
||||||
|
// return (TYPE)
|
||||||
|
#define TARRAY2_SEARCH_EX(a, ep, cmp, flag) \
|
||||||
|
({ \
|
||||||
|
typeof((a)->data) __p = TARRAY2_SEARCH(a, ep, cmp, flag); \
|
||||||
|
__p ? __p[0] : NULL; \
|
||||||
|
})
|
||||||
|
|
||||||
#define TARRAY2_SEARCH_IDX(a, ep, cmp, flag) \
|
#define TARRAY2_SEARCH_IDX(a, ep, cmp, flag) \
|
||||||
({ \
|
({ \
|
||||||
|
|
|
@ -41,6 +41,7 @@ typedef enum {
|
||||||
int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset);
|
int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset);
|
||||||
int32_t tsdbTFileSetInitEx(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
|
int32_t tsdbTFileSetInitEx(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
|
||||||
int32_t tsdbTFileSetClear(STFileSet **fset);
|
int32_t tsdbTFileSetClear(STFileSet **fset);
|
||||||
|
int32_t tsdbTFileSetRemove(STFileSet **fset);
|
||||||
// to/from json
|
// to/from json
|
||||||
int32_t tsdbTFileSetToJson(const STFileSet *fset, cJSON *json);
|
int32_t tsdbTFileSetToJson(const STFileSet *fset, cJSON *json);
|
||||||
int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset);
|
int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset);
|
||||||
|
|
|
@ -34,8 +34,8 @@ typedef enum {
|
||||||
} tsdb_ftype_t;
|
} tsdb_ftype_t;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TSDB_FSTATE_EXIST = 1,
|
TSDB_FSTATE_LIVE = 1,
|
||||||
TSDB_FSTATE_REMOVED,
|
TSDB_FSTATE_DEAD,
|
||||||
};
|
};
|
||||||
|
|
||||||
#define TSDB_FTYPE_MIN TSDB_FTYPE_HEAD
|
#define TSDB_FTYPE_MIN TSDB_FTYPE_HEAD
|
||||||
|
@ -50,7 +50,7 @@ int32_t tsdbTFileName(STsdb *pTsdb, const STFile *f, char fname[]);
|
||||||
int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj);
|
int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj);
|
||||||
int32_t tsdbTFileObjRef(STFileObj *fobj);
|
int32_t tsdbTFileObjRef(STFileObj *fobj);
|
||||||
int32_t tsdbTFileObjUnref(STFileObj *fobj);
|
int32_t tsdbTFileObjUnref(STFileObj *fobj);
|
||||||
int32_t tsdbTFileRemove(STFileObj *fobj);
|
int32_t tsdbTFileObjRemove(STFileObj *fobj);
|
||||||
int32_t tsdbTFileObjCmpr(const STFileObj **fobj1, const STFileObj **fobj2);
|
int32_t tsdbTFileObjCmpr(const STFileObj **fobj1, const STFileObj **fobj2);
|
||||||
|
|
||||||
struct STFile {
|
struct STFile {
|
||||||
|
|
|
@ -331,7 +331,7 @@ static int32_t open_committer(STsdb *pTsdb, SCommitInfo *pInfo, SCommitter *pCom
|
||||||
pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
|
pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
|
||||||
pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
|
pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
|
||||||
pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
|
pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
|
||||||
pCommitter->sttTrigger = 1; // TODO
|
pCommitter->sttTrigger = 4; // TODO
|
||||||
|
|
||||||
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
|
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
|
||||||
if (pCommitter->aTbDataP == NULL) {
|
if (pCommitter->aTbDataP == NULL) {
|
||||||
|
|
|
@ -252,8 +252,8 @@ static int32_t apply_commit(STFileSystem *fs) {
|
||||||
|
|
||||||
if (fset1 && fset2) {
|
if (fset1 && fset2) {
|
||||||
if (fset1->fid < fset2->fid) {
|
if (fset1->fid < fset2->fid) {
|
||||||
// delete fset1 (TODO: should set file remove)
|
// delete fset1
|
||||||
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetClear);
|
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove);
|
||||||
} else if (fset1->fid > fset2->fid) {
|
} else if (fset1->fid > fset2->fid) {
|
||||||
// create new file set with fid of fset2->fid
|
// create new file set with fid of fset2->fid
|
||||||
code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1);
|
code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1);
|
||||||
|
@ -270,8 +270,8 @@ static int32_t apply_commit(STFileSystem *fs) {
|
||||||
i2++;
|
i2++;
|
||||||
}
|
}
|
||||||
} else if (fset1) {
|
} else if (fset1) {
|
||||||
// delete fset1 (TODO: should set file remove)
|
// delete fset1
|
||||||
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetClear);
|
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove);
|
||||||
} else {
|
} else {
|
||||||
// create new file set with fid of fset2->fid
|
// create new file set with fid of fset2->fid
|
||||||
code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1);
|
code = tsdbTFileSetInitEx(fs->pTsdb, fset2, &fset1);
|
||||||
|
@ -481,7 +481,7 @@ static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
|
||||||
if (!fset || fset->fid != op->fid) {
|
if (!fset || fset->fid != op->fid) {
|
||||||
STFileSet tfset = {.fid = op->fid};
|
STFileSet tfset = {.fid = op->fid};
|
||||||
fset = &tfset;
|
fset = &tfset;
|
||||||
fset = TARRAY2_SEARCH(fsetArray, &fset, tsdbTFileSetCmprFn, TD_EQ);
|
fset = TARRAY2_SEARCH_EX(fsetArray, &fset, tsdbTFileSetCmprFn, TD_EQ);
|
||||||
|
|
||||||
if (!fset) {
|
if (!fset) {
|
||||||
code = tsdbTFileSetInit(op->fid, &fset);
|
code = tsdbTFileSetInit(op->fid, &fset);
|
||||||
|
@ -589,6 +589,6 @@ int32_t tsdbFSEditAbort(STFileSystem *fs) {
|
||||||
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, const STFileSet **fset) {
|
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, const STFileSet **fset) {
|
||||||
STFileSet tfset = {.fid = fid};
|
STFileSet tfset = {.fid = fid};
|
||||||
fset[0] = &tfset;
|
fset[0] = &tfset;
|
||||||
fset[0] = TARRAY2_SEARCH(&fs->cstate, fset, tsdbTFileSetCmprFn, TD_EQ);
|
fset[0] = TARRAY2_SEARCH_EX(&fs->cstate, fset, tsdbTFileSetCmprFn, TD_EQ);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -21,6 +21,7 @@ static int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl) {
|
||||||
TARRAY2_INIT(&lvl[0]->farr);
|
TARRAY2_INIT(&lvl[0]->farr);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbSttLvlClearFObj(void *data) { tsdbTFileObjUnref(*(STFileObj **)data); }
|
static void tsdbSttLvlClearFObj(void *data) { tsdbTFileObjUnref(*(STFileObj **)data); }
|
||||||
static int32_t tsdbSttLvlClear(SSttLvl **lvl) {
|
static int32_t tsdbSttLvlClear(SSttLvl **lvl) {
|
||||||
TARRAY2_CLEAR_FREE(&lvl[0]->farr, tsdbSttLvlClearFObj);
|
TARRAY2_CLEAR_FREE(&lvl[0]->farr, tsdbSttLvlClearFObj);
|
||||||
|
@ -28,6 +29,7 @@ static int32_t tsdbSttLvlClear(SSttLvl **lvl) {
|
||||||
lvl[0] = NULL;
|
lvl[0] = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl) {
|
static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl) {
|
||||||
int32_t code = tsdbSttLvlInit(lvl1->level, lvl);
|
int32_t code = tsdbSttLvlInit(lvl1->level, lvl);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
|
@ -46,6 +48,13 @@ static int32_t tsdbSttLvlInitEx(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl **lvl
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void tsdbSttLvlRemoveFObj(void *data) { tsdbTFileObjRemove(*(STFileObj **)data); }
|
||||||
|
static void tsdbSttLvlRemove(SSttLvl **lvl) {
|
||||||
|
TARRAY2_CLEAR_FREE(&lvl[0]->farr, tsdbSttLvlRemoveFObj);
|
||||||
|
taosMemoryFree(lvl[0]);
|
||||||
|
lvl[0] = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbSttLvlCmprFn(const SSttLvl **lvl1, const SSttLvl **lvl2) {
|
static int32_t tsdbSttLvlCmprFn(const SSttLvl **lvl1, const SSttLvl **lvl2) {
|
||||||
if (lvl1[0]->level < lvl2[0]->level) return -1;
|
if (lvl1[0]->level < lvl2[0]->level) return -1;
|
||||||
if (lvl1[0]->level > lvl2[0]->level) return 1;
|
if (lvl1[0]->level > lvl2[0]->level) return 1;
|
||||||
|
@ -291,10 +300,22 @@ int32_t tsdbTFileSetClear(STFileSet **fset) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbTFileSetRemove(STFileSet **fset) {
|
||||||
|
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||||
|
if (fset[0]->farr[ftype] == NULL) continue;
|
||||||
|
tsdbTFileObjRemove(fset[0]->farr[ftype]);
|
||||||
|
}
|
||||||
|
|
||||||
|
TARRAY2_CLEAR_FREE(&fset[0]->lvlArr, tsdbSttLvlRemove);
|
||||||
|
taosMemoryFree(fset[0]);
|
||||||
|
fset[0] = NULL;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level) {
|
SSttLvl *tsdbTFileSetGetLvl(const STFileSet *fset, int32_t level) {
|
||||||
SSttLvl tlvl = {.level = level};
|
SSttLvl tlvl = {.level = level};
|
||||||
SSttLvl *lvl = &tlvl;
|
SSttLvl *lvl = &tlvl;
|
||||||
return TARRAY2_SEARCH(&fset->lvlArr, &lvl, tsdbSttLvlCmprFn, TD_EQ);
|
return TARRAY2_SEARCH_EX(&fset->lvlArr, lvl, tsdbSttLvlCmprFn, TD_EQ);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2) {
|
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2) {
|
||||||
|
|
|
@ -41,6 +41,11 @@ static const struct {
|
||||||
[TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json},
|
[TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static void remove_file(const char *fname) {
|
||||||
|
taosRemoveFile(fname);
|
||||||
|
tsdbInfo("file:%s is removed", fname);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tfile_to_json(const STFile *file, cJSON *json) {
|
static int32_t tfile_to_json(const STFile *file, cJSON *json) {
|
||||||
/* did.level */
|
/* did.level */
|
||||||
if (cJSON_AddNumberToObject(json, "did.level", file->did.level) == NULL) {
|
if (cJSON_AddNumberToObject(json, "did.level", file->did.level) == NULL) {
|
||||||
|
@ -201,7 +206,7 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) {
|
||||||
|
|
||||||
taosThreadMutexInit(&fobj[0]->mutex, NULL);
|
taosThreadMutexInit(&fobj[0]->mutex, NULL);
|
||||||
fobj[0]->f = *f;
|
fobj[0]->f = *f;
|
||||||
fobj[0]->state = TSDB_FSTATE_EXIST;
|
fobj[0]->state = TSDB_FSTATE_LIVE;
|
||||||
fobj[0]->ref = 1;
|
fobj[0]->ref = 1;
|
||||||
tsdbTFileName(pTsdb, f, fobj[0]->fname);
|
tsdbTFileName(pTsdb, f, fobj[0]->fname);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -210,23 +215,20 @@ int32_t tsdbTFileObjInit(STsdb *pTsdb, const STFile *f, STFileObj **fobj) {
|
||||||
int32_t tsdbTFileObjRef(STFileObj *fobj) {
|
int32_t tsdbTFileObjRef(STFileObj *fobj) {
|
||||||
int32_t nRef;
|
int32_t nRef;
|
||||||
taosThreadMutexLock(&fobj->mutex);
|
taosThreadMutexLock(&fobj->mutex);
|
||||||
|
ASSERT(fobj->ref > 0 && fobj->state == TSDB_FSTATE_LIVE);
|
||||||
nRef = fobj->ref++;
|
nRef = fobj->ref++;
|
||||||
taosThreadMutexUnlock(&fobj->mutex);
|
taosThreadMutexUnlock(&fobj->mutex);
|
||||||
ASSERT(nRef > 0);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
||||||
int32_t nRef;
|
|
||||||
|
|
||||||
taosThreadMutexLock(&fobj->mutex);
|
taosThreadMutexLock(&fobj->mutex);
|
||||||
nRef = --fobj->ref;
|
int32_t nRef = --fobj->ref;
|
||||||
taosThreadMutexUnlock(&fobj->mutex);
|
taosThreadMutexUnlock(&fobj->mutex);
|
||||||
ASSERT(nRef >= 0);
|
ASSERT(nRef >= 0);
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
if (fobj->state == TSDB_FSTATE_REMOVED) {
|
if (fobj->state == TSDB_FSTATE_DEAD) {
|
||||||
// TODO: add the file name
|
remove_file(fobj->fname);
|
||||||
taosRemoveFile(fobj->fname);
|
|
||||||
}
|
}
|
||||||
taosMemoryFree(fobj);
|
taosMemoryFree(fobj);
|
||||||
}
|
}
|
||||||
|
@ -234,13 +236,14 @@ int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTFileRemove(STFileObj *fobj) {
|
int32_t tsdbTFileObjRemove(STFileObj *fobj) {
|
||||||
taosThreadMutexLock(&fobj->mutex);
|
taosThreadMutexLock(&fobj->mutex);
|
||||||
fobj->state = TSDB_FSTATE_REMOVED;
|
ASSERT(fobj->state == TSDB_FSTATE_LIVE && fobj->ref > 0);
|
||||||
|
fobj->state = TSDB_FSTATE_DEAD;
|
||||||
int32_t nRef = --fobj->ref;
|
int32_t nRef = --fobj->ref;
|
||||||
taosThreadMutexUnlock(&fobj->mutex);
|
taosThreadMutexUnlock(&fobj->mutex);
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
taosRemoveFile(fobj->fname);
|
remove_file(fobj->fname);
|
||||||
taosMemoryFree(fobj);
|
taosMemoryFree(fobj);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue