diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 978fd9013a..1c4c24b09d 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -43,6 +43,7 @@ target_sources( "src/tsdb/tsdbRead.c" "src/tsdb/tsdbReadImpl.c" "src/tsdb/tsdbWrite.c" + "src/tsdb/tsdbReaderWriter.c" "src/tsdb/tsdbSnapshot.c" # tq diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 657b55a0c6..45d36cebd4 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -124,6 +124,59 @@ int tsdbRLockFS(STsdbFS *pFs); int tsdbWLockFS(STsdbFS *pFs); int tsdbUnLockFS(STsdbFS *pFs); +// tsdbReadImpl.c ============================================================================================== +typedef struct SBlockIdx SBlockIdx; +typedef struct SBlockInfo SBlockInfo; +typedef struct SBlock SBlock; +typedef struct SBlockCol SBlockCol; +typedef struct SBlockStatis SBlockStatis; +typedef struct SAggrBlkCol SAggrBlkCol; +typedef struct SBlockData SBlockData; +typedef struct SReadH SReadH; + +// SReadH +int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo); +void tsdbDestroyReadH(SReadH *pReadh); +int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); +void tsdbCloseAndUnsetFSet(SReadH *pReadh); +int tsdbLoadBlockIdx(SReadH *pReadh); +int tsdbSetReadTable(SReadH *pReadh, STable *pTable); +int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); +int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); +int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds, + bool mergeBitmap); +int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); +int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); +void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); +void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock); + +typedef struct SDFileSetReader SDFileSetReader; +typedef struct SDFileSetWriter SDFileSetWriter; +typedef struct STombstoneFileWriter STombstoneFileWriter; +typedef struct STombstoneFileReader STombstoneFileReader; + +// SDFileSetWriter +int32_t tsdbDFileSetWriterOpen(SDFileSetWriter *pWriter, STsdb *pTsdb, SDFileSet *pSet); +int32_t tsdbDFileSetWriterClose(SDFileSetWriter *pWriter); +int32_t tsdbWriteBlockData(SDFileSetWriter *pWriter, SDataCols *pDataCols, SBlock *pBlock); +int32_t tsdbWriteSBlockInfo(SDFileSetWriter *pWriter, SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx); +int32_t tsdbWriteSBlockIdx(SDFileSetWriter *pWriter, SBlockIdx *pBlockIdx); + +// SDFileSetReader +int32_t tsdbDFileSetReaderOpen(SDFileSetReader *pReader, STsdb *pTsdb, SDFileSet *pSet); +int32_t tsdbDFileSetReaderClose(SDFileSetReader *pReader); +int32_t tsdbLoadSBlockIdx(SDFileSetReader *pReader, SArray *pArray); +int32_t tsdbLoadSBlockInfo(SDFileSetReader *pReader, SBlockIdx *pBlockIdx, SBlockInfo *pBlockInfo); +int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockStatis *pBlockStatis); + +// STombstoneFileWriter +int32_t tsdbTomstoneFileWriterOpen(STombstoneFileWriter *pWriter, STsdb *pTsdb); +int32_t tsdbTomstoneFileWriterClose(STombstoneFileWriter *pWriter); + +// STombstoneFileReader +int32_t tsdbTomstoneFileReaderOpen(STombstoneFileReader *pReader, STsdb *pTsdb); +int32_t tsdbTomstoneFileReaderClose(STombstoneFileReader *pReader); + // structs typedef struct { int minFid; @@ -322,9 +375,8 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(STbDataIter *pIter) { } // tsdbReadImpl -typedef struct SReadH SReadH; -typedef struct { +struct SBlockIdx { uint64_t suid; uint64_t uid; uint32_t len; @@ -332,7 +384,7 @@ typedef struct { uint32_t hasLast : 2; uint32_t numOfBlocks : 30; TSDBKEY maxKey; -} SBlockIdx; +}; typedef enum { TSDB_SBLK_VER_0 = 0, @@ -341,7 +393,7 @@ typedef enum { #define SBlockVerLatest TSDB_SBLK_VER_0 -typedef struct { +struct SBlock { uint8_t last : 1; uint8_t hasDupKey : 1; // 0: no dup TS key, 1: has dup TS key(since supporting Multi-Version) uint8_t blkVer : 6; @@ -358,24 +410,24 @@ typedef struct { uint64_t aggrOffset : 63; TSDBKEY minKey; TSDBKEY maxKey; -} SBlock; +}; -typedef struct { +struct SBlockInfo { int32_t delimiter; // For recovery usage uint64_t suid; uint64_t uid; SBlock blocks[]; -} SBlockInfo; +}; -typedef struct { +struct SBlockCol { int16_t colId; uint16_t type : 6; uint16_t blen : 10; // 0 no bitmap if all rows are NORM, > 0 bitmap length uint32_t len; // data length + bitmap length uint32_t offset; -} SBlockCol; +}; -typedef struct { +struct SAggrBlkCol { int16_t colId; int16_t maxIndex; int16_t minIndex; @@ -383,14 +435,14 @@ typedef struct { int64_t sum; int64_t max; int64_t min; -} SAggrBlkCol; +}; -typedef struct { +struct SBlockData { int32_t delimiter; // For recovery usage int32_t numOfCols; // For recovery usage uint64_t uid; // For recovery usage SBlockCol cols[]; -} SBlockData; +}; typedef void SAggrBlkData; // SBlockCol cols[]; @@ -444,21 +496,6 @@ static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) { } } -int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo); -void tsdbDestroyReadH(SReadH *pReadh); -int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); -void tsdbCloseAndUnsetFSet(SReadH *pReadh); -int tsdbLoadBlockIdx(SReadH *pReadh); -int tsdbSetReadTable(SReadH *pReadh, STable *pTable); -int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); -int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); -int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds, - bool mergeBitmap); -int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); -int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); -void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); -void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock); - static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) { void *pBuf = *ppBuf; size_t tsize = taosTSizeof(pBuf); diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c new file mode 100644 index 0000000000..d7f7835e2e --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdb.h" + +// SDFileSetWritter ==================================================== +struct SDFileSetWritter { + STsdb *pTsdb; + SDFileSet wSet; + int32_t szBuf1; + uint8_t *pBuf1; + int32_t szBuf2; + uint8_t *pBuf2; +}; + +// SDFileSetReader ==================================================== +struct SDFileSetReader { + STsdb *pTsdb; + SDFileSet rSet; + int32_t szBuf1; + uint8_t *pBuf1; + int32_t szBuf2; + uint8_t *pBuf2; +}; + +int32_t tsdbDFileSetReaderOpen(SDFileSetReader *pReader, STsdb *pTsdb, SDFileSet *pSet) { + int32_t code = 0; + + memset(pReader, 0, sizeof(*pReader)); + pReader->pTsdb = pTsdb; + pReader->rSet = *pSet; + + code = tsdbOpenDFileSet(&pReader->rSet, TD_FILE_READ); + if (code) { + goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d failed to open SDFileSetReader since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +int32_t tsdbDFileSetReaderClose(SDFileSetReader *pReader) { + int32_t code = 0; + + taosMemoryFreeClear(pReader->pBuf1); + taosMemoryFreeClear(pReader->pBuf2); + tsdbCloseDFileSet(&pReader->rSet); + + return code; +} + +int32_t tsdbLoadSBlockIdx(SDFileSetReader *pReader, SArray *pArray) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbLoadSBlockInfo(SDFileSetReader *pReader, SBlockIdx *pBlockIdx, SBlockInfo *pBlockInfo) { + int32_t code = 0; + // TODO + return code; +} + +int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockStatis *pBlockStatis) { + int32_t code = 0; + // TODO + return code; +} + +// STombstoneFileWriter ==================================================== +struct STombstoneFileWriter { + STsdb *pTsdb; + SDFile *pTombstoneF; +}; + +// STombstoneFileReader ==================================================== +struct STombstoneFileReader { + STsdb *pTsdb; + SDFile *pTombstoneF; +};