more code

This commit is contained in:
Hongze Cheng 2023-03-24 17:57:37 +08:00
parent 873c9705fe
commit 052d9c6e4b
9 changed files with 188 additions and 81 deletions

View File

@ -13,4 +13,25 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TSDB_DEV_H
#define _TSDB_DEV_H
#include "tsdb.h"
#ifdef __cplusplus
extern "C" {
#endif
#include "tsdbUtil.h"
#include "tsdbFile.h"
#include "tsdbFS.h"
#include "tsdbSttFWriter.h"
#ifdef __cplusplus
}
#endif
#endif /*_TSDB_DEV_H*/

View File

@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbSttFWriter.h"
#include "dev.h"
// extern dependencies
typedef struct {

View File

@ -11,4 +11,6 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "dev.h"

View File

@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbFile.h"
#include "dev.h"
typedef enum {
TSDB_FOP_CREATE = -2, // create a file
@ -24,59 +24,10 @@ typedef enum {
TSDB_FOP_MAX,
} tsdb_fop_t;
typedef enum {
TSDB_FTYPE_NONE = 0, // no file type
TSDB_FTYPE_STT, // .stt
TSDB_FTYPE_HEAD, // .head
TSDB_FTYPE_DATA, // .data
TSDB_FTYPE_SMA, // .sma
TSDB_FTYPE_TOMB, // .tomb
} tsdb_ftype_t;
const char *tsdb_ftype_suffix[] = {
"none", "stt", "head", "data", "sma", "tomb",
};
typedef struct SFStt {
int64_t offset;
} SFStt;
typedef struct SFHead {
int64_t offset;
} SFHead;
typedef struct SFData {
// TODO
} SFData;
typedef struct SFSma {
// TODO
} SFSma;
typedef struct SFTomb {
// TODO
} SFTomb;
struct STFile {
SDiskID diskId;
int64_t size;
int64_t cid;
int32_t fid;
tsdb_ftype_t type;
union {
SFStt fstt;
SFHead fhead;
SFData fdata;
SFSma fsma;
SFTomb ftomb;
};
};
struct SFileObj {
volatile int32_t nRef;
STFile file;
};
struct SFileOp {
tsdb_fop_t op;
union {

View File

@ -23,13 +23,56 @@ extern "C" {
#endif
/* Exposed Handle */
typedef struct STFile STFile;
typedef struct SFileObj SFileObj;
typedef struct SFileOp SFileOp;
typedef struct STFile STFile;
typedef struct SFileOp SFileOp;
typedef enum {
TSDB_FTYPE_NONE = 0, // no file type
TSDB_FTYPE_STT, // .stt
TSDB_FTYPE_HEAD, // .head
TSDB_FTYPE_DATA, // .data
TSDB_FTYPE_SMA, // .sma
TSDB_FTYPE_TOMB, // .tomb
} tsdb_ftype_t;
/* Exposed APIs */
/* Exposed Structs */
typedef struct SFStt {
int64_t offset;
} SFStt;
typedef struct SFHead {
int64_t offset;
} SFHead;
typedef struct SFData {
// TODO
} SFData;
typedef struct SFSma {
// TODO
} SFSma;
typedef struct SFTomb {
// TODO
} SFTomb;
struct STFile {
char fname[TSDB_FILENAME_LEN];
SDiskID diskId;
int64_t size;
int64_t cid;
int32_t fid;
int32_t ref;
tsdb_ftype_t type;
union {
SFStt fstt;
SFHead fhead;
SFData fdata;
SFSma fsma;
SFTomb ftomb;
};
};
#ifdef __cplusplus
}

View File

@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbSttFWriter.h"
#include "tsdbUtil.h"
#include "dev.h"
extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD);
extern void tsdbCloseFile(STsdbFD **ppFD);
@ -26,7 +25,6 @@ struct SSttFWriter {
SSttFWriterConf config;
// time-series data
SBlockData bData;
SSttBlk sttBlk;
SArray *aSttBlk; // SArray<SSttBlk>
// tombstone data
SDelBlock dData;
@ -37,18 +35,34 @@ struct SSttFWriter {
STsdbFD *pFd;
};
static int32_t tsdbSttFWriteTSBlock(SSttFWriter *pWriter) {
static int32_t write_ts_block(SSttFWriter *pWriter) {
int32_t code = 0;
int32_t lino;
SBlockData *pBData = &pWriter->bData;
SSttBlk *pSttBlk = (SSttBlk *)taosArrayReserve(pWriter->aSttBlk, 1);
if (pSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pSttBlk->suid = pBData->suid;
pSttBlk->minUid = pBData->aUid[0];
pSttBlk->maxUid = pBData->aUid[pBData->nRow - 1];
pSttBlk->minKey = pSttBlk->maxKey = pBData->aTSKEY[0];
pSttBlk->minVer = pSttBlk->maxVer = pBData->aTSKEY[0];
pSttBlk->nRow = pBData->nRow;
for (int32_t iRow = 1; iRow < pBData->nRow; iRow++) {
pSttBlk->minKey = TMIN(pSttBlk->minKey, pBData->aTSKEY[iRow]);
pSttBlk->maxKey = TMAX(pSttBlk->maxKey, pBData->aTSKEY[iRow]);
pSttBlk->minVer = TMIN(pSttBlk->minVer, pBData->aVersion[iRow]);
pSttBlk->maxVer = TMAX(pSttBlk->maxVer, pBData->aVersion[iRow]);
}
// compress data block
code = tCmprBlockData(pBData, pWriter->config.cmprAlg, NULL, NULL, NULL /* TODO */, NULL /* TODO */);
TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_NULL(taosArrayPush(pWriter->aSttBlk, &pWriter->sttBlk), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY);
tBlockDataClear(pBData);
_exit:
@ -59,24 +73,88 @@ _exit:
return code;
}
static int32_t write_del_block(SSttFWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t write_stt_blk(SSttFWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t write_del_blk(SSttFWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t stt_fwriter_create(const SSttFWriterConf *pConf, SSttFWriter **ppWriter) {
int32_t code = 0;
if ((ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
ppWriter[0]->config = pConf[0];
if (pConf->pSkmRow == NULL) {
ppWriter[0]->config.pSkmRow = &ppWriter[0]->skmRow;
}
if (pConf->pSkmTb == NULL) {
ppWriter[0]->config.pSkmTb = &ppWriter[0]->skmTb;
}
tBlockDataCreate(&ppWriter[0]->bData);
ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk));
if (ppWriter[0]->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
if (code && ppWriter[0]) {
taosArrayDestroy(ppWriter[0]->aSttBlk);
tBlockDataDestroy(&ppWriter[0]->bData);
taosMemoryFree(ppWriter[0]);
ppWriter[0] = NULL;
}
return code;
}
static int32_t stt_fwriter_destroy(SSttFWriter *pWriter) {
if (pWriter) {
tDestroyTSchema(pWriter->skmTb.pTSchema);
tDestroyTSchema(pWriter->skmRow.pTSchema);
taosArrayDestroy(pWriter->aSttBlk);
tBlockDataDestroy(&pWriter->bData);
taosMemoryFree(pWriter);
}
return 0;
}
static int32_t stt_fwriter_open(SSttFWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
static int32_t stt_fwriter_close(SSttFWriter *pWriter) {
int32_t code = 0;
// TODO
return code;
}
int32_t tsdbSttFWriterOpen(const SSttFWriterConf *pConf, SSttFWriter **ppWriter) {
int32_t code = 0;
int32_t lino;
if ((ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = stt_fwriter_create(pConf, ppWriter);
TSDB_CHECK_CODE(code, lino, _exit);
ppWriter[0]->config = pConf[0];
if (ppWriter[0]->config.pSkmTb == NULL) ppWriter[0]->config.pSkmTb = &ppWriter[0]->skmTb;
if (ppWriter[0]->config.pSkmRow == NULL) ppWriter[0]->config.pSkmRow = &ppWriter[0]->skmRow;
tBlockDataCreate(&ppWriter[0]->bData);
// tDelBlockCreate(&ppWriter[0]->dData);
int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; // TODO
code = tsdbOpenFile(NULL /* TODO */, pConf->szPage, flag, &ppWriter[0]->pFd);
code = stt_fwriter_open(ppWriter[0]);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
@ -91,8 +169,19 @@ _exit:
}
int32_t tsdbSttFWriterClose(SSttFWriter **ppWriter) {
int32_t vgId = TD_VID(ppWriter[0]->config.pTsdb->pVnode);
int32_t code = 0;
// TODO
int32_t lino;
code = stt_fwriter_close(ppWriter[0]);
TSDB_CHECK_CODE(code, lino, _exit);
stt_fwriter_close(ppWriter[0]);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", vgId, __func__, lino, tstrerror(code));
}
return code;
}
@ -102,7 +191,7 @@ int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow)
if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) {
if (pWriter->bData.nRow > 0) {
code = tsdbSttFWriteTSBlock(pWriter);
code = write_ts_block(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -124,7 +213,7 @@ int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow)
TSDB_CHECK_CODE(code, lino, _exit);
if (pWriter->bData.nRow >= pWriter->config.maxRow) {
code = tsdbSttFWriteTSBlock(pWriter);
code = write_ts_block(pWriter);
TSDB_CHECK_CODE(code, lino, _exit);
}

View File

@ -16,7 +16,7 @@
#ifndef _TSDB_STT_FILE_WRITER_H
#define _TSDB_STT_FILE_WRITER_H
#include "tsdb.h"
#include "tsdbFile.h"
#ifdef __cplusplus
extern "C" {
@ -34,6 +34,7 @@ struct SSttFWriterConf {
STsdb *pTsdb;
SSkmInfo *pSkmTb;
SSkmInfo *pSkmRow;
STFile file;
int32_t maxRow;
int32_t szPage;
int8_t cmprAlg;

View File

@ -11,4 +11,6 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "dev.h"

View File

@ -16,8 +16,6 @@
#ifndef _TSDB_UTIL_H
#define _TSDB_UTIL_H
#include "tsdb.h"
#ifdef __cplusplus
extern "C" {
#endif