more code

This commit is contained in:
Hongze Cheng 2023-04-23 17:37:12 +08:00
parent 59e543e18d
commit 651ca6aca2
9 changed files with 91 additions and 76 deletions

View File

@ -22,13 +22,11 @@
extern "C" { extern "C" {
#endif #endif
/* Exposed Handle */ // SDataFileReader =============================================
// typedef struct SDataFReader SDataFReader; typedef struct SDataFileReader SDataFileReader;
// typedef struct SDataFWriter SDataFWriter;
/* Exposed APIs */ // SDataFileWriter =============================================
typedef struct SDataFileWriter SDataFileWriter;
/* Exposed Structs */
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -23,14 +23,7 @@ extern "C" {
#endif #endif
/* Exposed Handle */ /* Exposed Handle */
struct STFileSystem { typedef struct STFileSystem STFileSystem;
STsdb *pTsdb;
// int32_t state; // TODO
tsem_t canEdit;
int64_t nextEditId;
SArray *aFileSet; // SArray<struct SFileSet>
SArray *nState; // SArray<struct SFileSet>
};
typedef enum { typedef enum {
TSDB_FS_EDIT_NONE = 0, TSDB_FS_EDIT_NONE = 0,
@ -41,14 +34,23 @@ typedef enum {
/* Exposed APIs */ /* Exposed APIs */
// open/close // open/close
int32_t tsdbOpenFileSystem(STsdb *pTsdb, struct STFileSystem **ppFS, int8_t rollback); int32_t tsdbOpenFileSystem(STsdb *pTsdb, STFileSystem **ppFS, int8_t rollback);
int32_t tsdbCloseFileSystem(struct STFileSystem **ppFS); int32_t tsdbCloseFileSystem(STFileSystem **ppFS);
// txn // txn
int32_t tsdbFileSystemEditBegin(struct STFileSystem *pFS, const SArray *aFileOp, tsdb_fs_edit_t etype); int32_t tsdbFileSystemEditBegin(STFileSystem *pFS, const SArray *aFileOp, tsdb_fs_edit_t etype);
int32_t tsdbFileSystemEditCommit(struct STFileSystem *pFS, tsdb_fs_edit_t etype); int32_t tsdbFileSystemEditCommit(STFileSystem *pFS, tsdb_fs_edit_t etype);
int32_t tsdbFileSystemEditAbort(struct STFileSystem *pFS, tsdb_fs_edit_t etype); int32_t tsdbFileSystemEditAbort(STFileSystem *pFS, tsdb_fs_edit_t etype);
/* Exposed Structs */ /* Exposed Structs */
struct STFileSystem {
STsdb *pTsdb;
int32_t state;
tsem_t canEdit;
int64_t nextEditId;
SArray *aFileSet; // SArray<struct SFileSet>
SArray *nState; // SArray<struct SFileSet>
};
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -34,6 +34,11 @@ typedef enum {
TSDB_FOP_TRUNCATE, TSDB_FOP_TRUNCATE,
} tsdb_fop_t; } tsdb_fop_t;
int32_t tsdbFileSetCreate(int32_t fid, SFileSet **ppSet);
int32_t tsdbFileSetEdit(SFileSet *pSet, SFileOp *pOp);
int32_t tsdbFileSetToJson(SJson *pJson, const SFileSet *pSet);
int32_t tsdbEditFileSet(SFileSet *pFileSet, const SFileOp *pOp);
struct SFileOp { struct SFileOp {
tsdb_fop_t op; tsdb_fop_t op;
int32_t fid; int32_t fid;
@ -55,11 +60,6 @@ struct SFileSet {
SSttLvl lvl0; // level 0 of .stt SSttLvl lvl0; // level 0 of .stt
}; };
int32_t tsdbFileSetCreate(int32_t fid, struct SFileSet **ppSet);
int32_t tsdbFileSetEdit(struct SFileSet *pSet, struct SFileOp *pOp);
int32_t tsdbFileSetToJson(SJson *pJson, const struct SFileSet *pSet);
int32_t tsdbEditFileSet(struct SFileSet *pFileSet, const struct SFileOp *pOp);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -49,21 +49,21 @@ struct STFile {
int32_t fid; int32_t fid;
union { union {
struct { struct {
int32_t level; // level of .stt int32_t level;
int32_t nSeg; // number of segments in .stt int32_t nSeg;
} stt; } stt; // .stt
struct {
int64_t offset;
} head; // .head
struct { struct {
// TODO // TODO
} head; } data; // .data
struct { struct {
// TODO // TODO
} data; } sma; // .sma
struct { struct {
// TODO // TODO
} sma; } tomb; // .tomb
struct {
// TODO
} tomb;
}; };
LISTD(STFile) listNode; LISTD(STFile) listNode;

View File

@ -23,24 +23,24 @@ extern "C" {
#endif #endif
// SSttFReader ========================================== // SSttFReader ==========================================
typedef struct SSttFSegReader SSttFSegReader; typedef struct SSttFSegReader SSttFSegReader;
typedef struct SSttFReader SSttFReader; typedef struct SSttFileReader SSttFileReader;
typedef struct SSttFReaderConfig SSttFReaderConfig; typedef struct SSttFileReaderConfig SSttFileReaderConfig;
int32_t tsdbSttFReaderOpen(const SSttFReaderConfig *config, SSttFReader **ppReader); int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **ppReader);
int32_t tsdbSttFReaderClose(SSttFReader **ppReader); int32_t tsdbSttFReaderClose(SSttFileReader **ppReader);
// SSttFWriter ========================================== // SSttFWriter ==========================================
typedef struct SSttFWriter SSttFWriter; typedef struct SSttFileWriter SSttFileWriter;
typedef struct SSttFWriterConfig SSttFWriterConfig; typedef struct SSttFileWriterConfig SSttFileWriterConfig;
int32_t tsdbSttFWriterOpen(const SSttFWriterConfig *config, SSttFWriter **ppWriter); int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter **ppWriter);
int32_t tsdbSttFWriterClose(SSttFWriter **ppWriter, int8_t abort, struct SFileOp *op); int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct SFileOp *op);
int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow); int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRow);
int32_t tsdbSttFWriteDLData(SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelData); int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData);
/* ------------------------------------------------- */ /* ------------------------------------------------- */
struct SSttFWriterConfig { struct SSttFileWriterConfig {
STsdb *pTsdb; STsdb *pTsdb;
STFile file; STFile file;
int32_t maxRow; int32_t maxRow;
@ -51,7 +51,7 @@ struct SSttFWriterConfig {
uint8_t **aBuf; uint8_t **aBuf;
}; };
struct SSttFReaderConfig { struct SSttFileReaderConfig {
STsdb *pTsdb; STsdb *pTsdb;
SSkmInfo *pSkmTb; SSkmInfo *pSkmTb;
SSkmInfo *pSkmRow; SSkmInfo *pSkmRow;

View File

@ -27,15 +27,15 @@ typedef struct {
int8_t sttTrigger; int8_t sttTrigger;
SArray *aTbDataP; SArray *aTbDataP;
// context // context
TSKEY nextKey; TSKEY nextKey;
int32_t fid; int32_t fid;
int32_t expLevel; int32_t expLevel;
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;
struct SFileSet *pFileSet; SFileSet *pFileSet;
// writer // writer
SArray *aFileOp; SArray *aFileOp;
struct SSttFWriter *pWriter; SSttFileWriter *pWriter;
} SCommitter; } SCommitter;
static int32_t open_committer_writer(SCommitter *pCommitter) { static int32_t open_committer_writer(SCommitter *pCommitter) {
@ -44,7 +44,7 @@ static int32_t open_committer_writer(SCommitter *pCommitter) {
STsdb *pTsdb = pCommitter->pTsdb; STsdb *pTsdb = pCommitter->pTsdb;
struct SSttFWriterConfig conf = { SSttFileWriterConfig conf = {
.pTsdb = pCommitter->pTsdb, .pTsdb = pCommitter->pTsdb,
.maxRow = pCommitter->maxRow, .maxRow = pCommitter->maxRow,
.szPage = pCommitter->pTsdb->pVnode->config.tsdbPageSize, .szPage = pCommitter->pTsdb->pVnode->config.tsdbPageSize,

View File

@ -16,11 +16,11 @@
#include "dev.h" #include "dev.h"
const char *tsdb_ftype_suffix[] = { const char *tsdb_ftype_suffix[] = {
NULL, // TSDB_FTYPE_NONE
".head", // TSDB_FTYPE_HEAD ".head", // TSDB_FTYPE_HEAD
".data", // TSDB_FTYPE_DATA ".data", // TSDB_FTYPE_DATA
".sma", // TSDB_FTYPE_SMA ".sma", // TSDB_FTYPE_SMA
".tomb", // TSDB_FTYPE_TOMB ".tomb", // TSDB_FTYPE_TOMB
NULL, // TSDB_FTYPE_MAX
".stt", // TSDB_FTYPE_STT ".stt", // TSDB_FTYPE_STT
}; };

View File

@ -18,6 +18,9 @@
typedef struct { typedef struct {
STsdb *pTsdb; STsdb *pTsdb;
SSttFileReader *pSttFReader;
SSttFileWriter *pSttFWriter;
SArray *aFileOp; // SArray<struct SFileOp> SArray *aFileOp; // SArray<struct SFileOp>
} SMerger; } SMerger;

View File

@ -22,14 +22,26 @@ typedef struct {
} SFSttFooter; } SFSttFooter;
// SSttFReader ============================================================ // SSttFReader ============================================================
struct SSttFReader { struct SSttFileReader {
SSttFReaderConfig *config; SSttFileReaderConfig *config;
// TODO // TODO
}; };
int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **ppReader) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbSttFReaderClose(SSttFileReader **ppReader) {
int32_t code = 0;
// TODO
return code;
}
// SSttFWriter ============================================================ // SSttFWriter ============================================================
struct SSttFWriter { struct SSttFileWriter {
SSttFWriterConfig config; SSttFileWriterConfig config;
// file // file
STFile tFile; STFile tFile;
// data // data
@ -49,7 +61,7 @@ struct SSttFWriter {
STsdbFD *pFd; STsdbFD *pFd;
}; };
static int32_t write_timeseries_block(struct SSttFWriter *pWriter) { static int32_t write_timeseries_block(SSttFileWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -119,7 +131,7 @@ _exit:
return code; return code;
} }
static int32_t write_statistics_block(struct SSttFWriter *pWriter) { static int32_t write_statistics_block(SSttFileWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -174,7 +186,7 @@ _exit:
return code; return code;
} }
static int32_t write_delete_block(struct SSttFWriter *pWriter) { static int32_t write_delete_block(SSttFileWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -221,7 +233,7 @@ _exit:
return code; return code;
} }
static int32_t write_stt_blk(struct SSttFWriter *pWriter) { static int32_t write_stt_blk(SSttFileWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -253,7 +265,7 @@ _exit:
return code; return code;
} }
static int32_t write_statistics_blk(struct SSttFWriter *pWriter) { static int32_t write_statistics_blk(SSttFileWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -285,7 +297,7 @@ _exit:
return code; return code;
} }
static int32_t write_del_blk(struct SSttFWriter *pWriter) { static int32_t write_del_blk(SSttFileWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -317,7 +329,7 @@ _exit:
return code; return code;
} }
static int32_t write_file_footer(struct SSttFWriter *pWriter) { static int32_t write_file_footer(SSttFileWriter *pWriter) {
int32_t code = tsdbWriteFile( // int32_t code = tsdbWriteFile( //
pWriter->pFd, // pWriter->pFd, //
pWriter->tFile.size, // pWriter->tFile.size, //
@ -327,12 +339,12 @@ static int32_t write_file_footer(struct SSttFWriter *pWriter) {
return code; return code;
} }
static int32_t write_file_header(struct SSttFWriter *pWriter) { static int32_t write_file_header(SSttFileWriter *pWriter) {
// TODO // TODO
return 0; return 0;
} }
static int32_t create_stt_fwriter(const SSttFWriterConfig *pConf, struct SSttFWriter **ppWriter) { static int32_t create_stt_fwriter(const SSttFileWriterConfig *pConf, SSttFileWriter **ppWriter) {
int32_t code = 0; int32_t code = 0;
// alloc // alloc
@ -380,7 +392,7 @@ _exit:
return code; return code;
} }
static int32_t destroy_stt_fwriter(struct SSttFWriter *pWriter) { static int32_t destroy_stt_fwriter(SSttFileWriter *pWriter) {
if (pWriter) { if (pWriter) {
for (int32_t i = 0; i < ARRAY_SIZE(pWriter->aBuf); i++) { for (int32_t i = 0; i < ARRAY_SIZE(pWriter->aBuf); i++) {
tFree(pWriter->aBuf[i]); tFree(pWriter->aBuf[i]);
@ -400,7 +412,7 @@ static int32_t destroy_stt_fwriter(struct SSttFWriter *pWriter) {
return 0; return 0;
} }
static int32_t open_stt_fwriter(struct SSttFWriter *pWriter) { static int32_t open_stt_fwriter(SSttFileWriter *pWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
uint8_t hdr[TSDB_FHDR_SIZE] = {0}; uint8_t hdr[TSDB_FHDR_SIZE] = {0};
@ -451,12 +463,12 @@ _exit:
return code; return code;
} }
static int32_t close_stt_fwriter(struct SSttFWriter *pWriter) { static int32_t close_stt_fwriter(SSttFileWriter *pWriter) {
tsdbCloseFile(&pWriter->pFd); tsdbCloseFile(&pWriter->pFd);
return 0; return 0;
} }
int32_t tsdbSttFWriterOpen(const SSttFWriterConfig *pConf, struct SSttFWriter **ppWriter) { int32_t tsdbSttFWriterOpen(const SSttFileWriterConfig *pConf, SSttFileWriter **ppWriter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -481,7 +493,7 @@ _exit:
return code; return code;
} }
int32_t tsdbSttFWriterClose(struct SSttFWriter **ppWriter, int8_t abort, struct SFileOp *op) { int32_t tsdbSttFWriterClose(SSttFileWriter **ppWriter, int8_t abort, struct SFileOp *op) {
int32_t vgId = TD_VID(ppWriter[0]->config.pTsdb->pVnode); int32_t vgId = TD_VID(ppWriter[0]->config.pTsdb->pVnode);
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -571,7 +583,7 @@ _exit:
return code; return code;
} }
int32_t tsdbSttFWriteTSData(struct SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) { int32_t tsdbSttFWriteTSData(SSttFileWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
@ -674,7 +686,7 @@ _exit:
return code; return code;
} }
int32_t tsdbSttFWriteDLData(struct SSttFWriter *pWriter, TABLEID *tbid, SDelData *pDelData) { int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pDelData) {
ASSERTS(0, "TODO: Not implemented yet"); ASSERTS(0, "TODO: Not implemented yet");
pWriter->dData.aData[0][pWriter->dData.nRow] = tbid->suid; // suid pWriter->dData.aData[0][pWriter->dData.nRow] = tbid->suid; // suid