From 052d9c6e4b9a86053cb71abe2c2cb389443b3ae3 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 24 Mar 2023 17:57:37 +0800 Subject: [PATCH] more code --- .../vnode/src/tsdb/dev/{tsdbMerge.c => dev.h} | 21 +++ source/dnode/vnode/src/tsdb/dev/tsdbCommit.c | 2 +- source/dnode/vnode/src/tsdb/dev/tsdbFS.c | 4 +- source/dnode/vnode/src/tsdb/dev/tsdbFile.c | 51 +------ source/dnode/vnode/src/tsdb/dev/tsdbFile.h | 49 ++++++- .../dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c | 133 +++++++++++++++--- .../dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h | 3 +- source/dnode/vnode/src/tsdb/dev/tsdbUtil.c | 4 +- source/dnode/vnode/src/tsdb/dev/tsdbUtil.h | 2 - 9 files changed, 188 insertions(+), 81 deletions(-) rename source/dnode/vnode/src/tsdb/dev/{tsdbMerge.c => dev.h} (73%) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/dev.h similarity index 73% rename from source/dnode/vnode/src/tsdb/dev/tsdbMerge.c rename to source/dnode/vnode/src/tsdb/dev/dev.h index fe0d3a1b6f..dec43b2a9a 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/dev.h @@ -13,4 +13,25 @@ * along with this program. If not, see . */ +#ifndef _TSDB_DEV_H +#define _TSDB_DEV_H + #include "tsdb.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tsdbUtil.h" + +#include "tsdbFile.h" + +#include "tsdbFS.h" + +#include "tsdbSttFWriter.h" + +#ifdef __cplusplus +} +#endif + +#endif /*_TSDB_DEV_H*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index f4eff3e9af..a5c319e2c4 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tsdbSttFWriter.h" +#include "dev.h" // extern dependencies typedef struct { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index 6dea4a4e57..c61a43d3ea 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -11,4 +11,6 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "dev.h" \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFile.c b/source/dnode/vnode/src/tsdb/dev/tsdbFile.c index 078ac75a72..08d5841e87 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFile.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tsdbFile.h" +#include "dev.h" typedef enum { TSDB_FOP_CREATE = -2, // create a file @@ -24,59 +24,10 @@ typedef enum { TSDB_FOP_MAX, } tsdb_fop_t; -typedef enum { - TSDB_FTYPE_NONE = 0, // no file type - TSDB_FTYPE_STT, // .stt - TSDB_FTYPE_HEAD, // .head - TSDB_FTYPE_DATA, // .data - TSDB_FTYPE_SMA, // .sma - TSDB_FTYPE_TOMB, // .tomb -} tsdb_ftype_t; - const char *tsdb_ftype_suffix[] = { "none", "stt", "head", "data", "sma", "tomb", }; -typedef struct SFStt { - int64_t offset; -} SFStt; - -typedef struct SFHead { - int64_t offset; -} SFHead; - -typedef struct SFData { - // TODO -} SFData; - -typedef struct SFSma { - // TODO -} SFSma; - -typedef struct SFTomb { - // TODO -} SFTomb; - -struct STFile { - SDiskID diskId; - int64_t size; - int64_t cid; - int32_t fid; - tsdb_ftype_t type; - union { - SFStt fstt; - SFHead fhead; - SFData fdata; - SFSma fsma; - SFTomb ftomb; - }; -}; - -struct SFileObj { - volatile int32_t nRef; - STFile file; -}; - struct SFileOp { tsdb_fop_t op; union { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFile.h b/source/dnode/vnode/src/tsdb/dev/tsdbFile.h index c3000fb239..84152fb598 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFile.h +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFile.h @@ -23,13 +23,56 @@ extern "C" { #endif /* Exposed Handle */ -typedef struct STFile STFile; -typedef struct SFileObj SFileObj; -typedef struct SFileOp SFileOp; +typedef struct STFile STFile; +typedef struct SFileOp SFileOp; + +typedef enum { + TSDB_FTYPE_NONE = 0, // no file type + TSDB_FTYPE_STT, // .stt + TSDB_FTYPE_HEAD, // .head + TSDB_FTYPE_DATA, // .data + TSDB_FTYPE_SMA, // .sma + TSDB_FTYPE_TOMB, // .tomb +} tsdb_ftype_t; /* Exposed APIs */ /* Exposed Structs */ +typedef struct SFStt { + int64_t offset; +} SFStt; + +typedef struct SFHead { + int64_t offset; +} SFHead; + +typedef struct SFData { + // TODO +} SFData; + +typedef struct SFSma { + // TODO +} SFSma; + +typedef struct SFTomb { + // TODO +} SFTomb; +struct STFile { + char fname[TSDB_FILENAME_LEN]; + SDiskID diskId; + int64_t size; + int64_t cid; + int32_t fid; + int32_t ref; + tsdb_ftype_t type; + union { + SFStt fstt; + SFHead fhead; + SFData fdata; + SFSma fsma; + SFTomb ftomb; + }; +}; #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c index 68647a8333..43cb39efb0 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.c @@ -13,8 +13,7 @@ * along with this program. If not, see . */ -#include "tsdbSttFWriter.h" -#include "tsdbUtil.h" +#include "dev.h" extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD); extern void tsdbCloseFile(STsdbFD **ppFD); @@ -26,7 +25,6 @@ struct SSttFWriter { SSttFWriterConf config; // time-series data SBlockData bData; - SSttBlk sttBlk; SArray *aSttBlk; // SArray // tombstone data SDelBlock dData; @@ -37,18 +35,34 @@ struct SSttFWriter { STsdbFD *pFd; }; -static int32_t tsdbSttFWriteTSBlock(SSttFWriter *pWriter) { +static int32_t write_ts_block(SSttFWriter *pWriter) { int32_t code = 0; int32_t lino; SBlockData *pBData = &pWriter->bData; + SSttBlk *pSttBlk = (SSttBlk *)taosArrayReserve(pWriter->aSttBlk, 1); + if (pSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + pSttBlk->suid = pBData->suid; + pSttBlk->minUid = pBData->aUid[0]; + pSttBlk->maxUid = pBData->aUid[pBData->nRow - 1]; + pSttBlk->minKey = pSttBlk->maxKey = pBData->aTSKEY[0]; + pSttBlk->minVer = pSttBlk->maxVer = pBData->aTSKEY[0]; + pSttBlk->nRow = pBData->nRow; + for (int32_t iRow = 1; iRow < pBData->nRow; iRow++) { + pSttBlk->minKey = TMIN(pSttBlk->minKey, pBData->aTSKEY[iRow]); + pSttBlk->maxKey = TMAX(pSttBlk->maxKey, pBData->aTSKEY[iRow]); + pSttBlk->minVer = TMIN(pSttBlk->minVer, pBData->aVersion[iRow]); + pSttBlk->maxVer = TMAX(pSttBlk->maxVer, pBData->aVersion[iRow]); + } // compress data block code = tCmprBlockData(pBData, pWriter->config.cmprAlg, NULL, NULL, NULL /* TODO */, NULL /* TODO */); TSDB_CHECK_CODE(code, lino, _exit); - TSDB_CHECK_NULL(taosArrayPush(pWriter->aSttBlk, &pWriter->sttBlk), code, lino, _exit, TSDB_CODE_OUT_OF_MEMORY); - tBlockDataClear(pBData); _exit: @@ -59,24 +73,88 @@ _exit: return code; } +static int32_t write_del_block(SSttFWriter *pWriter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t write_stt_blk(SSttFWriter *pWriter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t write_del_blk(SSttFWriter *pWriter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t stt_fwriter_create(const SSttFWriterConf *pConf, SSttFWriter **ppWriter) { + int32_t code = 0; + + if ((ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter))) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + ppWriter[0]->config = pConf[0]; + if (pConf->pSkmRow == NULL) { + ppWriter[0]->config.pSkmRow = &ppWriter[0]->skmRow; + } + if (pConf->pSkmTb == NULL) { + ppWriter[0]->config.pSkmTb = &ppWriter[0]->skmTb; + } + + tBlockDataCreate(&ppWriter[0]->bData); + ppWriter[0]->aSttBlk = taosArrayInit(64, sizeof(SSttBlk)); + if (ppWriter[0]->aSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + +_exit: + if (code && ppWriter[0]) { + taosArrayDestroy(ppWriter[0]->aSttBlk); + tBlockDataDestroy(&ppWriter[0]->bData); + taosMemoryFree(ppWriter[0]); + ppWriter[0] = NULL; + } + return code; +} + +static int32_t stt_fwriter_destroy(SSttFWriter *pWriter) { + if (pWriter) { + tDestroyTSchema(pWriter->skmTb.pTSchema); + tDestroyTSchema(pWriter->skmRow.pTSchema); + taosArrayDestroy(pWriter->aSttBlk); + tBlockDataDestroy(&pWriter->bData); + taosMemoryFree(pWriter); + } + return 0; +} + +static int32_t stt_fwriter_open(SSttFWriter *pWriter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t stt_fwriter_close(SSttFWriter *pWriter) { + int32_t code = 0; + // TODO + return code; +} + int32_t tsdbSttFWriterOpen(const SSttFWriterConf *pConf, SSttFWriter **ppWriter) { int32_t code = 0; int32_t lino; - if ((ppWriter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter))) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } + code = stt_fwriter_create(pConf, ppWriter); + TSDB_CHECK_CODE(code, lino, _exit); - ppWriter[0]->config = pConf[0]; - if (ppWriter[0]->config.pSkmTb == NULL) ppWriter[0]->config.pSkmTb = &ppWriter[0]->skmTb; - if (ppWriter[0]->config.pSkmRow == NULL) ppWriter[0]->config.pSkmRow = &ppWriter[0]->skmRow; - - tBlockDataCreate(&ppWriter[0]->bData); - // tDelBlockCreate(&ppWriter[0]->dData); - - int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; // TODO - code = tsdbOpenFile(NULL /* TODO */, pConf->szPage, flag, &ppWriter[0]->pFd); + code = stt_fwriter_open(ppWriter[0]); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -91,8 +169,19 @@ _exit: } int32_t tsdbSttFWriterClose(SSttFWriter **ppWriter) { + int32_t vgId = TD_VID(ppWriter[0]->config.pTsdb->pVnode); int32_t code = 0; - // TODO + int32_t lino; + + code = stt_fwriter_close(ppWriter[0]); + TSDB_CHECK_CODE(code, lino, _exit); + + stt_fwriter_close(ppWriter[0]); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", vgId, __func__, lino, tstrerror(code)); + } return code; } @@ -102,7 +191,7 @@ int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) if (!TABLE_SAME_SCHEMA(pWriter->bData.suid, pWriter->bData.uid, tbid->suid, tbid->uid)) { if (pWriter->bData.nRow > 0) { - code = tsdbSttFWriteTSBlock(pWriter); + code = write_ts_block(pWriter); TSDB_CHECK_CODE(code, lino, _exit); } @@ -124,7 +213,7 @@ int32_t tsdbSttFWriteTSData(SSttFWriter *pWriter, TABLEID *tbid, TSDBROW *pRow) TSDB_CHECK_CODE(code, lino, _exit); if (pWriter->bData.nRow >= pWriter->config.maxRow) { - code = tsdbSttFWriteTSBlock(pWriter); + code = write_ts_block(pWriter); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h index 2e153da38a..3a1b28a560 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFWriter.h @@ -16,7 +16,7 @@ #ifndef _TSDB_STT_FILE_WRITER_H #define _TSDB_STT_FILE_WRITER_H -#include "tsdb.h" +#include "tsdbFile.h" #ifdef __cplusplus extern "C" { @@ -34,6 +34,7 @@ struct SSttFWriterConf { STsdb *pTsdb; SSkmInfo *pSkmTb; SSkmInfo *pSkmRow; + STFile file; int32_t maxRow; int32_t szPage; int8_t cmprAlg; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c index 6dea4a4e57..c61a43d3ea 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c @@ -11,4 +11,6 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "dev.h" \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h index 07c3a5d7ed..35a85c1b06 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h +++ b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.h @@ -16,8 +16,6 @@ #ifndef _TSDB_UTIL_H #define _TSDB_UTIL_H -#include "tsdb.h" - #ifdef __cplusplus extern "C" { #endif