more code
This commit is contained in:
parent
8f058c8571
commit
c9dc9b3170
|
@ -13,11 +13,11 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "tsdb.h"
|
||||||
|
|
||||||
#ifndef _TD_TSDB_DEF_H_
|
#ifndef _TD_TSDB_DEF_H_
|
||||||
#define _TD_TSDB_DEF_H_
|
#define _TD_TSDB_DEF_H_
|
||||||
|
|
||||||
#include "tsdb.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -13,11 +13,11 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "tsdbFSet.h"
|
||||||
|
|
||||||
#ifndef _TSDB_FILE_SYSTEM_H
|
#ifndef _TSDB_FILE_SYSTEM_H
|
||||||
#define _TSDB_FILE_SYSTEM_H
|
#define _TSDB_FILE_SYSTEM_H
|
||||||
|
|
||||||
#include "tsdbFSet.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
@ -26,10 +26,8 @@ extern "C" {
|
||||||
typedef struct STFileSystem STFileSystem;
|
typedef struct STFileSystem STFileSystem;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_FS_EDIT_NONE = 0,
|
TSDB_FS_EDIT_COMMIT = 1, //
|
||||||
TSDB_FS_EDIT_COMMIT,
|
TSDB_FS_EDIT_MERGE
|
||||||
TSDB_FS_EDIT_MERGE,
|
|
||||||
TSDB_FS_EDIT_MAX,
|
|
||||||
} tsdb_fs_edit_t;
|
} tsdb_fs_edit_t;
|
||||||
|
|
||||||
/* Exposed APIs */
|
/* Exposed APIs */
|
||||||
|
@ -47,8 +45,8 @@ struct STFileSystem {
|
||||||
int32_t state;
|
int32_t state;
|
||||||
tsem_t canEdit;
|
tsem_t canEdit;
|
||||||
int64_t nextEditId;
|
int64_t nextEditId;
|
||||||
SArray *aFileSet; // SArray<struct SFileSet>
|
SArray *cstate; // current state
|
||||||
SArray *nState; // SArray<struct SFileSet>
|
SArray *nstate; // next state
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -13,18 +13,17 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "tsdbFile.h"
|
||||||
|
|
||||||
#ifndef _TSDB_FILE_SET_H
|
#ifndef _TSDB_FILE_SET_H
|
||||||
#define _TSDB_FILE_SET_H
|
#define _TSDB_FILE_SET_H
|
||||||
|
|
||||||
#include "tsdbFile.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct SFileSet SFileSet;
|
typedef struct STFileSet STFileSet;
|
||||||
typedef struct SFileOp SFileOp;
|
typedef struct SFileOp SFileOp;
|
||||||
typedef struct SSttLvl SSttLvl;
|
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_FOP_NONE = 0,
|
TSDB_FOP_NONE = 0,
|
||||||
|
@ -34,10 +33,10 @@ typedef enum {
|
||||||
TSDB_FOP_TRUNCATE,
|
TSDB_FOP_TRUNCATE,
|
||||||
} tsdb_fop_t;
|
} tsdb_fop_t;
|
||||||
|
|
||||||
int32_t tsdbFileSetCreate(int32_t fid, SFileSet **ppSet);
|
int32_t tsdbFileSetCreate(int32_t fid, STFileSet **ppSet);
|
||||||
int32_t tsdbFileSetEdit(SFileSet *pSet, SFileOp *pOp);
|
int32_t tsdbFileSetEdit(STFileSet *pSet, SFileOp *pOp);
|
||||||
int32_t tsdbFileSetToJson(SJson *pJson, const SFileSet *pSet);
|
int32_t tsdbFileSetToJson(SJson *pJson, const STFileSet *pSet);
|
||||||
int32_t tsdbEditFileSet(SFileSet *pFileSet, const SFileOp *pOp);
|
int32_t tsdbEditFileSet(STFileSet *pFileSet, const SFileOp *pOp);
|
||||||
|
|
||||||
struct SFileOp {
|
struct SFileOp {
|
||||||
tsdb_fop_t op;
|
tsdb_fop_t op;
|
||||||
|
@ -46,14 +45,14 @@ struct SFileOp {
|
||||||
STFile nState; // new file state
|
STFile nState; // new file state
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SSttLvl {
|
typedef struct SSttLvl {
|
||||||
int32_t level;
|
LISTD(struct SSttLvl) listNode;
|
||||||
int32_t nStt;
|
int32_t lvl; // level
|
||||||
STFile *fSttList;
|
int32_t nStt; // number of .stt files on this level
|
||||||
LISTD(SSttLvl) listNode;
|
STFile *fStt; // .stt files
|
||||||
};
|
} SSttLvl;
|
||||||
|
|
||||||
struct SFileSet {
|
struct STFileSet {
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
int64_t nextid;
|
int64_t nextid;
|
||||||
STFile *farr[TSDB_FTYPE_MAX]; // file array
|
STFile *farr[TSDB_FTYPE_MAX]; // file array
|
||||||
|
|
|
@ -13,11 +13,11 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "tsdbDef.h"
|
||||||
|
|
||||||
#ifndef _TSDB_FILE_H
|
#ifndef _TSDB_FILE_H
|
||||||
#define _TSDB_FILE_H
|
#define _TSDB_FILE_H
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
@ -33,12 +33,12 @@ typedef enum {
|
||||||
TSDB_FTYPE_STT, // .stt
|
TSDB_FTYPE_STT, // .stt
|
||||||
} tsdb_ftype_t;
|
} tsdb_ftype_t;
|
||||||
|
|
||||||
int32_t tsdbTFileCreate(const struct STFile *config, struct STFile **ppFile);
|
int32_t tsdbTFileInit(STsdb *pTsdb, STFile *pFile);
|
||||||
int32_t tsdbTFileDestroy(struct STFile *pFile);
|
int32_t tsdbTFileClear(STFile *pFile);
|
||||||
int32_t tsdbTFileInit(STsdb *pTsdb, struct STFile *pFile);
|
|
||||||
int32_t tsdbTFileClear(struct STFile *pFile);
|
|
||||||
|
|
||||||
struct STFile {
|
struct STFile {
|
||||||
|
LISTD(STFile) listNode;
|
||||||
|
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
int32_t ref;
|
int32_t ref;
|
||||||
int32_t state;
|
int32_t state;
|
||||||
|
@ -53,8 +53,6 @@ struct STFile {
|
||||||
int32_t nSeg;
|
int32_t nSeg;
|
||||||
} stt;
|
} stt;
|
||||||
};
|
};
|
||||||
|
|
||||||
LISTD(STFile) listNode;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -27,12 +27,12 @@ typedef struct {
|
||||||
int8_t sttTrigger;
|
int8_t sttTrigger;
|
||||||
SArray *aTbDataP;
|
SArray *aTbDataP;
|
||||||
// context
|
// context
|
||||||
TSKEY nextKey;
|
TSKEY nextKey;
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
int32_t expLevel;
|
int32_t expLevel;
|
||||||
TSKEY minKey;
|
TSKEY minKey;
|
||||||
TSKEY maxKey;
|
TSKEY maxKey;
|
||||||
SFileSet *pFileSet;
|
STFileSet *pFileSet;
|
||||||
// writer
|
// writer
|
||||||
SArray *aFileOp;
|
SArray *aFileOp;
|
||||||
SSttFileWriter *pWriter;
|
SSttFileWriter *pWriter;
|
||||||
|
|
|
@ -13,30 +13,45 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "dev.h"
|
#include "inc/tsdbFS.h"
|
||||||
|
|
||||||
static int32_t create_file_system(STsdb *pTsdb, struct STFileSystem **ppFS) {
|
#define TSDB_FS_EDIT_MIN TSDB_FS_EDIT_COMMIT
|
||||||
if ((ppFS[0] = taosMemoryCalloc(1, sizeof(*ppFS[0]))) == NULL) {
|
#define TSDB_FS_EDIT_MAX (TSDB_FS_EDIT_MERGE + 1)
|
||||||
|
|
||||||
|
enum {
|
||||||
|
TSDB_FS_STATE_NONE = 0,
|
||||||
|
TSDB_FS_STATE_OPEN,
|
||||||
|
TSDB_FS_STATE_EDIT,
|
||||||
|
TSDB_FS_STATE_CLOSE,
|
||||||
|
};
|
||||||
|
|
||||||
|
static int32_t create_file_system(STsdb *pTsdb, STFileSystem **ppFS) {
|
||||||
|
ppFS[0] = taosMemoryCalloc(1, sizeof(*ppFS[0]));
|
||||||
|
if (ppFS[0] == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((ppFS[0]->aFileSet = taosArrayInit(16, sizeof(struct SFileSet))) == NULL ||
|
ppFS[0]->cstate = taosArrayInit(16, sizeof(STFileSet));
|
||||||
(ppFS[0]->nState = taosArrayInit(16, sizeof(struct SFileSet))) == NULL) {
|
ppFS[0]->nstate = taosArrayInit(16, sizeof(STFileSet));
|
||||||
taosArrayDestroy(ppFS[0]->nState);
|
if (ppFS[0]->cstate == NULL || ppFS[0]->nstate == NULL) {
|
||||||
taosArrayDestroy(ppFS[0]->aFileSet);
|
taosArrayDestroy(ppFS[0]->nstate);
|
||||||
|
taosArrayDestroy(ppFS[0]->cstate);
|
||||||
taosMemoryFree(ppFS[0]);
|
taosMemoryFree(ppFS[0]);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
ppFS[0]->pTsdb = pTsdb;
|
ppFS[0]->pTsdb = pTsdb;
|
||||||
|
ppFS[0]->state = TSDB_FS_STATE_NONE;
|
||||||
tsem_init(&ppFS[0]->canEdit, 0, 1);
|
tsem_init(&ppFS[0]->canEdit, 0, 1);
|
||||||
|
ppFS[0]->nextEditId = 0;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t destroy_file_system(struct STFileSystem **ppFS) {
|
static int32_t destroy_file_system(STFileSystem **ppFS) {
|
||||||
if (ppFS[0]) {
|
if (ppFS[0]) {
|
||||||
taosArrayDestroy(ppFS[0]->aFileSet);
|
taosArrayDestroy(ppFS[0]->nstate);
|
||||||
|
taosArrayDestroy(ppFS[0]->cstate);
|
||||||
tsem_destroy(&ppFS[0]->canEdit);
|
tsem_destroy(&ppFS[0]->canEdit);
|
||||||
taosMemoryFree(ppFS[0]);
|
taosMemoryFree(ppFS[0]);
|
||||||
ppFS[0] = NULL;
|
ppFS[0] = NULL;
|
||||||
|
@ -110,7 +125,7 @@ static int32_t get_current_temp(STsdb *pTsdb, char fname[], tsdb_fs_edit_t etype
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t fs_to_json_str(struct STFileSystem *pFS, char **ppData) {
|
static int32_t fs_to_json_str(STFileSystem *pFS, char **ppData) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
|
|
||||||
|
@ -148,8 +163,8 @@ static int32_t fs_to_json_str(struct STFileSystem *pFS, char **ppData) {
|
||||||
_exit, //
|
_exit, //
|
||||||
TSDB_CODE_OUT_OF_MEMORY);
|
TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pFS->nState); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pFS->nstate); i++) {
|
||||||
struct SFileSet *pFileSet = taosArrayGet(pFS->nState, i);
|
struct STFileSet *pFileSet = taosArrayGet(pFS->nstate, i);
|
||||||
|
|
||||||
code = tsdbFileSetToJson(aFileSetJson, pFileSet);
|
code = tsdbFileSetToJson(aFileSetJson, pFileSet);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -173,7 +188,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t fs_from_json_str(const char *pData, struct STFileSystem *pFS) {
|
static int32_t fs_from_json_str(const char *pData, STFileSystem *pFS) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
|
|
||||||
|
@ -183,7 +198,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t save_fs_to_file(struct STFileSystem *pFS, const char *fname) {
|
static int32_t save_fs_to_file(STFileSystem *pFS, const char *fname) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
char *pData = NULL;
|
char *pData = NULL;
|
||||||
|
@ -234,12 +249,12 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t load_fs_from_file(const char *fname, struct STFileSystem *pFS) {
|
static int32_t load_fs_from_file(const char *fname, STFileSystem *pFS) {
|
||||||
ASSERTS(0, "TODO: Not implemented yet");
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t commit_edit(struct STFileSystem *pFS, tsdb_fs_edit_t etype) {
|
static int32_t commit_edit(STFileSystem *pFS, tsdb_fs_edit_t etype) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
char ofname[TSDB_FILENAME_LEN];
|
char ofname[TSDB_FILENAME_LEN];
|
||||||
char nfname[TSDB_FILENAME_LEN];
|
char nfname[TSDB_FILENAME_LEN];
|
||||||
|
@ -258,7 +273,7 @@ static int32_t commit_edit(struct STFileSystem *pFS, tsdb_fs_edit_t etype) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t abort_edit(struct STFileSystem *pFS, tsdb_fs_edit_t etype) {
|
static int32_t abort_edit(STFileSystem *pFS, tsdb_fs_edit_t etype) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
|
@ -270,23 +285,24 @@ static int32_t abort_edit(struct STFileSystem *pFS, tsdb_fs_edit_t etype) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t scan_file_system(struct STFileSystem *pFS) {
|
static int32_t scan_file_system(STFileSystem *pFS) {
|
||||||
// ASSERTS(0, "TODO: Not implemented yet");
|
// ASSERTS(0, "TODO: Not implemented yet");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t scan_and_schedule_merge(struct STFileSystem *pFS) {
|
static int32_t scan_and_schedule_merge(STFileSystem *pFS) {
|
||||||
// ASSERTS(0, "TODO: Not implemented yet");
|
// ASSERTS(0, "TODO: Not implemented yet");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t open_file_system(struct STFileSystem *pFS, int8_t rollback) {
|
static int32_t open_file_system(STFileSystem *pFS, int8_t rollback) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino;
|
int32_t lino = 0;
|
||||||
STsdb *pTsdb = pFS->pTsdb;
|
STsdb *pTsdb = pFS->pTsdb;
|
||||||
|
|
||||||
if (0) {
|
bool update = false; // TODO
|
||||||
ASSERTS(0, "Not implemented yet");
|
if (update) {
|
||||||
|
// TODO
|
||||||
} else {
|
} else {
|
||||||
char current_json[TSDB_FILENAME_LEN];
|
char current_json[TSDB_FILENAME_LEN];
|
||||||
char current_json_commit[TSDB_FILENAME_LEN];
|
char current_json_commit[TSDB_FILENAME_LEN];
|
||||||
|
@ -330,31 +346,25 @@ static int32_t open_file_system(struct STFileSystem *pFS, int8_t rollback) {
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", //
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
TD_VID(pTsdb->pVnode), //
|
|
||||||
__func__, //
|
|
||||||
lino, //
|
|
||||||
tstrerror(code));
|
|
||||||
} else {
|
} else {
|
||||||
tsdbInfo("vgId:%d %s success", //
|
tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
|
||||||
TD_VID(pTsdb->pVnode), //
|
|
||||||
__func__);
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t close_file_system(struct STFileSystem *pFS) {
|
static int32_t close_file_system(STFileSystem *pFS) {
|
||||||
ASSERTS(0, "TODO: Not implemented yet");
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t apply_edit(struct STFileSystem *pFS) {
|
static int32_t apply_edit(STFileSystem *pFS) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
ASSERTS(0, "TODO: Not implemented yet");
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t fset_cmpr_fn(const struct SFileSet *pSet1, const struct SFileSet *pSet2) {
|
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
|
||||||
if (pSet1->fid < pSet2->fid) {
|
if (pSet1->fid < pSet2->fid) {
|
||||||
return -1;
|
return -1;
|
||||||
} else if (pSet1->fid > pSet2->fid) {
|
} else if (pSet1->fid > pSet2->fid) {
|
||||||
|
@ -363,31 +373,31 @@ static int32_t fset_cmpr_fn(const struct SFileSet *pSet1, const struct SFileSet
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t edit_fs(struct STFileSystem *pFS, const SArray *aFileOp) {
|
static int32_t edit_fs(STFileSystem *pFS, const SArray *aFileOp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
|
|
||||||
taosArrayClearEx(pFS->nState, NULL /* TODO */);
|
taosArrayClearEx(pFS->nstate, NULL /* TODO */);
|
||||||
|
|
||||||
// TODO: copy current state to new state
|
// TODO: copy current state to new state
|
||||||
|
|
||||||
for (int32_t iop = 0; iop < taosArrayGetSize(aFileOp); iop++) {
|
for (int32_t iop = 0; iop < taosArrayGetSize(aFileOp); iop++) {
|
||||||
struct SFileOp *pOp = taosArrayGet(aFileOp, iop);
|
struct SFileOp *pOp = taosArrayGet(aFileOp, iop);
|
||||||
|
|
||||||
struct SFileSet tmpSet = {.fid = pOp->fid};
|
struct STFileSet tmpSet = {.fid = pOp->fid};
|
||||||
|
|
||||||
int32_t idx = taosArraySearchIdx( //
|
int32_t idx = taosArraySearchIdx( //
|
||||||
pFS->nState, //
|
pFS->nstate, //
|
||||||
&tmpSet, //
|
&tmpSet, //
|
||||||
(__compar_fn_t)fset_cmpr_fn, //
|
(__compar_fn_t)fset_cmpr_fn, //
|
||||||
TD_GE);
|
TD_GE);
|
||||||
|
|
||||||
struct SFileSet *pSet;
|
struct STFileSet *pSet;
|
||||||
if (idx < 0) {
|
if (idx < 0) {
|
||||||
pSet = NULL;
|
pSet = NULL;
|
||||||
idx = taosArrayGetSize(pFS->nState);
|
idx = taosArrayGetSize(pFS->nstate);
|
||||||
} else {
|
} else {
|
||||||
pSet = taosArrayGet(pFS->nState, idx);
|
pSet = taosArrayGet(pFS->nstate, idx);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSet == NULL || pSet->fid != pOp->fid) {
|
if (pSet == NULL || pSet->fid != pOp->fid) {
|
||||||
|
@ -397,7 +407,7 @@ static int32_t edit_fs(struct STFileSystem *pFS, const SArray *aFileOp) {
|
||||||
lino, //
|
lino, //
|
||||||
_exit);
|
_exit);
|
||||||
|
|
||||||
if (taosArrayInsert(pFS->nState, idx, pSet) == NULL) {
|
if (taosArrayInsert(pFS->nstate, idx, pSet) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
@ -416,7 +426,7 @@ _exit:
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbOpenFileSystem(STsdb *pTsdb, struct STFileSystem **ppFS, int8_t rollback) {
|
int32_t tsdbOpenFileSystem(STsdb *pTsdb, STFileSystem **ppFS, int8_t rollback) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
|
|
||||||
|
@ -424,34 +434,26 @@ int32_t tsdbOpenFileSystem(STsdb *pTsdb, struct STFileSystem **ppFS, int8_t roll
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = open_file_system(ppFS[0], rollback);
|
code = open_file_system(ppFS[0], rollback);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, _exit)
|
||||||
destroy_file_system(ppFS);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", //
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||||
TD_VID(pTsdb->pVnode), //
|
destroy_file_system(ppFS);
|
||||||
__func__, //
|
|
||||||
lino, //
|
|
||||||
tstrerror(code));
|
|
||||||
} else {
|
} else {
|
||||||
tsdbInfo("vgId:%d %s success", //
|
tsdbInfo("vgId:%d %s success", TD_VID(pTsdb->pVnode), __func__);
|
||||||
TD_VID(pTsdb->pVnode), //
|
|
||||||
__func__);
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbCloseFileSystem(struct STFileSystem **ppFS) {
|
int32_t tsdbCloseFileSystem(STFileSystem **ppFS) {
|
||||||
if (ppFS[0] == NULL) return 0;
|
if (ppFS[0] == NULL) return 0;
|
||||||
close_file_system(ppFS[0]);
|
close_file_system(ppFS[0]);
|
||||||
destroy_file_system(ppFS);
|
destroy_file_system(ppFS);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFileSystemEditBegin(struct STFileSystem *pFS, const SArray *aFileOp, tsdb_fs_edit_t etype) {
|
int32_t tsdbFileSystemEditBegin(STFileSystem *pFS, const SArray *aFileOp, tsdb_fs_edit_t etype) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
@ -486,7 +488,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFileSystemEditCommit(struct STFileSystem *pFS, tsdb_fs_edit_t etype) {
|
int32_t tsdbFileSystemEditCommit(STFileSystem *pFS, tsdb_fs_edit_t etype) {
|
||||||
int32_t code = commit_edit(pFS, etype);
|
int32_t code = commit_edit(pFS, etype);
|
||||||
tsem_post(&pFS->canEdit);
|
tsem_post(&pFS->canEdit);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -503,7 +505,7 @@ int32_t tsdbFileSystemEditCommit(struct STFileSystem *pFS, tsdb_fs_edit_t etype)
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFileSystemEditAbort(struct STFileSystem *pFS, tsdb_fs_edit_t etype) {
|
int32_t tsdbFileSystemEditAbort(STFileSystem *pFS, tsdb_fs_edit_t etype) {
|
||||||
int32_t code = abort_edit(pFS, etype);
|
int32_t code = abort_edit(pFS, etype);
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d %s failed since %s, etype:%d", //
|
tsdbError("vgId:%d %s failed since %s, etype:%d", //
|
||||||
|
|
|
@ -13,12 +13,12 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "dev.h"
|
#include "inc/tsdbFSet.h"
|
||||||
|
|
||||||
int32_t tsdbFileSetCreate(int32_t fid, struct SFileSet **ppSet) {
|
int32_t tsdbFileSetCreate(int32_t fid, struct STFileSet **ppSet) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
ppSet[0] = taosMemoryCalloc(1, sizeof(struct SFileSet));
|
ppSet[0] = taosMemoryCalloc(1, sizeof(struct STFileSet));
|
||||||
if (ppSet[0] == NULL) {
|
if (ppSet[0] == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -30,59 +30,13 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFileSetEdit(struct SFileSet *pSet, struct SFileOp *pOp) {
|
int32_t tsdbFileSetEdit(struct STFileSet *pSet, struct SFileOp *pOp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino;
|
// TODO
|
||||||
|
|
||||||
// switch (pOp->op) {
|
|
||||||
// case TSDB_FOP_CREATE: {
|
|
||||||
// struct STFile **ppFile;
|
|
||||||
// switch (pOp->nState.type) {
|
|
||||||
// case TSDB_FTYPE_HEAD: {
|
|
||||||
// ppFile = &pSet->fHead;
|
|
||||||
// } break;
|
|
||||||
// case TSDB_FTYPE_DATA: {
|
|
||||||
// ppFile = &pSet->fData;
|
|
||||||
// } break;
|
|
||||||
// case TSDB_FTYPE_SMA: {
|
|
||||||
// ppFile = &pSet->fSma;
|
|
||||||
// } break;
|
|
||||||
// case TSDB_FTYPE_TOMB: {
|
|
||||||
// ppFile = &pSet->fTomb;
|
|
||||||
// } break;
|
|
||||||
// case TSDB_FTYPE_STT: {
|
|
||||||
// // ppFile = &pSet->lStt[0].fStt;
|
|
||||||
// } break;
|
|
||||||
// default: {
|
|
||||||
// ASSERTS(0, "Invalid file type");
|
|
||||||
// } break;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// TSDB_CHECK_CODE( //
|
|
||||||
// code = tsdbTFileCreate(&pOp->nState, ppFile), //
|
|
||||||
// lino, //
|
|
||||||
// _exit);
|
|
||||||
// } break;
|
|
||||||
|
|
||||||
// case TSDB_FOP_DELETE: {
|
|
||||||
// ASSERTS(0, "TODO: Not implemented yet");
|
|
||||||
// } break;
|
|
||||||
// case TSDB_FOP_TRUNCATE: {
|
|
||||||
// ASSERTS(0, "TODO: Not implemented yet");
|
|
||||||
// } break;
|
|
||||||
// case TSDB_FOP_EXTEND: {
|
|
||||||
// ASSERTS(0, "TODO: Not implemented yet");
|
|
||||||
// } break;
|
|
||||||
// default: {
|
|
||||||
// ASSERTS(0, "Invalid file operation");
|
|
||||||
// } break;
|
|
||||||
// }
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFileSetToJson(SJson *pJson, const struct SFileSet *pSet) {
|
int32_t tsdbFileSetToJson(SJson *pJson, const struct STFileSet *pSet) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
ASSERTS(0, "TODO: Not implemented yet");
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
|
@ -91,7 +45,7 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbEditFileSet(struct SFileSet *pFileSet, const struct SFileOp *pOp) {
|
int32_t tsdbEditFileSet(struct STFileSet *pFileSet, const struct SFileOp *pOp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
ASSERTS(0, "TODO: Not implemented yet");
|
ASSERTS(0, "TODO: Not implemented yet");
|
||||||
// TODO
|
// TODO
|
||||||
|
|
|
@ -13,40 +13,18 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "dev.h"
|
#include "inc/tsdbFile.h"
|
||||||
|
|
||||||
const char *tsdb_ftype_suffix[] = {
|
const char *tsdb_ftype_suffix[] = {
|
||||||
".head", // TSDB_FTYPE_HEAD
|
[TSDB_FTYPE_HEAD] = ".head", //
|
||||||
".data", // TSDB_FTYPE_DATA
|
[TSDB_FTYPE_DATA] = ".data", //
|
||||||
".sma", // TSDB_FTYPE_SMA
|
[TSDB_FTYPE_SMA] = ".sma", //
|
||||||
".tomb", // TSDB_FTYPE_TOMB
|
[TSDB_FTYPE_TOMB] = ".tomb", //
|
||||||
NULL, // TSDB_FTYPE_MAX
|
[TSDB_FTYPE_MAX] = NULL, //
|
||||||
".stt", // TSDB_FTYPE_STT
|
[TSDB_FTYPE_STT] = ".stt",
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t tsdbTFileCreate(const struct STFile *config, struct STFile **ppFile) {
|
int32_t tsdbTFileInit(STsdb *pTsdb, STFile *pFile) {
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino;
|
|
||||||
|
|
||||||
ppFile[0] = (struct STFile *)taosMemoryCalloc(1, sizeof(struct STFile));
|
|
||||||
if (ppFile[0] == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
ppFile[0][0] = config[0];
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbTFileDestroy(struct STFile *pFile) {
|
|
||||||
int32_t code = 0;
|
|
||||||
// TODO
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbTFileInit(STsdb *pTsdb, struct STFile *pFile) {
|
|
||||||
SVnode *pVnode = pTsdb->pVnode;
|
SVnode *pVnode = pTsdb->pVnode;
|
||||||
STfs *pTfs = pVnode->pTfs;
|
STfs *pTfs = pVnode->pTfs;
|
||||||
|
|
||||||
|
@ -77,4 +55,7 @@ int32_t tsdbTFileInit(STsdb *pTsdb, struct STFile *pFile) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbTFileClear(struct STFile *pFile) { return 0; }
|
int32_t tsdbTFileClear(STFile *pFile) {
|
||||||
|
// TODO
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -16,8 +16,8 @@
|
||||||
#include "dev.h"
|
#include "dev.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
SFileSet *pSet;
|
STFileSet *pSet;
|
||||||
|
|
||||||
SBlockData bData;
|
SBlockData bData;
|
||||||
|
|
||||||
|
@ -39,13 +39,13 @@ static int32_t tsdbFileSystemShouldMerge(STsdb *pTsdb) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbFileSetShouldMerge(struct SFileSet *pSet) {
|
static int32_t tsdbFileSetShouldMerge(struct STFileSet *pSet) {
|
||||||
ASSERTS(0, "TODO: not implemented yet");
|
ASSERTS(0, "TODO: not implemented yet");
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbFileSetMerge(struct SFileSet *pFileSet) {
|
static int32_t tsdbFileSetMerge(struct STFileSet *pFileSet) {
|
||||||
ASSERTS(0, "TODO: not implemented yet");
|
ASSERTS(0, "TODO: not implemented yet");
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -94,8 +94,8 @@ int32_t tsdbMerge(STsdb *pTsdb) {
|
||||||
code = tsdbOpenMerger(pTsdb, &pMerger);
|
code = tsdbOpenMerger(pTsdb, &pMerger);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit)
|
TSDB_CHECK_CODE(code, lino, _exit)
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->pFS->aFileSet); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->pFS->cstate); i++) {
|
||||||
struct SFileSet *pFileSet = taosArrayGet(pTsdb->pFS->aFileSet, i);
|
struct STFileSet *pFileSet = taosArrayGet(pTsdb->pFS->cstate, i);
|
||||||
if (!tsdbFileSetShouldMerge(pFileSet)) {
|
if (!tsdbFileSetShouldMerge(pFileSet)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -193,7 +193,7 @@ void* taosArrayGet(const SArray* pArray, size_t index) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index >= pArray->size) {
|
if (index >= pArray->size) {
|
||||||
uError("index is out of range, current:%"PRIzu" max:%d", index, pArray->capacity);
|
uError("index is out of range, current:%" PRIzu " max:%d", index, pArray->capacity);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue