more code
This commit is contained in:
parent
de1bdf97f9
commit
545d7a6ad6
|
@ -45,6 +45,7 @@ int32_t tsdbJsonToTFile(const cJSON *json, tsdb_ftype_t ftype, STFile *f);
|
||||||
int32_t tsdbTFileObjInit(const STFile *f, STFileObj **fobj);
|
int32_t tsdbTFileObjInit(const STFile *f, STFileObj **fobj);
|
||||||
int32_t tsdbTFileObjRef(STFileObj *fobj);
|
int32_t tsdbTFileObjRef(STFileObj *fobj);
|
||||||
int32_t tsdbTFileObjUnref(STFileObj *fobj);
|
int32_t tsdbTFileObjUnref(STFileObj *fobj);
|
||||||
|
int32_t tsdbTFileRemove(STFileObj *fobj);
|
||||||
|
|
||||||
struct STFile {
|
struct STFile {
|
||||||
tsdb_ftype_t type;
|
tsdb_ftype_t type;
|
||||||
|
@ -60,9 +61,16 @@ struct STFile {
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum {
|
||||||
|
TSDB_FSTATE_EXIST = 1,
|
||||||
|
TSDB_FSTATE_REMOVED,
|
||||||
|
};
|
||||||
|
|
||||||
struct STFileObj {
|
struct STFileObj {
|
||||||
|
TdThreadMutex mutex;
|
||||||
STFile f;
|
STFile f;
|
||||||
volatile int32_t ref;
|
int32_t state;
|
||||||
|
int32_t ref;
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -240,44 +240,6 @@ static bool is_same_file(const STFile *f1, const STFile f2) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t apply_commit_add_fset(STFileSystem *fs, const STFileSet *fset) {
|
|
||||||
// int32_t idx = taosArraySearchIdx(fs->cstate, fset, (__compar_fn_t)tsdbFSetCmprFn, TD_GT);
|
|
||||||
// if (idx < 0) idx = taosArrayGetSize(fs->cstate);
|
|
||||||
|
|
||||||
// STFileSet *pFileSet = taosArrayInsert(fs->cstate, idx, fset);
|
|
||||||
// if (pFileSet == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
|
|
||||||
// int32_t code = tsdbFileSetInitEx(fset, pFileSet);
|
|
||||||
// if (code) return code;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
static int32_t apply_commit_del_fset(STFileSystem *fs, const STFileSet *fset) {
|
|
||||||
// TODO
|
|
||||||
ASSERT(0);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
static int32_t apply_commit_upd_fset(STFileSystem *fs, STFileSet *fset_from, const STFileSet *fset_to) {
|
|
||||||
for (tsdb_ftype_t ftype = TSDB_FTYPE_HEAD; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
|
||||||
STFileObj *fobj_from = fset_from->farr[ftype];
|
|
||||||
STFileObj *fobj_to = fset_to->farr[ftype];
|
|
||||||
|
|
||||||
if (!fobj_from && !fobj_to) continue;
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
ASSERT(0);
|
|
||||||
if (fobj_from && fobj_to) {
|
|
||||||
// TODO
|
|
||||||
} else if (fobj_from) {
|
|
||||||
// TODO
|
|
||||||
} else {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// TODO
|
|
||||||
ASSERT(0);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
static int32_t apply_commit(STFileSystem *fs) {
|
static int32_t apply_commit(STFileSystem *fs) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t i1 = 0, i2 = 0;
|
int32_t i1 = 0, i2 = 0;
|
||||||
|
@ -291,35 +253,37 @@ static int32_t apply_commit(STFileSystem *fs) {
|
||||||
if (fset1 && fset2) {
|
if (fset1 && fset2) {
|
||||||
if (fset1->fid < fset2->fid) {
|
if (fset1->fid < fset2->fid) {
|
||||||
// delete fset1
|
// delete fset1
|
||||||
code = apply_commit_del_fset(fs, fset1);
|
TARRAY2_REMOVE(&fs->cstate, i1, tsdbTFileSetClear);
|
||||||
if (code) return code;
|
n1 = TARRAY2_SIZE(&fs->cstate);
|
||||||
n1--;
|
|
||||||
} else if (fset1->fid > fset2->fid) {
|
} else if (fset1->fid > fset2->fid) {
|
||||||
// create new file set with fid of fset2->fid
|
// create new file set with fid of fset2->fid
|
||||||
code = apply_commit_add_fset(fs, fset2);
|
code = tsdbTFileSetInitEx(fset2, &fset1);
|
||||||
|
if (code) return code;
|
||||||
|
code = TARRAY2_SORT_INSERT(&fs->cstate, fset1, tsdbTFileSetCmprFn);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
i1++;
|
i1++;
|
||||||
n1++;
|
|
||||||
i2++;
|
i2++;
|
||||||
|
n1 = TARRAY2_SIZE(&fs->cstate);
|
||||||
} else {
|
} else {
|
||||||
// edit
|
// edit
|
||||||
code = apply_commit_upd_fset(fs, fset1, fset2);
|
code = tsdbTFileSetEditEx(fset2, fset1);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
i1++;
|
i1++;
|
||||||
i2++;
|
i2++;
|
||||||
}
|
}
|
||||||
} else if (fset1) {
|
} else if (fset1) {
|
||||||
// delete fset1
|
// delete fset1
|
||||||
code = apply_commit_del_fset(fs, fset1);
|
TARRAY2_REMOVE(&fs->cstate, i1, tsdbTFileSetClear);
|
||||||
if (code) return code;
|
n1 = TARRAY2_SIZE(&fs->cstate);
|
||||||
n1--;
|
|
||||||
} else {
|
} else {
|
||||||
// create new file set with fid of fset2->fid
|
// create new file set with fid of fset2->fid
|
||||||
code = apply_commit_add_fset(fs, fset2);
|
code = tsdbTFileSetInitEx(fset2, &fset1);
|
||||||
|
if (code) return code;
|
||||||
|
code = TARRAY2_SORT_INSERT(&fs->cstate, fset1, tsdbTFileSetCmprFn);
|
||||||
if (code) return code;
|
if (code) return code;
|
||||||
i1++;
|
i1++;
|
||||||
n1++;
|
|
||||||
i2++;
|
i2++;
|
||||||
|
n1 = TARRAY2_SIZE(&fs->cstate);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -209,6 +209,7 @@ int32_t tsdbTFileSetEdit(STFileSet *fset, const STFileOp *op) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTFileSetEditEx(const STFileSet *fset1, STFileSet *fset) {
|
int32_t tsdbTFileSetEditEx(const STFileSet *fset1, STFileSet *fset) {
|
||||||
|
ASSERT(fset1->fid == fset->fid);
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -229,22 +229,48 @@ int32_t tsdbTFileObjInit(const STFile *f, STFileObj **fobj) {
|
||||||
fobj[0] = taosMemoryMalloc(sizeof(*fobj[0]));
|
fobj[0] = taosMemoryMalloc(sizeof(*fobj[0]));
|
||||||
if (!fobj[0]) return TSDB_CODE_OUT_OF_MEMORY;
|
if (!fobj[0]) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
|
taosThreadMutexInit(&fobj[0]->mutex, NULL);
|
||||||
fobj[0]->f = *f;
|
fobj[0]->f = *f;
|
||||||
|
fobj[0]->state = TSDB_FSTATE_EXIST;
|
||||||
fobj[0]->ref = 1;
|
fobj[0]->ref = 1;
|
||||||
// TODO: generate the file name
|
// TODO: generate the file name
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTFileObjRef(STFileObj *fobj) {
|
int32_t tsdbTFileObjRef(STFileObj *fobj) {
|
||||||
int32_t nRef = atomic_fetch_add_32(&fobj->ref, 1);
|
int32_t nRef;
|
||||||
|
taosThreadMutexLock(&fobj->mutex);
|
||||||
|
nRef = fobj->ref++;
|
||||||
|
taosThreadMutexUnlock(&fobj->mutex);
|
||||||
ASSERT(nRef > 0);
|
ASSERT(nRef > 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
int32_t tsdbTFileObjUnref(STFileObj *fobj) {
|
||||||
int32_t nRef = atomic_sub_fetch_32(&fobj->ref, 1);
|
int32_t nRef;
|
||||||
|
taosThreadMutexLock(&fobj->mutex);
|
||||||
|
nRef = --fobj->ref;
|
||||||
|
taosThreadMutexUnlock(&fobj->mutex);
|
||||||
|
|
||||||
ASSERT(nRef >= 0);
|
ASSERT(nRef >= 0);
|
||||||
|
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
|
if (fobj->state == TSDB_FSTATE_REMOVED) {
|
||||||
|
// TODO: add the file name
|
||||||
|
taosRemoveFile(fobj->fname);
|
||||||
|
}
|
||||||
|
taosMemoryFree(fobj);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tsdbTFileRemove(STFileObj *fobj) {
|
||||||
|
taosThreadMutexLock(&fobj->mutex);
|
||||||
|
fobj->state = TSDB_FSTATE_REMOVED;
|
||||||
|
int32_t nRef = --fobj->ref;
|
||||||
|
taosThreadMutexUnlock(&fobj->mutex);
|
||||||
|
if (nRef == 0) {
|
||||||
|
taosRemoveFile(fobj->fname);
|
||||||
taosMemoryFree(fobj);
|
taosMemoryFree(fobj);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue