diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h index 0f8be95d4f..d15cf387ac 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbDataFileRW.h @@ -14,6 +14,7 @@ */ #include "tsdbDef.h" +#include "tsdbFSet.h" #ifndef _TSDB_DATA_FILE_RW_H #define _TSDB_DATA_FILE_RW_H @@ -26,24 +27,19 @@ extern "C" { typedef struct SDataFileReader SDataFileReader; typedef struct SDataFileReaderConfig SDataFileReaderConfig; +int32_t tsdbDataFileReaderOpen(const char *fname, const SDataFileReaderConfig *config, SDataFileReader **reader); +int32_t tsdbDataFileReaderClose(SDataFileReader *reader); + // SDataFileWriter ============================================= -typedef struct SDataFileWriter SDataFileWriter; -typedef struct SDataFileWriterConfig SDataFileWriterConfig; +typedef struct SDataFileWriter SDataFileWriter; +typedef struct SDataFileWriterConfig { + STsdb *tsdb; +} SDataFileWriterConfig; int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer); -int32_t tsdbDataFileWriterClose(SDataFileWriter *writer); +int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]); int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SBlockData *bData); -int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *writer, SBlockData *bData); - -struct SDataFileReaderConfig { - STsdb *pTsdb; - // TODO -}; - -struct SDataFileWriterConfig { - STsdb *pTsdb; - // TODO -}; +// int32_t tsdbDataFileWriteDelData(SDataFileWriter *writer, SBlockData *bData); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h index e3c24f7716..ae68cc3efb 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h @@ -34,7 +34,7 @@ typedef struct SSttSegReader SSttSegReader; typedef TARRAY2(SSttSegReader *) TSttSegReaderArray; // SSttFileReader -int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **reader); +int32_t tsdbSttFReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader); int32_t tsdbSttFReaderClose(SSttFileReader **reader); int32_t tsdbSttFReaderGetSegReader(SSttFileReader *reader, const TSttSegReaderArray **readerArray); diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h index b814af89fb..c591960a8e 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h @@ -24,8 +24,9 @@ extern "C" { // SDelBlock ---------- +#define DEL_RECORD_NUM_ELEM 5 typedef union { - int64_t aData[5]; + int64_t aData[DEL_RECORD_NUM_ELEM]; struct { int64_t suid; int64_t uid; @@ -36,7 +37,7 @@ typedef union { } SDelRecord; typedef union { - TARRAY2(int64_t) aData[5]; + TARRAY2(int64_t) aData[DEL_RECORD_NUM_ELEM]; struct { TARRAY2(int64_t) suid[1]; TARRAY2(int64_t) uid[1]; @@ -48,11 +49,12 @@ typedef union { typedef struct SDelBlk { int32_t nRow; + int32_t size[DEL_RECORD_NUM_ELEM]; TABLEID minTid; TABLEID maxTid; int64_t minVer; int64_t maxVer; - SFDataPtr dp; + SFDataPtr dp[1]; } SDelBlk; #define DEL_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) @@ -65,8 +67,9 @@ int32_t tDelBlockEncode(SDelBlock *delBlock, void *buf, int32_t size); int32_t tDelBlockDecode(const void *buf, SDelBlock *delBlock); // STbStatisBlock ---------- +#define STATIS_RECORD_NUM_ELEM 9 typedef union { - int64_t aData[9]; + int64_t aData[STATIS_RECORD_NUM_ELEM]; struct { int64_t suid; int64_t uid; @@ -81,7 +84,7 @@ typedef union { } STbStatisRecord; typedef union { - TARRAY2(int64_t) aData[9]; + TARRAY2(int64_t) aData[STATIS_RECORD_NUM_ELEM]; struct { TARRAY2(int64_t) suid[1]; TARRAY2(int64_t) uid[1]; @@ -97,7 +100,7 @@ typedef union { typedef struct STbStatisBlk { int32_t numRec; - int32_t size[9]; + int32_t size[STATIS_RECORD_NUM_ELEM]; TABLEID minTid; TABLEID maxTid; int64_t minVer; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index ac5ee17d8c..a211b2504e 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -272,8 +272,10 @@ static int32_t commit_fset_end(SCommitter *pCommitter) { code = tsdbSttFWriterClose(&pCommitter->pWriter, 0, &op); TSDB_CHECK_CODE(code, lino, _exit); - code = TARRAY2_APPEND(&pCommitter->opArray, op); - TSDB_CHECK_CODE(code, lino, _exit); + if (op.optype != TSDB_FOP_NONE) { + code = TARRAY2_APPEND(&pCommitter->opArray, op); + TSDB_CHECK_CODE(code, lino, _exit); + } _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c index cd6cb94d2e..f956bf892b 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbDataFileRW.c @@ -22,53 +22,84 @@ struct SDataFileReader { // SDataFileWriter ============================================= struct SDataFileWriter { + SDataFileWriterConfig config[1]; + struct { + bool opened; + } ctx[1]; // TODO }; -int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **ppWriter) { +int32_t tsdbDataFileWriterOpen(const SDataFileWriterConfig *config, SDataFileWriter **writer) { + writer[0] = taosMemoryCalloc(1, sizeof(SDataFileWriter)); + if (!writer[0]) return TSDB_CODE_OUT_OF_MEMORY; + writer[0]->ctx->opened = false; + return 0; +} + +static int32_t tsdbDataFileWriterCloseCommit(SDataFileWriter *writer) { + // TODO + return 0; +} +static int32_t tsdbDataFileWriterCloseAbort(SDataFileWriter *writer) { + // TODO + return 0; +} +static int32_t tsdbDataFileWriterDoClose(SDataFileWriter *writer) { + // TODO + return 0; +} +static int32_t tsdbDataFileWriterCloseImpl(SDataFileWriter *writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]) { int32_t code = 0; int32_t lino = 0; - int32_t vid = TD_VID(config->pTsdb->pVnode); -// TODO + int32_t vid = TD_VID(writer->config->tsdb->pVnode); + + if (!writer->ctx->opened) { + for (int32_t i = 0; i < TSDB_FTYPE_MAX; ++i) op[i].optype = TSDB_FOP_NONE; + } else { + if (abort) { + code = tsdbDataFileWriterCloseAbort(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbDataFileWriterCloseCommit(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } + tsdbDataFileWriterDoClose(writer); + } + taosMemoryFree(writer); + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); } return code; } +int32_t tsdbDataFileWriterClose(SDataFileWriter **writer, bool abort, STFileOp op[/*TSDB_FTYPE_MAX*/]) { + int32_t code = tsdbDataFileWriterCloseImpl(writer[0], abort, op); + if (code) { + return code; + } else { + writer[0] = NULL; + return 0; + } +} -int32_t tsdbDataFileWriterClose(SDataFileWriter *pWriter) { +static int32_t tsdbDataFileWriterDoOpen(SDataFileWriter *writer) { + // TODO + return 0; +} +int32_t tsdbDataFileWriteTSData(SDataFileWriter *writer, SBlockData *bData) { int32_t code = 0; int32_t lino = 0; - int32_t vid = 0; // TODO: TD_VID(config->pTsdb->pVnode); -// TODO + int32_t vid = TD_VID(writer->config->tsdb->pVnode); + + if (!writer->ctx->opened) { + code = tsdbDataFileWriterDoOpen(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); } return code; } - -int32_t tsdbDataFileWriteTSData(SDataFileWriter *pWriter, SBlockData *pBlockData) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = 0; // TODO: TD_VID(config->pTsdb->pVnode); -// TODO -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); - } - return code; -} - -int32_t tsdbDataFileWriteTSDataBlock(SDataFileWriter *pWriter, SBlockData *pBlockData) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = 0; // TODO: TD_VID(config->pTsdb->pVnode); -// TODO -_exit: - if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); - } - return code; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index d21823bd92..081cef46b1 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -202,7 +202,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { .tsdb = merger->tsdb, // TODO }; - code = tsdbSttFReaderOpen(&config, &reader); + code = tsdbSttFReaderOpen(fobj->fname, &config, &reader); TSDB_CHECK_CODE(code, lino, _exit); code = TARRAY2_APPEND(&merger->sttReaderArr, reader); @@ -259,7 +259,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { // open data file writer if (merger->ctx.toData) { SDataFileWriterConfig config = { - .pTsdb = merger->tsdb, + .tsdb = merger->tsdb, // TODO }; code = tsdbDataFileWriterOpen(&config, &merger->dataWriter); diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c index 8d341f2823..3e538ded62 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c @@ -84,7 +84,7 @@ static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) { return 0; } -int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader **reader) { +int32_t tsdbSttFReaderOpen(const char *fname, const SSttFileReaderConfig *config, SSttFileReader **reader) { int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(config->tsdb->pVnode); @@ -95,8 +95,6 @@ int32_t tsdbSttFReaderOpen(const SSttFileReaderConfig *config, SSttFileReader ** reader[0]->config[0] = config[0]; // open file - char fname[TSDB_FILENAME_LEN]; - tsdbTFileName(config->tsdb, config->file, fname); code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd); TSDB_CHECK_CODE(code, lino, _exit); @@ -221,13 +219,60 @@ int32_t tsdbSttFReadSttBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBloc int32_t tsdbSttFReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData) { int32_t code = 0; - // TODO + int32_t lino = 0; + int32_t vid = TD_VID(reader->reader->config->tsdb->pVnode); + + tDelBlockClear(dData); + code = tRealloc(&reader->reader->config->aBuf[0], delBlk->dp->size); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbReadFile(reader->reader->fd, delBlk->dp->offset, reader->reader->config->aBuf[0], delBlk->dp->size); + if (code) TSDB_CHECK_CODE(code, lino, _exit); + + int64_t size = 0; + for (int32_t i = 0; i < ARRAY_SIZE(dData->aData); ++i) { + code = tsdbDecmprData(reader->reader->config->aBuf[0] + size, delBlk->size[i], TSDB_DATA_TYPE_BIGINT, + TWO_STAGE_COMP, NULL, 0, NULL); // TODO + TSDB_CHECK_CODE(code, lino, _exit); + + size += delBlk->size[i]; + } + + ASSERT(size == delBlk->dp->size); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d, reason:%s", vid, __func__, lino, tstrerror(code)); + } return code; } int32_t tsdbSttFReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *statisBlk, STbStatisBlock *sData) { int32_t code = 0; - // TODO + int32_t lino = 0; + int32_t vid = TD_VID(reader->reader->config->tsdb->pVnode); + + tStatisBlockClear(sData); + code = tRealloc(&reader->reader->config->aBuf[0], statisBlk->dp->size); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbReadFile(reader->reader->fd, statisBlk->dp->offset, reader->reader->config->aBuf[0], statisBlk->dp->size); + if (code) TSDB_CHECK_CODE(code, lino, _exit); + + int64_t size = 0; + for (int32_t i = 0; i < ARRAY_SIZE(sData->aData); ++i) { + code = tsdbDecmprData(reader->reader->config->aBuf[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT, + TWO_STAGE_COMP, NULL, 0, NULL); // TODO + TSDB_CHECK_CODE(code, lino, _exit); + + size += statisBlk->size[i]; + } + + ASSERT(size == statisBlk->dp->size); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d, reason:%s", vid, __func__, lino, tstrerror(code)); + } return code; }