diff --git a/.gitignore b/.gitignore index 704b2e7415..08e3d57717 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ CMakeSettings.json cmake-build-debug/ cmake-build-release/ cscope.out +cscope.files .DS_Store debug/ release/ diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index ece1e40585..a428a9ae6a 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -289,6 +289,9 @@ const char* syncStr(ESyncState state); int32_t syncNodeGetConfig(int64_t rid, SSyncCfg *cfg); +// util +int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size); + #ifdef __cplusplus } #endif diff --git a/include/util/tdef.h b/include/util/tdef.h index 1a440c7268..51b0b63da2 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -288,6 +288,7 @@ typedef enum ELogicConditionType { #define TSDB_CONN_ACTIVE_KEY_LEN 255 #define TSDB_DEFAULT_PKT_SIZE 65480 // same as RPC_MAX_UDP_SIZE +#define TSDB_SNAP_DATA_PAYLOAD_SIZE (1 * 1024 * 1024) #define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE #define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 79f9caab33..95982abfbe 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -681,41 +681,54 @@ struct SDelFWriter { typedef struct STFileSet STFileSet; typedef TARRAY2(STFileSet *) TFileSetArray; -typedef struct STSnapRange STSnapRange; -typedef TARRAY2(STSnapRange *) TSnapRangeArray; // disjoint snap ranges +// fset range +typedef struct STFileSetRange STFileSetRange; +typedef TARRAY2(STFileSetRange *) TFileSetRangeArray; // disjoint ranges -// util -int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); -int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); -void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap); -SHashObj *tsdbGetSnapRangeHash(TSnapRangeArray *pRanges); - -// snap partition list -typedef TARRAY2(SVersionRange) SVerRangeList; -typedef struct STsdbSnapPartition STsdbSnapPartition; -typedef TARRAY2(STsdbSnapPartition *) STsdbSnapPartList; -// util -STsdbSnapPartList *tsdbSnapPartListCreate(); -void tsdbSnapPartListDestroy(STsdbSnapPartList **ppList); -int32_t tSerializeTsdbSnapPartList(void *buf, int32_t bufLen, STsdbSnapPartList *pList); -int32_t tDeserializeTsdbSnapPartList(void *buf, int32_t bufLen, STsdbSnapPartList *pList); -int32_t tsdbSnapPartListToRangeDiff(STsdbSnapPartList *pList, TSnapRangeArray **ppRanges); +int32_t tsdbTFileSetRangeClear(STFileSetRange **fsr); +int32_t tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray **ppArr); +// fset partition enum { - TSDB_SNAP_RANGE_TYP_HEAD = 0, - TSDB_SNAP_RANGE_TYP_DATA, - TSDB_SNAP_RANGE_TYP_SMA, - TSDB_SNAP_RANGE_TYP_TOMB, - TSDB_SNAP_RANGE_TYP_STT, - TSDB_SNAP_RANGE_TYP_MAX, + TSDB_FSET_RANGE_TYP_HEAD = 0, + TSDB_FSET_RANGE_TYP_DATA, + TSDB_FSET_RANGE_TYP_SMA, + TSDB_FSET_RANGE_TYP_TOMB, + TSDB_FSET_RANGE_TYP_STT, + TSDB_FSET_RANGE_TYP_MAX, }; -struct STsdbSnapPartition { +typedef TARRAY2(SVersionRange) SVerRangeList; + +struct STsdbFSetPartition { int64_t fid; int8_t stat; - SVerRangeList verRanges[TSDB_SNAP_RANGE_TYP_MAX]; + SVerRangeList verRanges[TSDB_FSET_RANGE_TYP_MAX]; }; +typedef struct STsdbFSetPartition STsdbFSetPartition; +typedef TARRAY2(STsdbFSetPartition *) STsdbFSetPartList; + +STsdbFSetPartList *tsdbFSetPartListCreate(); +void tsdbFSetPartListDestroy(STsdbFSetPartList **ppList); +int32_t tSerializeTsdbFSetPartList(void *buf, int32_t bufLen, STsdbFSetPartList *pList); +int32_t tDeserializeTsdbFSetPartList(void *buf, int32_t bufLen, STsdbFSetPartList *pList); +int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList *pList, TFileSetRangeArray **ppRanges); + +// snap rep format +typedef enum ETsdbRepFmt { + TSDB_SNAP_REP_FMT_DEFAULT = 0, + TSDB_SNAP_REP_FMT_RAW, + TSDB_SNAP_REP_FMT_HYBRID, +} ETsdbRepFmt; + +typedef struct STsdbRepOpts { + ETsdbRepFmt format; +} STsdbRepOpts; + +int32_t tSerializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo); +int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo); + // snap read struct STsdbReadSnap { SMemTable *pMem; @@ -1042,7 +1055,7 @@ typedef enum { // utils ETsdbFsState tsdbSnapGetFsState(SVnode *pVnode); -int32_t tsdbSnapGetDetails(SVnode *pVnode, SSnapshot *pSnap); +int32_t tsdbSnapPrepDescription(SVnode *pVnode, SSnapshot *pSnap); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7ed0b5103f..50a28357e5 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -65,6 +65,8 @@ typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapWriter SMetaSnapWriter; typedef struct STsdbSnapReader STsdbSnapReader; typedef struct STsdbSnapWriter STsdbSnapWriter; +typedef struct STsdbSnapRAWReader STsdbSnapRAWReader; +typedef struct STsdbSnapRAWWriter STsdbSnapRAWWriter; typedef struct STqSnapReader STqSnapReader; typedef struct STqSnapWriter STqSnapWriter; typedef struct STqOffsetReader STqOffsetReader; @@ -313,6 +315,15 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRang int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr); int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); +// STsdbSnapRAWReader ======================================== +int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader); +int32_t tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader); +int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData); +// STsdbSnapRAWWriter ======================================== +int32_t tsdbSnapRAWWriterOpen(STsdb* pTsdb, int64_t ever, STsdbSnapRAWWriter** ppWriter); +int32_t tsdbSnapRAWWrite(STsdbSnapRAWWriter* pWriter, SSnapDataHdr* pHdr); +int32_t tsdbSnapRAWWriterPrepareClose(STsdbSnapRAWWriter* pWriter); +int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** ppWriter, int8_t rollback); // STqSnapshotReader == int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader); int32_t tqSnapReaderClose(STqSnapReader** ppReader); @@ -531,6 +542,7 @@ enum { SNAP_DATA_STREAM_STATE = 11, SNAP_DATA_STREAM_STATE_BACKEND = 12, SNAP_DATA_TQ_CHECKINFO = 13, + SNAP_DATA_RAW = 14, }; struct SSnapDataHdr { diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c new file mode 100644 index 0000000000..3f448379c9 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbDataFileRAW.h" + +// SDataFileRAWReader ============================================= +int32_t tsdbDataFileRAWReaderOpen(const char *fname, const SDataFileRAWReaderConfig *config, + SDataFileRAWReader **reader) { + int32_t code = 0; + int32_t lino = 0; + + reader[0] = taosMemoryCalloc(1, sizeof(SDataFileRAWReader)); + if (reader[0] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + reader[0]->config[0] = config[0]; + + if (fname) { + if (fname) { + code = tsdbOpenFile(fname, config->tsdb, TD_FILE_READ, &reader[0]->fd); + TSDB_CHECK_CODE(code, lino, _exit); + } + } else { + char fname1[TSDB_FILENAME_LEN]; + tsdbTFileName(config->tsdb, &config->file, fname1); + code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileRAWReaderClose(SDataFileRAWReader **reader) { + if (reader[0] == NULL) return 0; + + if (reader[0]->fd) { + tsdbCloseFile(&reader[0]->fd); + } + + taosMemoryFree(reader[0]); + reader[0] = NULL; + return 0; +} + +int32_t tsdbDataFileRAWReadBlockData(SDataFileRAWReader *reader, STsdbDataRAWBlockHeader *pBlock) { + int32_t code = 0; + int32_t lino = 0; + + pBlock->file.type = reader->config->file.type; + pBlock->file.fid = reader->config->file.fid; + pBlock->file.cid = reader->config->file.cid; + pBlock->file.size = reader->config->file.size; + pBlock->file.minVer = reader->config->file.minVer; + pBlock->file.maxVer = reader->config->file.maxVer; + pBlock->file.stt->level = reader->config->file.stt->level; + + code = tsdbReadFile(reader->fd, pBlock->offset, pBlock->data, pBlock->dataLength, 0); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; +} + +// SDataFileRAWWriter ============================================= +int32_t tsdbDataFileRAWWriterOpen(const SDataFileRAWWriterConfig *config, SDataFileRAWWriter **ppWriter) { + int32_t code = 0; + int32_t lino = 0; + + SDataFileRAWWriter *writer = taosMemoryCalloc(1, sizeof(SDataFileRAWWriter)); + if (!writer) return TSDB_CODE_OUT_OF_MEMORY; + + writer->config[0] = config[0]; + + code = tsdbDataFileRAWWriterDoOpen(writer); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + taosMemoryFree(writer); + writer = NULL; + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + ppWriter[0] = writer; + return code; +} + +static int32_t tsdbDataFileRAWWriterCloseAbort(SDataFileRAWWriter *writer) { + ASSERT(0); + return 0; +} + +static int32_t tsdbDataFileRAWWriterDoClose(SDataFileRAWWriter *writer) { return 0; } + +static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFileOpArray *opArr) { + int32_t code = 0; + int32_t lino = 0; + ASSERT(writer->ctx->offset == writer->file.size); + ASSERT(writer->config->fid == writer->file.fid); + + STFileOp op = (STFileOp){ + .optype = TSDB_FOP_CREATE, + .fid = writer->config->fid, + .nf = writer->file, + }; + code = TARRAY2_APPEND(opArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + + if (writer->fd) { + code = tsdbFsyncFile(writer->fd); + TSDB_CHECK_CODE(code, lino, _exit); + tsdbCloseFile(&writer->fd); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDataFileRAWWriterOpenDataFD(SDataFileRAWWriter *writer) { + int32_t code = 0; + int32_t lino = 0; + + char fname[TSDB_FILENAME_LEN]; + int32_t flag = TD_FILE_READ | TD_FILE_WRITE; + + if (writer->ctx->offset == 0) { + flag |= (TD_FILE_CREATE | TD_FILE_TRUNC); + } + + tsdbTFileName(writer->config->tsdb, &writer->file, fname); + code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileRAWWriterDoOpen(SDataFileRAWWriter *writer) { + int32_t code = 0; + int32_t lino = 0; + + writer->file = writer->config->file; + writer->ctx->offset = 0; + + code = tsdbDataFileRAWWriterOpenDataFD(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->opened = true; +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileRAWWriterClose(SDataFileRAWWriter **writer, bool abort, TFileOpArray *opArr) { + if (writer[0] == NULL) return 0; + + int32_t code = 0; + int32_t lino = 0; + + if (writer[0]->ctx->opened) { + if (abort) { + code = tsdbDataFileRAWWriterCloseAbort(writer[0]); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbDataFileRAWWriterCloseCommit(writer[0], opArr); + TSDB_CHECK_CODE(code, lino, _exit); + } + tsdbDataFileRAWWriterDoClose(writer[0]); + } + taosMemoryFree(writer[0]); + writer[0] = NULL; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer[0]->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileRAWWriteBlockData(SDataFileRAWWriter *writer, const STsdbDataRAWBlockHeader *pDataBlock) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbWriteFile(writer->fd, writer->ctx->offset, (const uint8_t *)pDataBlock->data, pDataBlock->dataLength); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->offset += pDataBlock->dataLength; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.h b/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.h new file mode 100644 index 0000000000..d765671698 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.h @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tarray2.h" +#include "tsdbDef.h" +#include "tsdbFSet2.h" +#include "tsdbFile2.h" +#include "tsdbUtil2.h" + +#ifndef _TSDB_DATA_FILE_RAW_H +#define _TSDB_DATA_FILE_RAW_H + +#ifdef __cplusplus +extern "C" { +#endif + +// STsdbDataRAWBlockHeader ======================================= +typedef struct STsdbDataRAWBlockHeader { + struct { + int32_t type; + int64_t fid; + int64_t cid; + int64_t size; + int64_t minVer; + int64_t maxVer; + union { + struct { + int32_t level; + } stt[1]; + }; + } file; + + int64_t offset; + int64_t dataLength; + uint8_t data[0]; +} STsdbDataRAWBlockHeader; + +// SDataFileRAWReader ============================================= +typedef struct SDataFileRAWReaderConfig { + STsdb *tsdb; + int32_t szPage; + + STFile file; +} SDataFileRAWReaderConfig; + +typedef struct SDataFileRAWReader { + SDataFileRAWReaderConfig config[1]; + + struct { + bool opened; + int64_t offset; + } ctx[1]; + + STsdbFD *fd; +} SDataFileRAWReader; + +typedef TARRAY2(SDataFileRAWReader *) SDataFileRAWReaderArray; + +int32_t tsdbDataFileRAWReaderOpen(const char *fname, const SDataFileRAWReaderConfig *config, + SDataFileRAWReader **reader); +int32_t tsdbDataFileRAWReaderClose(SDataFileRAWReader **reader); + +int32_t tsdbDataFileRAWReadBlockData(SDataFileRAWReader *reader, STsdbDataRAWBlockHeader *bHdr); + +// SDataFileRAWWriter ============================================= +typedef struct SDataFileRAWWriterConfig { + STsdb *tsdb; + int32_t szPage; + + SDiskID did; + int64_t fid; + int64_t cid; + int32_t level; + + STFile file; +} SDataFileRAWWriterConfig; + +typedef struct SDataFileRAWWriter { + SDataFileRAWWriterConfig config[1]; + + struct { + bool opened; + int64_t offset; + } ctx[1]; + + STFile file; + STsdbFD *fd; +} SDataFileRAWWriter; + +typedef struct SDataFileRAWWriter SDataFileRAWWriter; + +int32_t tsdbDataFileRAWWriterOpen(const SDataFileRAWWriterConfig *config, SDataFileRAWWriter **writer); +int32_t tsdbDataFileRAWWriterClose(SDataFileRAWWriter **writer, bool abort, TFileOpArray *opArr); + +int32_t tsdbDataFileRAWWriterDoOpen(SDataFileRAWWriter *writer); +int32_t tsdbDataFileRAWWriteBlockData(SDataFileRAWWriter *writer, const STsdbDataRAWBlockHeader *bHdr); +int32_t tsdbDataFileRAWFlush(SDataFileRAWWriter *writer); + +#ifdef __cplusplus +} +#endif + +#endif /*_TSDB_DATA_FILE_RAW_H*/ diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 635c53bbed..e933b3a7dc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -1072,7 +1072,25 @@ int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr) { return 0; } -int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pRanges, TFileSetArray **fsetArr, +static SHashObj *tsdbFSetRangeArrayToHash(TFileSetRangeArray *pRanges) { + int32_t capacity = TARRAY2_SIZE(pRanges) * 2; + SHashObj *pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); + if (pHash == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < TARRAY2_SIZE(pRanges); i++) { + STFileSetRange *u = TARRAY2_GET(pRanges, i); + int32_t fid = u->fid; + int32_t code = taosHashPut(pHash, &fid, sizeof(fid), u, sizeof(*u)); + ASSERT(code == 0); + tsdbDebug("range diff hash fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever); + } + return pHash; +} + +int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRanges, TFileSetArray **fsetArr, TFileOpArray *fopArr) { int32_t code = 0; STFileSet *fset; @@ -1084,7 +1102,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pRange TARRAY2_INIT(fsetArr[0]); if (pRanges) { - pHash = tsdbGetSnapRangeHash(pRanges); + pHash = tsdbFSetRangeArrayToHash(pRanges); if (pHash == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _out; @@ -1096,7 +1114,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pRange int64_t ever = VERSION_MAX; if (pHash) { int32_t fid = fset->fid; - STSnapRange *u = taosHashGet(pHash, &fid, sizeof(fid)); + STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid)); if (u) { ever = u->sver - 1; } @@ -1123,29 +1141,13 @@ _out: return code; } -SHashObj *tsdbGetSnapRangeHash(TSnapRangeArray *pRanges) { - int32_t capacity = TARRAY2_SIZE(pRanges) * 2; - SHashObj *pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); - if (pHash == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } +int32_t tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { return tsdbFSDestroyCopySnapshot(fsetArr); } - for (int32_t i = 0; i < TARRAY2_SIZE(pRanges); i++) { - STSnapRange *u = TARRAY2_GET(pRanges, i); - int32_t fid = u->fid; - int32_t code = taosHashPut(pHash, &fid, sizeof(fid), u, sizeof(*u)); - ASSERT(code == 0); - tsdbDebug("range diff hash fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever); - } - return pHash; -} - -int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges, - TSnapRangeArray **fsrArr) { +int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges, + TFileSetRangeArray **fsrArr) { int32_t code = 0; STFileSet *fset; - STSnapRange *fsr1 = NULL; + STFileSetRange *fsr1 = NULL; SHashObj *pHash = NULL; fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0])); @@ -1156,7 +1158,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev tsdbInfo("pRanges size:%d", (pRanges == NULL ? 0 : TARRAY2_SIZE(pRanges))); if (pRanges) { - pHash = tsdbGetSnapRangeHash(pRanges); + pHash = tsdbFSetRangeArrayToHash(pRanges); if (pHash == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _out; @@ -1170,7 +1172,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev if (pHash) { int32_t fid = fset->fid; - STSnapRange *u = taosHashGet(pHash, &fid, sizeof(fid)); + STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid)); if (u) { sver1 = u->sver; tsdbDebug("range hash get fid:%d, sver:%" PRId64 ", ever:%" PRId64, u->fid, u->sver, u->ever); @@ -1184,7 +1186,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev tsdbDebug("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1); - code = tsdbTSnapRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1); + code = tsdbTFileSetRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1); if (code) break; code = TARRAY2_APPEND(fsrArr[0], fsr1); @@ -1195,8 +1197,8 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev taosThreadMutexUnlock(&fs->tsdb->mutex); if (code) { - tsdbTSnapRangeClear(&fsr1); - TARRAY2_DESTROY(fsrArr[0], tsdbTSnapRangeClear); + tsdbTFileSetRangeClear(&fsr1); + TARRAY2_DESTROY(fsrArr[0], tsdbTFileSetRangeClear); fsrArr[0] = NULL; } @@ -1206,4 +1208,6 @@ _out: pHash = NULL; } return code; -} \ No newline at end of file +} + +int32_t tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr) { return tsdbTFileSetRangeArrayDestroy(fsrArr); } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index 74453126cf..714bf5bf16 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -44,13 +44,13 @@ int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr); int32_t tsdbFSCreateRefSnapshotWithoutLock(STFileSystem *fs, TFileSetArray **fsetArr); int32_t tsdbFSDestroyRefSnapshot(TFileSetArray **fsetArr); -int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pExclude, TFileSetArray **fsetArr, +int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pExclude, TFileSetArray **fsetArr, TFileOpArray *fopArr); -int32_t tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr, TFileOpArray *fopArr); -int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TSnapRangeArray *pRanges, - TSnapRangeArray **fsrArr); -int32_t tsdbFSDestroyRefRangedSnapshot(TSnapRangeArray **fsrArr); -// txn +int32_t tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr); +int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges, + TFileSetRangeArray **fsrArr); +int32_t tsdbFSDestroyRefRangedSnapshot(TFileSetRangeArray **fsrArr); +// txn int64_t tsdbFSAllocEid(STFileSystem *fs); int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT etype); int32_t tsdbFSEditCommit(STFileSystem *fs); diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.c b/source/dnode/vnode/src/tsdb/tsdbFSet2.c index 1bf886e3b0..e088f54930 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.c @@ -533,7 +533,8 @@ int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_ return 0; } -int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, STSnapRange **fsr) { +int32_t tsdbTFileSetRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, + STFileSetRange **fsr) { fsr[0] = taosMemoryCalloc(1, sizeof(*fsr[0])); if (fsr[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; fsr[0]->fid = fset1->fid; @@ -575,7 +576,7 @@ int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fs return 0; } -int32_t tsdbTSnapRangeClear(STSnapRange **fsr) { +int32_t tsdbTFileSetRangeClear(STFileSetRange **fsr) { if (!fsr[0]) return 0; tsdbTFileSetClear(&fsr[0]->fset); @@ -584,6 +585,15 @@ int32_t tsdbTSnapRangeClear(STSnapRange **fsr) { return 0; } +int32_t tsdbTFileSetRangeArrayDestroy(TFileSetRangeArray** ppArr) { + if (ppArr && ppArr[0]) { + TARRAY2_DESTROY(ppArr[0], tsdbTFileSetRangeClear); + taosMemoryFree(ppArr[0]); + ppArr[0] = NULL; + } + return 0; +} + int32_t tsdbTFileSetClear(STFileSet **fset) { if (!fset[0]) return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.h b/source/dnode/vnode/src/tsdb/tsdbFSet2.h index 83f5b1e83c..0951a28f4e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.h @@ -49,8 +49,8 @@ int32_t tsdbTFileSetRemove(STFileSet *fset); int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset, TFileOpArray *fopArr); -int32_t tsdbTSnapRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, STSnapRange **fsr); -int32_t tsdbTSnapRangeClear(STSnapRange **fsr); +int32_t tsdbTFileSetRangeInitRef(STsdb *pTsdb, const STFileSet *fset1, int64_t sver, int64_t ever, + STFileSetRange **fsr); // to/from json int32_t tsdbTFileSetToJson(const STFileSet *fset, cJSON *json); @@ -101,7 +101,7 @@ struct STFileSet { bool blockCommit; }; -struct STSnapRange { +struct STFileSetRange { int32_t fid; int64_t sver; int64_t ever; diff --git a/source/dnode/vnode/src/tsdb/tsdbFSetRAW.c b/source/dnode/vnode/src/tsdb/tsdbFSetRAW.c new file mode 100644 index 0000000000..03c12502d5 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbFSetRAW.c @@ -0,0 +1,178 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbFSetRAW.h" + +// SFSetRAWWriter ================================================== +typedef struct SFSetRAWWriter { + SFSetRAWWriterConfig config[1]; + + struct { + TFileOpArray fopArr[1]; + STFile file; + int64_t offset; + } ctx[1]; + + // writer + SDataFileRAWWriter *dataWriter; +} SFSetRAWWriter; + +int32_t tsdbFSetRAWWriterOpen(SFSetRAWWriterConfig *config, SFSetRAWWriter **writer) { + int32_t code = 0; + int32_t lino = 0; + + writer[0] = taosMemoryCalloc(1, sizeof(SFSetRAWWriter)); + if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + writer[0]->config[0] = config[0]; + + TARRAY2_INIT(writer[0]->ctx->fopArr); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbFSetRAWWriterFinish(SFSetRAWWriter *writer, TFileOpArray *fopArr) { + int32_t code = 0; + int32_t lino = 0; + + STsdb *tsdb = writer->config->tsdb; + + STFileOp op; + TARRAY2_FOREACH(writer->ctx->fopArr, op) { + code = TARRAY2_APPEND(fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + } + + TARRAY2_CLEAR(writer->ctx->fopArr, NULL); +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbFSetRAWWriteFileDataBegin(SFSetRAWWriter *writer, STsdbDataRAWBlockHeader *bHdr) { + int32_t code = 0; + int32_t lino = 0; + + SDataFileRAWWriterConfig config = { + .tsdb = writer->config->tsdb, + .szPage = writer->config->szPage, + .fid = bHdr->file.fid, + .did = writer->config->did, + .cid = writer->config->cid, + .level = writer->config->level, + + .file = + { + .type = bHdr->file.type, + .fid = bHdr->file.fid, + .did = writer->config->did, + .cid = writer->config->cid, + .size = bHdr->file.size, + .minVer = bHdr->file.minVer, + .maxVer = bHdr->file.maxVer, + .stt = {{ + .level = bHdr->file.stt->level, + }}, + }, + }; + + writer->ctx->offset = 0; + writer->ctx->file = config.file; + + code = tsdbDataFileRAWWriterOpen(&config, &writer->dataWriter); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbFSetRAWWriteFileDataEnd(SFSetRAWWriter *writer) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbDataFileRAWWriterClose(&writer->dataWriter, false, writer->ctx->fopArr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbFSetRAWWriterClose(SFSetRAWWriter **writer, bool abort, TFileOpArray *fopArr) { + if (writer[0] == NULL) return 0; + + int32_t code = 0; + int32_t lino = 0; + + STsdb *tsdb = writer[0]->config->tsdb; + + // end + code = tsdbFSetRAWWriteFileDataEnd(writer[0]); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbDataFileRAWWriterClose(&writer[0]->dataWriter, abort, writer[0]->ctx->fopArr); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFSetRAWWriterFinish(writer[0], fopArr); + TSDB_CHECK_CODE(code, lino, _exit); + // free + TARRAY2_DESTROY(writer[0]->ctx->fopArr, NULL); + taosMemoryFree(writer[0]); + writer[0] = NULL; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbFSetRAWWriteBlockData(SFSetRAWWriter *writer, STsdbDataRAWBlockHeader *bHdr) { + int32_t code = 0; + int32_t lino = 0; + + ASSERT(writer->ctx->offset >= 0 && writer->ctx->offset <= writer->ctx->file.size); + + if (writer->ctx->offset == writer->ctx->file.size) { + code = tsdbFSetRAWWriteFileDataEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFSetRAWWriteFileDataBegin(writer, bHdr); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbDataFileRAWWriteBlockData(writer->dataWriter, bHdr); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->offset += bHdr->dataLength; + ASSERT(writer->ctx->offset == writer->dataWriter->ctx->offset); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbFSetRAW.h b/source/dnode/vnode/src/tsdb/tsdbFSetRAW.h new file mode 100644 index 0000000000..205c785e99 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbFSetRAW.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbDataFileRAW.h" + +#ifndef _TSDB_FSET_RAW_H +#define _TSDB_FSET_RAW_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SFSetRAWWriterConfig { + STsdb *tsdb; + int32_t szPage; + + SDiskID did; + int64_t fid; + int64_t cid; + int32_t level; +} SFSetRAWWriterConfig; + +typedef struct SFSetRAWWriter SFSetRAWWriter; + +int32_t tsdbFSetRAWWriterOpen(SFSetRAWWriterConfig *config, SFSetRAWWriter **writer); +int32_t tsdbFSetRAWWriterClose(SFSetRAWWriter **writer, bool abort, TFileOpArray *fopArr); +int32_t tsdbFSetRAWWriteBlockData(SFSetRAWWriter *writer, STsdbDataRAWBlockHeader *bHdr); + +#ifdef __cplusplus +} +#endif + +#endif /*_TSDB_FSET_RAW_H*/ diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c b/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c new file mode 100644 index 0000000000..9dae9bdd36 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c @@ -0,0 +1,627 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdb.h" +#include "tsdbFS2.h" + +#define TSDB_SNAP_MSG_VER 1 + +// fset partition +static int32_t tsdbFSetPartCmprFn(STsdbFSetPartition* x, STsdbFSetPartition* y) { + if (x->fid < y->fid) return -1; + if (x->fid > y->fid) return 1; + return 0; +} + +static int32_t tVersionRangeCmprFn(SVersionRange* x, SVersionRange* y) { + if (x->minVer < y->minVer) return -1; + if (x->minVer > y->minVer) return 1; + if (x->maxVer < y->maxVer) return -1; + if (x->maxVer > y->maxVer) return 1; + return 0; +} + +static int32_t tsdbTFileSetRangeCmprFn(STFileSetRange* x, STFileSetRange* y) { + if (x->fid < y->fid) return -1; + if (x->fid > y->fid) return 1; + return 0; +} + +STsdbFSetPartition* tsdbFSetPartitionCreate() { + STsdbFSetPartition* pSP = taosMemoryCalloc(1, sizeof(STsdbFSetPartition)); + if (pSP == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + for (int32_t i = 0; i < TSDB_FSET_RANGE_TYP_MAX; i++) { + TARRAY2_INIT(&pSP->verRanges[i]); + } + return pSP; +} + +void tsdbFSetPartitionClear(STsdbFSetPartition** ppSP) { + if (ppSP == NULL || ppSP[0] == NULL) { + return; + } + for (int32_t i = 0; i < TSDB_FSET_RANGE_TYP_MAX; i++) { + TARRAY2_DESTROY(&ppSP[0]->verRanges[i], NULL); + } + taosMemoryFree(ppSP[0]); + ppSP[0] = NULL; +} + +static int32_t tsdbFTypeToFRangeType(tsdb_ftype_t ftype) { + switch (ftype) { + case TSDB_FTYPE_HEAD: + return TSDB_FSET_RANGE_TYP_HEAD; + case TSDB_FTYPE_DATA: + return TSDB_FSET_RANGE_TYP_DATA; + case TSDB_FTYPE_SMA: + return TSDB_FSET_RANGE_TYP_SMA; + case TSDB_FTYPE_TOMB: + return TSDB_FSET_RANGE_TYP_TOMB; + case TSDB_FTYPE_STT: + return TSDB_FSET_RANGE_TYP_STT; + } + return TSDB_FSET_RANGE_TYP_MAX; +} + +static int32_t tsdbTFileSetToFSetPartition(STFileSet* fset, STsdbFSetPartition** ppSP) { + STsdbFSetPartition* p = tsdbFSetPartitionCreate(); + if (p == NULL) { + goto _err; + } + + p->fid = fset->fid; + + int32_t code = 0; + int32_t typ = 0; + int32_t corrupt = false; + int32_t count = 0; + for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { + if (fset->farr[ftype] == NULL) continue; + typ = tsdbFTypeToFRangeType(ftype); + ASSERT(typ < TSDB_FSET_RANGE_TYP_MAX); + STFile* f = fset->farr[ftype]->f; + if (f->maxVer > fset->maxVerValid) { + corrupt = true; + tsdbError("skip incomplete data file: fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 + ", ftype: %d", + fset->fid, fset->maxVerValid, f->minVer, f->maxVer, ftype); + continue; + } + count++; + SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer}; + code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); + ASSERT(code == 0); + } + + typ = TSDB_FSET_RANGE_TYP_STT; + const SSttLvl* lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + STFile* f = fobj->f; + if (f->maxVer > fset->maxVerValid) { + corrupt = true; + tsdbError("skip incomplete stt file.fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 + ", ftype: %d", + fset->fid, fset->maxVerValid, f->minVer, f->maxVer, typ); + continue; + } + count++; + SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer}; + code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); + ASSERT(code == 0); + } + } + if (corrupt && count == 0) { + SVersionRange vr = {.minVer = VERSION_MIN, .maxVer = fset->maxVerValid}; + code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); + ASSERT(code == 0); + } + ppSP[0] = p; + return 0; + +_err: + tsdbFSetPartitionClear(&p); + return -1; +} + +// fset partition list +STsdbFSetPartList* tsdbFSetPartListCreate() { + STsdbFSetPartList* pList = taosMemoryCalloc(1, sizeof(STsdbFSetPartList)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + TARRAY2_INIT(pList); + return pList; +} + +void tsdbFSetPartListDestroy(STsdbFSetPartList** ppList) { + if (ppList == NULL || ppList[0] == NULL) return; + + TARRAY2_DESTROY(ppList[0], tsdbFSetPartitionClear); + taosMemoryFree(ppList[0]); + ppList[0] = NULL; +} + +int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList* pList, TFileSetRangeArray** ppRanges) { + TFileSetRangeArray* pDiff = taosMemoryCalloc(1, sizeof(TFileSetRangeArray)); + if (pDiff == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + TARRAY2_INIT(pDiff); + + STsdbFSetPartition* part; + TARRAY2_FOREACH(pList, part) { + STFileSetRange* r = taosMemoryCalloc(1, sizeof(STFileSetRange)); + if (r == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + int64_t maxVerValid = -1; + int32_t typMax = TSDB_FSET_RANGE_TYP_MAX; + for (int32_t i = 0; i < typMax; i++) { + SVerRangeList* iList = &part->verRanges[i]; + SVersionRange vr = {0}; + TARRAY2_FOREACH(iList, vr) { + if (vr.maxVer < vr.minVer) { + continue; + } + maxVerValid = TMAX(maxVerValid, vr.maxVer); + } + } + r->fid = part->fid; + r->sver = maxVerValid + 1; + r->ever = VERSION_MAX; + tsdbDebug("range diff fid:%" PRId64 ", sver:%" PRId64 ", ever:%" PRId64, part->fid, r->sver, r->ever); + int32_t code = TARRAY2_SORT_INSERT(pDiff, r, tsdbTFileSetRangeCmprFn); + ASSERT(code == 0); + } + ppRanges[0] = pDiff; + + tsdbInfo("pDiff size:%d", TARRAY2_SIZE(pDiff)); + return 0; + +_err: + if (pDiff) { + tsdbTFileSetRangeArrayDestroy(&pDiff); + } + return -1; +} + +// serialization +int32_t tTsdbFSetPartListDataLenCalc(STsdbFSetPartList* pList) { + int32_t hdrLen = sizeof(int32_t); + int32_t datLen = 0; + + int8_t msgVer = 1; + int32_t len = TARRAY2_SIZE(pList); + hdrLen += sizeof(msgVer); + hdrLen += sizeof(len); + datLen += hdrLen; + + for (int32_t u = 0; u < len; u++) { + STsdbFSetPartition* p = TARRAY2_GET(pList, u); + int32_t typMax = TSDB_FSET_RANGE_TYP_MAX; + int32_t uItem = 0; + uItem += sizeof(STsdbFSetPartition); + uItem += sizeof(typMax); + + for (int32_t i = 0; i < typMax; i++) { + int32_t iLen = TARRAY2_SIZE(&p->verRanges[i]); + int32_t jItem = 0; + jItem += sizeof(SVersionRange); + jItem += sizeof(int64_t); + uItem += sizeof(iLen) + jItem * iLen; + } + datLen += uItem; + } + return datLen; +} + +int32_t tSerializeTsdbFSetPartList(void* buf, int32_t bufLen, STsdbFSetPartList* pList) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + int8_t reserved8 = 0; + int16_t reserved16 = 0; + int64_t reserved64 = 0; + + int8_t msgVer = TSDB_SNAP_MSG_VER; + int32_t len = TARRAY2_SIZE(pList); + + if (tStartEncode(&encoder) < 0) goto _err; + if (tEncodeI8(&encoder, msgVer) < 0) goto _err; + if (tEncodeI32(&encoder, len) < 0) goto _err; + + for (int32_t u = 0; u < len; u++) { + STsdbFSetPartition* p = TARRAY2_GET(pList, u); + if (tEncodeI64(&encoder, p->fid) < 0) goto _err; + if (tEncodeI8(&encoder, p->stat) < 0) goto _err; + if (tEncodeI8(&encoder, reserved8) < 0) goto _err; + if (tEncodeI16(&encoder, reserved16) < 0) goto _err; + + int32_t typMax = TSDB_FSET_RANGE_TYP_MAX; + if (tEncodeI32(&encoder, typMax) < 0) goto _err; + + for (int32_t i = 0; i < typMax; i++) { + SVerRangeList* iList = &p->verRanges[i]; + int32_t iLen = TARRAY2_SIZE(iList); + + if (tEncodeI32(&encoder, iLen) < 0) goto _err; + for (int32_t j = 0; j < iLen; j++) { + SVersionRange r = TARRAY2_GET(iList, j); + if (tEncodeI64(&encoder, r.minVer) < 0) goto _err; + if (tEncodeI64(&encoder, r.maxVer) < 0) goto _err; + if (tEncodeI64(&encoder, reserved64) < 0) goto _err; + } + } + } + + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; + +_err: + tEncoderClear(&encoder); + return -1; +} + +int32_t tDeserializeTsdbFSetPartList(void* buf, int32_t bufLen, STsdbFSetPartList* pList) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + int8_t reserved8 = 0; + int16_t reserved16 = 0; + int64_t reserved64 = 0; + + STsdbFSetPartition* p = NULL; + + int8_t msgVer = 0; + int32_t len = 0; + if (tStartDecode(&decoder) < 0) goto _err; + if (tDecodeI8(&decoder, &msgVer) < 0) goto _err; + if (msgVer != TSDB_SNAP_MSG_VER) goto _err; + if (tDecodeI32(&decoder, &len) < 0) goto _err; + + for (int32_t u = 0; u < len; u++) { + p = tsdbFSetPartitionCreate(); + if (p == NULL) goto _err; + if (tDecodeI64(&decoder, &p->fid) < 0) goto _err; + if (tDecodeI8(&decoder, &p->stat) < 0) goto _err; + if (tDecodeI8(&decoder, &reserved8) < 0) goto _err; + if (tDecodeI16(&decoder, &reserved16) < 0) goto _err; + + int32_t typMax = 0; + if (tDecodeI32(&decoder, &typMax) < 0) goto _err; + + for (int32_t i = 0; i < typMax; i++) { + SVerRangeList* iList = &p->verRanges[i]; + int32_t iLen = 0; + if (tDecodeI32(&decoder, &iLen) < 0) goto _err; + for (int32_t j = 0; j < iLen; j++) { + SVersionRange r = {0}; + if (tDecodeI64(&decoder, &r.minVer) < 0) goto _err; + if (tDecodeI64(&decoder, &r.maxVer) < 0) goto _err; + if (tDecodeI64(&decoder, &reserved64) < 0) goto _err; + TARRAY2_APPEND(iList, r); + } + } + TARRAY2_APPEND(pList, p); + p = NULL; + } + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; + +_err: + if (p) { + tsdbFSetPartitionClear(&p); + } + tDecoderClear(&decoder); + return -1; +} + +// fs state +static STsdbFSetPartList* tsdbSnapGetFSetPartList(STFileSystem* fs) { + STsdbFSetPartList* pList = tsdbFSetPartListCreate(); + if (pList == NULL) { + return NULL; + } + + int32_t code = 0; + taosThreadMutexLock(&fs->tsdb->mutex); + STFileSet* fset; + TARRAY2_FOREACH(fs->fSetArr, fset) { + STsdbFSetPartition* pItem = NULL; + if (tsdbTFileSetToFSetPartition(fset, &pItem) < 0) { + code = -1; + break; + } + ASSERT(pItem != NULL); + code = TARRAY2_SORT_INSERT(pList, pItem, tsdbFSetPartCmprFn); + ASSERT(code == 0); + } + taosThreadMutexUnlock(&fs->tsdb->mutex); + + if (code) { + TARRAY2_DESTROY(pList, tsdbFSetPartitionClear); + taosMemoryFree(pList); + pList = NULL; + } + return pList; +} + +ETsdbFsState tsdbSnapGetFsState(SVnode* pVnode) { + if (!VND_IS_RSMA(pVnode)) { + return pVnode->pTsdb->pFS->fsstate; + } + for (int32_t lvl = 0; lvl < TSDB_RETENTION_MAX; ++lvl) { + STsdb* pTsdb = SMA_RSMA_GET_TSDB(pVnode, lvl); + if (pTsdb && pTsdb->pFS->fsstate != TSDB_FS_STATE_NORMAL) { + return TSDB_FS_STATE_INCOMPLETE; + } + } + return TSDB_FS_STATE_NORMAL; +} + +// description +typedef struct STsdbPartitionInfo { + int32_t vgId; + int32_t tsdbMaxCnt; + int32_t subTyps[TSDB_RETENTION_MAX]; + STsdbFSetPartList* pLists[TSDB_RETENTION_MAX]; +} STsdbPartitionInfo; + +static int32_t tsdbPartitionInfoInit(SVnode* pVnode, STsdbPartitionInfo* pInfo) { + int32_t subTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2}; + pInfo->vgId = TD_VID(pVnode); + pInfo->tsdbMaxCnt = (!VND_IS_RSMA(pVnode) ? 1 : TSDB_RETENTION_MAX); + + ASSERT(sizeof(pInfo->subTyps) == sizeof(subTyps)); + memcpy(pInfo->subTyps, (char*)subTyps, sizeof(subTyps)); + + // fset partition list + memset(pInfo->pLists, 0, sizeof(pInfo->pLists[0]) * TSDB_RETENTION_MAX); + for (int32_t j = 0; j < pInfo->tsdbMaxCnt; ++j) { + STsdb* pTsdb = SMA_RSMA_GET_TSDB(pVnode, j); + pInfo->pLists[j] = tsdbSnapGetFSetPartList(pTsdb->pFS); + if (pInfo->pLists[j] == NULL) return -1; + } + return 0; +} + +static void tsdbPartitionInfoClear(STsdbPartitionInfo* pInfo) { + for (int32_t j = 0; j < pInfo->tsdbMaxCnt; ++j) { + if (pInfo->pLists[j] == NULL) continue; + tsdbFSetPartListDestroy(&pInfo->pLists[j]); + } +} + +static int32_t tsdbPartitionInfoEstSize(STsdbPartitionInfo* pInfo) { + int32_t dataLen = 0; + for (int32_t j = 0; j < pInfo->tsdbMaxCnt; ++j) { + dataLen += sizeof(SSyncTLV); // subTyps[j] + dataLen += tTsdbFSetPartListDataLenCalc(pInfo->pLists[j]); + } + return dataLen; +} + +static int32_t tsdbPartitionInfoSerialize(STsdbPartitionInfo* pInfo, uint8_t* buf, int32_t bufLen) { + int32_t tlen = 0; + int32_t offset = 0; + for (int32_t j = 0; j < pInfo->tsdbMaxCnt; ++j) { + SSyncTLV* pSubHead = (void*)((char*)buf + offset); + int32_t valOffset = offset + sizeof(*pSubHead); + ASSERT(pSubHead->val == (char*)buf + valOffset); + if ((tlen = tSerializeTsdbFSetPartList(pSubHead->val, bufLen - valOffset, pInfo->pLists[j])) < 0) { + tsdbError("vgId:%d, failed to serialize fset partition list of tsdb %d since %s", pInfo->vgId, j, terrstr()); + return -1; + } + pSubHead->typ = pInfo->subTyps[j]; + pSubHead->len = tlen; + offset += sizeof(*pSubHead) + tlen; + } + return offset; +} + +// tsdb replication opts +static int32_t tTsdbRepOptsDataLenCalc(STsdbRepOpts* pInfo) { + int32_t hdrLen = sizeof(int32_t); + int32_t datLen = 0; + + int8_t msgVer = 0; + int64_t reserved64 = 0; + int16_t format = 0; + hdrLen += sizeof(msgVer); + datLen += hdrLen; + datLen += sizeof(format); + datLen += sizeof(reserved64); + datLen += sizeof(*pInfo); + return datLen; +} + +int32_t tSerializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + int64_t reserved64 = 0; + int8_t msgVer = TSDB_SNAP_MSG_VER; + + if (tStartEncode(&encoder) < 0) goto _err; + if (tEncodeI8(&encoder, msgVer) < 0) goto _err; + int16_t format = pOpts->format; + if (tEncodeI16(&encoder, format) < 0) goto _err; + if (tEncodeI64(&encoder, reserved64) < 0) goto _err; + + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; + +_err: + tEncoderClear(&encoder); + return -1; +} + +int32_t tDeserializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + int64_t reserved64 = 0; + int8_t msgVer = 0; + + if (tStartDecode(&decoder) < 0) goto _err; + if (tDecodeI8(&decoder, &msgVer) < 0) goto _err; + if (msgVer != TSDB_SNAP_MSG_VER) goto _err; + int16_t format = 0; + if (tDecodeI16(&decoder, &format) < 0) goto _err; + pOpts->format = format; + if (tDecodeI64(&decoder, &reserved64) < 0) goto _err; + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; + +_err: + tDecoderClear(&decoder); + return -1; +} + +static int32_t tsdbRepOptsEstSize(STsdbRepOpts* pOpts) { + int32_t dataLen = 0; + dataLen += sizeof(SSyncTLV); + dataLen += tTsdbRepOptsDataLenCalc(pOpts); + return dataLen; +} + +static int32_t tsdbRepOptsSerialize(STsdbRepOpts* pOpts, void* buf, int32_t bufLen) { + SSyncTLV* pSubHead = buf; + int32_t offset = 0; + int32_t tlen = 0; + if ((tlen = tSerializeTsdbRepOpts(pSubHead->val, bufLen, pOpts)) < 0) { + return -1; + } + pSubHead->typ = SNAP_DATA_RAW; + pSubHead->len = tlen; + offset += sizeof(*pSubHead) + tlen; + return offset; +} + +// snap info +static int32_t tsdbSnapPrepDealWithSnapInfo(SVnode* pVnode, SSnapshot* pSnap, STsdbRepOpts* pInfo) { + if (!pSnap->data) return 0; + int32_t code = -1; + + SSyncTLV* pHead = (void*)pSnap->data; + int32_t offset = 0; + + while (offset + sizeof(*pHead) < pHead->len) { + SSyncTLV* pField = (void*)(pHead->val + offset); + offset += sizeof(*pField) + pField->len; + void* buf = pField->val; + int32_t bufLen = pField->len; + + switch (pField->typ) { + case SNAP_DATA_TSDB: + case SNAP_DATA_RSMA1: + case SNAP_DATA_RSMA2: { + } break; + case SNAP_DATA_RAW: { + if (tDeserializeTsdbRepOpts(buf, bufLen, pInfo) < 0) { + terrno = TSDB_CODE_INVALID_DATA_FMT; + tsdbError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + } break; + default: + tsdbError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), pField->typ); + goto _out; + } + } + + code = 0; +_out: + return code; +} + +int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { + ASSERT(pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY); + STsdbPartitionInfo partitionInfo = {0}; + int code = -1; + STsdbPartitionInfo* pInfo = &partitionInfo; + + if (tsdbPartitionInfoInit(pVnode, pInfo) != 0) { + goto _out; + } + + // deal with snap info for reply + STsdbRepOpts opts = {.format = TSDB_SNAP_REP_FMT_RAW}; + if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { + STsdbRepOpts leaderOpts = {0}; + if (tsdbSnapPrepDealWithSnapInfo(pVnode, pSnap, &leaderOpts) < 0) { + tsdbError("vgId:%d, failed to deal with snap info for reply since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + opts.format = TMIN(opts.format, leaderOpts.format); + } + + // info data realloc + const int32_t headLen = sizeof(SSyncTLV); + int32_t bufLen = headLen; + bufLen += tsdbPartitionInfoEstSize(pInfo); + bufLen += tsdbRepOptsEstSize(&opts); + if (syncSnapInfoDataRealloc(pSnap, bufLen) != 0) { + tsdbError("vgId:%d, failed to realloc memory for data of snap info. bytes:%d", TD_VID(pVnode), bufLen); + goto _out; + } + + // serialization + char* buf = (void*)pSnap->data; + int32_t offset = headLen; + int32_t tlen = 0; + + if ((tlen = tsdbPartitionInfoSerialize(pInfo, buf + offset, bufLen - offset)) < 0) { + tsdbError("vgId:%d, failed to serialize tsdb partition info since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + offset += tlen; + ASSERT(offset <= bufLen); + + if ((tlen = tsdbRepOptsSerialize(&opts, buf + offset, bufLen - offset)) < 0) { + tsdbError("vgId:%d, failed to serialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + offset += tlen; + ASSERT(offset <= bufLen); + + // set header of info data + SSyncTLV* pHead = pSnap->data; + pHead->typ = pSnap->type; + pHead->len = offset - headLen; + + tsdbInfo("vgId:%d, tsdb snap info prepared. type:%s, val length:%d", TD_VID(pVnode), TMSG_INFO(pHead->typ), + pHead->len); + code = 0; +_out: + tsdbPartitionInfoClear(pInfo); + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index e757daa0af..6aff1c2930 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -32,12 +32,12 @@ struct STsdbSnapReader { uint8_t* aBuf[5]; SSkmInfo skmTb[1]; - TSnapRangeArray* fsrArr; + TFileSetRangeArray* fsrArr; // context struct { int32_t fsrArrIdx; - STSnapRange* fsr; + STFileSetRange* fsr; bool isDataDone; bool isTombDone; } ctx[1]; @@ -331,7 +331,7 @@ static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** dat if (!(reader->blockData->nRow % 16)) { int64_t nData = tBlockDataSize(reader->blockData); - if (nData >= 1 * 1024 * 1024) { + if (nData >= TSDB_SNAP_DATA_PAYLOAD_SIZE) { break; } } @@ -437,14 +437,14 @@ int32_t tsdbSnapReaderOpen(STsdb* tsdb, int64_t sver, int64_t ever, int8_t type, reader[0]->ever = ever; reader[0]->type = type; - code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TSnapRangeArray*)pRanges, &reader[0]->fsrArr); + code = tsdbFSCreateRefRangedSnapshot(tsdb->pFS, sver, ever, (TFileSetRangeArray*)pRanges, &reader[0]->fsrArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, lino, tstrerror(code), sver, ever, type); - tsdbSnapRangeArrayDestroy(&reader[0]->fsrArr); + tsdbTFileSetRangeArrayDestroy(&reader[0]->fsrArr); taosMemoryFree(reader[0]); reader[0] = NULL; } else { @@ -472,7 +472,7 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) { TARRAY2_DESTROY(reader[0]->sttReaderArr, tsdbSttFileReaderClose); tsdbDataFileReaderClose(&reader[0]->dataReader); - tsdbSnapRangeArrayDestroy(&reader[0]->fsrArr); + tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr); tDestroyTSchema(reader[0]->skmTb->pTSchema); for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->aBuf); ++i) { @@ -1061,7 +1061,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRang writer[0]->compactVersion = INT64_MAX; writer[0]->now = taosGetTimestampMs(); - code = tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TSnapRangeArray*)pRanges, &writer[0]->fsetArr, writer[0]->fopArr); + code = tsdbFSCreateCopyRangedSnapshot(pTsdb->pFS, (TFileSetRangeArray*)pRanges, &writer[0]->fsetArr, writer[0]->fopArr); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -1125,7 +1125,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) { tsdbDataFileReaderClose(&writer[0]->ctx->dataReader); TARRAY2_DESTROY(writer[0]->fopArr, NULL); - tsdbFSDestroyCopySnapshot(&writer[0]->fsetArr); + tsdbFSDestroyCopyRangedSnapshot(&writer[0]->fsetArr); for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->aBuf); ++i) { tFree(writer[0]->aBuf[i]); @@ -1167,439 +1167,3 @@ _exit: } return code; } - -// snap part -static int32_t tsdbSnapPartCmprFn(STsdbSnapPartition* x, STsdbSnapPartition* y) { - if (x->fid < y->fid) return -1; - if (x->fid > y->fid) return 1; - return 0; -} - -static int32_t tVersionRangeCmprFn(SVersionRange* x, SVersionRange* y) { - if (x->minVer < y->minVer) return -1; - if (x->minVer > y->minVer) return 1; - if (x->maxVer < y->maxVer) return -1; - if (x->maxVer > y->maxVer) return 1; - return 0; -} - -static int32_t tsdbSnapRangeCmprFn(STSnapRange* x, STSnapRange* y) { - if (x->fid < y->fid) return -1; - if (x->fid > y->fid) return 1; - return 0; -} - -STsdbSnapPartition* tsdbSnapPartitionCreate() { - STsdbSnapPartition* pSP = taosMemoryCalloc(1, sizeof(STsdbSnapPartition)); - if (pSP == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - for (int32_t i = 0; i < TSDB_SNAP_RANGE_TYP_MAX; i++) { - TARRAY2_INIT(&pSP->verRanges[i]); - } - return pSP; -} - -void tsdbSnapPartitionClear(STsdbSnapPartition** ppSP) { - if (ppSP == NULL || ppSP[0] == NULL) { - return; - } - for (int32_t i = 0; i < TSDB_SNAP_RANGE_TYP_MAX; i++) { - TARRAY2_DESTROY(&ppSP[0]->verRanges[i], NULL); - } - taosMemoryFree(ppSP[0]); - ppSP[0] = NULL; -} - -static int32_t tsdbFTypeToSRangeTyp(tsdb_ftype_t ftype) { - switch (ftype) { - case TSDB_FTYPE_HEAD: - return TSDB_SNAP_RANGE_TYP_HEAD; - case TSDB_FTYPE_DATA: - return TSDB_SNAP_RANGE_TYP_DATA; - case TSDB_FTYPE_SMA: - return TSDB_SNAP_RANGE_TYP_SMA; - case TSDB_FTYPE_TOMB: - return TSDB_SNAP_RANGE_TYP_TOMB; - case TSDB_FTYPE_STT: - return TSDB_SNAP_RANGE_TYP_STT; - } - return TSDB_SNAP_RANGE_TYP_MAX; -} - -static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP) { - STsdbSnapPartition* p = tsdbSnapPartitionCreate(); - if (p == NULL) { - goto _err; - } - - p->fid = fset->fid; - - int32_t code = 0; - int32_t typ = 0; - int32_t corrupt = false; - int32_t count = 0; - for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { - if (fset->farr[ftype] == NULL) continue; - typ = tsdbFTypeToSRangeTyp(ftype); - ASSERT(typ < TSDB_SNAP_RANGE_TYP_MAX); - STFile* f = fset->farr[ftype]->f; - if (f->maxVer > fset->maxVerValid) { - corrupt = true; - tsdbError("skip incomplete data file: fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 - ", ftype: %d", - fset->fid, fset->maxVerValid, f->minVer, f->maxVer, ftype); - continue; - } - count++; - SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer}; - code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); - ASSERT(code == 0); - } - - typ = TSDB_SNAP_RANGE_TYP_STT; - const SSttLvl* lvl; - TARRAY2_FOREACH(fset->lvlArr, lvl) { - STFileObj* fobj; - TARRAY2_FOREACH(lvl->fobjArr, fobj) { - STFile* f = fobj->f; - if (f->maxVer > fset->maxVerValid) { - corrupt = true; - tsdbError("skip incomplete stt file.fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 - ", ftype: %d", - fset->fid, fset->maxVerValid, f->minVer, f->maxVer, typ); - continue; - } - count++; - SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer}; - code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); - ASSERT(code == 0); - } - } - if (corrupt && count == 0) { - SVersionRange vr = {.minVer = VERSION_MIN, .maxVer = fset->maxVerValid}; - code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); - ASSERT(code == 0); - } - ppSP[0] = p; - return 0; - -_err: - tsdbSnapPartitionClear(&p); - return -1; -} - -STsdbSnapPartList* tsdbSnapPartListCreate() { - STsdbSnapPartList* pList = taosMemoryCalloc(1, sizeof(STsdbSnapPartList)); - if (pList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - TARRAY2_INIT(pList); - return pList; -} - -static STsdbSnapPartList* tsdbGetSnapPartList(STFileSystem* fs) { - STsdbSnapPartList* pList = tsdbSnapPartListCreate(); - if (pList == NULL) { - return NULL; - } - - int32_t code = 0; - taosThreadMutexLock(&fs->tsdb->mutex); - STFileSet* fset; - TARRAY2_FOREACH(fs->fSetArr, fset) { - STsdbSnapPartition* pItem = NULL; - if (tsdbTFileSetToSnapPart(fset, &pItem) < 0) { - code = -1; - break; - } - ASSERT(pItem != NULL); - code = TARRAY2_SORT_INSERT(pList, pItem, tsdbSnapPartCmprFn); - ASSERT(code == 0); - } - taosThreadMutexUnlock(&fs->tsdb->mutex); - - if (code) { - TARRAY2_DESTROY(pList, tsdbSnapPartitionClear); - taosMemoryFree(pList); - pList = NULL; - } - return pList; -} - -int32_t tTsdbSnapPartListDataLenCalc(STsdbSnapPartList* pList) { - int32_t hdrLen = sizeof(int32_t); - int32_t datLen = 0; - - int8_t msgVer = 1; - int32_t len = TARRAY2_SIZE(pList); - hdrLen += sizeof(msgVer); - hdrLen += sizeof(len); - datLen += hdrLen; - - for (int32_t u = 0; u < len; u++) { - STsdbSnapPartition* p = TARRAY2_GET(pList, u); - int32_t typMax = TSDB_SNAP_RANGE_TYP_MAX; - int32_t uItem = 0; - uItem += sizeof(STsdbSnapPartition); - uItem += sizeof(typMax); - - for (int32_t i = 0; i < typMax; i++) { - int32_t iLen = TARRAY2_SIZE(&p->verRanges[i]); - int32_t jItem = 0; - jItem += sizeof(SVersionRange); - jItem += sizeof(int64_t); - uItem += sizeof(iLen) + jItem * iLen; - } - datLen += uItem; - } - return datLen; -} - -int32_t tSerializeTsdbSnapPartList(void* buf, int32_t bufLen, STsdbSnapPartList* pList) { - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, bufLen); - - int8_t reserved8 = 0; - int16_t reserved16 = 0; - int64_t reserved64 = 0; - - int8_t msgVer = 1; - int32_t len = TARRAY2_SIZE(pList); - - if (tStartEncode(&encoder) < 0) goto _err; - if (tEncodeI8(&encoder, msgVer) < 0) goto _err; - if (tEncodeI32(&encoder, len) < 0) goto _err; - - for (int32_t u = 0; u < len; u++) { - STsdbSnapPartition* p = TARRAY2_GET(pList, u); - if (tEncodeI64(&encoder, p->fid) < 0) goto _err; - if (tEncodeI8(&encoder, p->stat) < 0) goto _err; - if (tEncodeI8(&encoder, reserved8) < 0) goto _err; - if (tEncodeI16(&encoder, reserved16) < 0) goto _err; - - int32_t typMax = TSDB_SNAP_RANGE_TYP_MAX; - if (tEncodeI32(&encoder, typMax) < 0) goto _err; - - for (int32_t i = 0; i < typMax; i++) { - SVerRangeList* iList = &p->verRanges[i]; - int32_t iLen = TARRAY2_SIZE(iList); - - if (tEncodeI32(&encoder, iLen) < 0) goto _err; - for (int32_t j = 0; j < iLen; j++) { - SVersionRange r = TARRAY2_GET(iList, j); - if (tEncodeI64(&encoder, r.minVer) < 0) goto _err; - if (tEncodeI64(&encoder, r.maxVer) < 0) goto _err; - if (tEncodeI64(&encoder, reserved64) < 0) goto _err; - } - } - } - - tEndEncode(&encoder); - int32_t tlen = encoder.pos; - tEncoderClear(&encoder); - return tlen; - -_err: - tEncoderClear(&encoder); - return -1; -} - -int32_t tDeserializeTsdbSnapPartList(void* buf, int32_t bufLen, STsdbSnapPartList* pList) { - SDecoder decoder = {0}; - tDecoderInit(&decoder, buf, bufLen); - - int8_t reserved8 = 0; - int16_t reserved16 = 0; - int64_t reserved64 = 0; - - STsdbSnapPartition* p = NULL; - - int8_t msgVer = 0; - int32_t len = 0; - if (tStartDecode(&decoder) < 0) goto _err; - if (tDecodeI8(&decoder, &msgVer) < 0) goto _err; - if (tDecodeI32(&decoder, &len) < 0) goto _err; - - for (int32_t u = 0; u < len; u++) { - p = tsdbSnapPartitionCreate(); - if (p == NULL) goto _err; - if (tDecodeI64(&decoder, &p->fid) < 0) goto _err; - if (tDecodeI8(&decoder, &p->stat) < 0) goto _err; - if (tDecodeI8(&decoder, &reserved8) < 0) goto _err; - if (tDecodeI16(&decoder, &reserved16) < 0) goto _err; - - int32_t typMax = 0; - if (tDecodeI32(&decoder, &typMax) < 0) goto _err; - - for (int32_t i = 0; i < typMax; i++) { - SVerRangeList* iList = &p->verRanges[i]; - int32_t iLen = 0; - if (tDecodeI32(&decoder, &iLen) < 0) goto _err; - for (int32_t j = 0; j < iLen; j++) { - SVersionRange r = {0}; - if (tDecodeI64(&decoder, &r.minVer) < 0) goto _err; - if (tDecodeI64(&decoder, &r.maxVer) < 0) goto _err; - if (tDecodeI64(&decoder, &reserved64) < 0) goto _err; - TARRAY2_APPEND(iList, r); - } - } - TARRAY2_APPEND(pList, p); - p = NULL; - } - - tEndDecode(&decoder); - tDecoderClear(&decoder); - return 0; - -_err: - if (p) { - tsdbSnapPartitionClear(&p); - } - tDecoderClear(&decoder); - return -1; -} - -int32_t tsdbSnapPartListToRangeDiff(STsdbSnapPartList* pList, TSnapRangeArray** ppRanges) { - TSnapRangeArray* pDiff = taosMemoryCalloc(1, sizeof(TSnapRangeArray)); - if (pDiff == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - TARRAY2_INIT(pDiff); - - STsdbSnapPartition* part; - TARRAY2_FOREACH(pList, part) { - STSnapRange* r = taosMemoryCalloc(1, sizeof(STSnapRange)); - if (r == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - int64_t maxVerValid = -1; - int32_t typMax = TSDB_SNAP_RANGE_TYP_MAX; - for (int32_t i = 0; i < typMax; i++) { - SVerRangeList* iList = &part->verRanges[i]; - SVersionRange vr = {0}; - TARRAY2_FOREACH(iList, vr) { - if (vr.maxVer < vr.minVer) { - continue; - } - maxVerValid = TMAX(maxVerValid, vr.maxVer); - } - } - r->fid = part->fid; - r->sver = maxVerValid + 1; - r->ever = VERSION_MAX; - tsdbDebug("range diff fid:%" PRId64 ", sver:%" PRId64 ", ever:%" PRId64, part->fid, r->sver, r->ever); - int32_t code = TARRAY2_SORT_INSERT(pDiff, r, tsdbSnapRangeCmprFn); - ASSERT(code == 0); - } - ppRanges[0] = pDiff; - - tsdbInfo("pDiff size:%d", TARRAY2_SIZE(pDiff)); - return 0; - -_err: - if (pDiff) { - tsdbSnapRangeArrayDestroy(&pDiff); - } - return -1; -} - -void tsdbSnapRangeArrayDestroy(TSnapRangeArray** ppSnap) { - if (ppSnap && ppSnap[0]) { - TARRAY2_DESTROY(ppSnap[0], tsdbTSnapRangeClear); - taosMemoryFree(ppSnap[0]); - ppSnap[0] = NULL; - } -} - -void tsdbSnapPartListDestroy(STsdbSnapPartList** ppList) { - if (ppList == NULL || ppList[0] == NULL) return; - - TARRAY2_DESTROY(ppList[0], tsdbSnapPartitionClear); - taosMemoryFree(ppList[0]); - ppList[0] = NULL; -} - -ETsdbFsState tsdbSnapGetFsState(SVnode* pVnode) { - if (!VND_IS_RSMA(pVnode)) { - return pVnode->pTsdb->pFS->fsstate; - } - for (int32_t lvl = 0; lvl < TSDB_RETENTION_MAX; ++lvl) { - STsdb* pTsdb = SMA_RSMA_GET_TSDB(pVnode, lvl); - if (pTsdb && pTsdb->pFS->fsstate != TSDB_FS_STATE_NORMAL) { - return TSDB_FS_STATE_INCOMPLETE; - } - } - return TSDB_FS_STATE_NORMAL; -} - -int32_t tsdbSnapGetDetails(SVnode* pVnode, SSnapshot* pSnap) { - int code = -1; - int32_t tsdbMaxCnt = (!VND_IS_RSMA(pVnode) ? 1 : TSDB_RETENTION_MAX); - int32_t subTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2}; - STsdbSnapPartList* pLists[TSDB_RETENTION_MAX] = {0}; - - for (int32_t j = 0; j < tsdbMaxCnt; ++j) { - STsdb* pTsdb = SMA_RSMA_GET_TSDB(pVnode, j); - pLists[j] = tsdbGetSnapPartList(pTsdb->pFS); - if (pLists[j] == NULL) goto _out; - } - - // estimate bufLen and prepare - int32_t bufLen = sizeof(SSyncTLV); // typ: TDMT_SYNC_PREP_SNAPSHOT or TDMT_SYNC_PREP_SNAPSOT_REPLY - for (int32_t j = 0; j < tsdbMaxCnt; ++j) { - bufLen += sizeof(SSyncTLV); // subTyps[j] - bufLen += tTsdbSnapPartListDataLenCalc(pLists[j]); - } - - tsdbInfo("vgId:%d, allocate %d bytes for data of snapshot info.", TD_VID(pVnode), bufLen); - - void* data = taosMemoryRealloc(pSnap->data, bufLen); - if (data == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tsdbError("vgId:%d, failed to realloc memory for data of snapshot info. bytes:%d", TD_VID(pVnode), bufLen); - goto _out; - } - pSnap->data = data; - - // header - SSyncTLV* head = data; - head->len = 0; - head->typ = pSnap->type; - int32_t offset = sizeof(SSyncTLV); - int32_t tlen = 0; - - // fill snapshot info - for (int32_t j = 0; j < tsdbMaxCnt; ++j) { - if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { - } - - // subHead - SSyncTLV* subHead = (void*)((char*)data + offset); - subHead->typ = subTyps[j]; - ASSERT(subHead->val == (char*)data + offset + sizeof(SSyncTLV)); - - if ((tlen = tSerializeTsdbSnapPartList(subHead->val, bufLen - offset - sizeof(SSyncTLV), pLists[j])) < 0) { - tsdbError("vgId:%d, failed to serialize snap partition list of tsdb %d since %s", TD_VID(pVnode), j, terrstr()); - goto _out; - } - subHead->len = tlen; - offset += sizeof(SSyncTLV) + tlen; - } - - head->len = offset - sizeof(SSyncTLV); - ASSERT(offset <= bufLen); - code = 0; - -_out: - for (int32_t j = 0; j < tsdbMaxCnt; ++j) { - if (pLists[j] == NULL) continue; - tsdbSnapPartListDestroy(&pLists[j]); - } - - return code; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c new file mode 100644 index 0000000000..b7c22aa0e9 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c @@ -0,0 +1,599 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdb.h" +#include "tsdbDataFileRAW.h" +#include "tsdbFS2.h" +#include "tsdbFSetRAW.h" + +static int32_t tsdbSnapRAWReadFileSetCloseReader(STsdbSnapRAWReader* reader); + +// reader +typedef struct SDataFileRAWReaderIter { + int32_t count; + int32_t idx; +} SDataFileRAWReaderIter; + +typedef struct STsdbSnapRAWReader { + STsdb* tsdb; + int64_t ever; + int8_t type; + + TFileSetArray* fsetArr; + + // context + struct { + int32_t fsetArrIdx; + STFileSet* fset; + bool isDataDone; + } ctx[1]; + + // reader + SDataFileRAWReaderArray dataReaderArr[1]; + + // iter + SDataFileRAWReaderIter dataIter[1]; +} STsdbSnapRAWReader; + +int32_t tsdbSnapRAWReaderOpen(STsdb* tsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** reader) { + int32_t code = 0; + int32_t lino = 0; + + reader[0] = taosMemoryCalloc(1, sizeof(STsdbSnapRAWReader)); + if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + reader[0]->tsdb = tsdb; + reader[0]->ever = ever; + reader[0]->type = type; + + code = tsdbFSCreateRefSnapshot(tsdb->pFS, &reader[0]->fsetArr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, sver:0, ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, + lino, tstrerror(code), ever, type); + tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr); + taosMemoryFree(reader[0]); + reader[0] = NULL; + } else { + tsdbInfo("vgId:%d tsdb snapshot reader opened. sver:0, ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), ever, type); + } + return code; +} + +int32_t tsdbSnapRAWReaderClose(STsdbSnapRAWReader** reader) { + if (reader[0] == NULL) return 0; + + int32_t code = 0; + int32_t lino = 0; + + STsdb* tsdb = reader[0]->tsdb; + + TARRAY2_DESTROY(reader[0]->dataReaderArr, tsdbDataFileRAWReaderClose); + tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr); + taosMemoryFree(reader[0]); + reader[0] = NULL; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } else { + tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__); + } + return code; +} + +static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) { + int32_t code = 0; + int32_t lino = 0; + + // data + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) { + if (reader->ctx->fset->farr[ftype] == NULL) { + continue; + } + STFileObj* fobj = reader->ctx->fset->farr[ftype]; + SDataFileRAWReader* dataReader; + SDataFileRAWReaderConfig config = { + .tsdb = reader->tsdb, + .szPage = reader->tsdb->pVnode->config.tsdbPageSize, + .file = fobj->f[0], + }; + code = tsdbDataFileRAWReaderOpen(NULL, &config, &dataReader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(reader->dataReaderArr, dataReader); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // stt + SSttLvl* lvl; + TARRAY2_FOREACH(reader->ctx->fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + SDataFileRAWReader* dataReader; + SDataFileRAWReaderConfig config = { + .tsdb = reader->tsdb, + .szPage = reader->tsdb->pVnode->config.tsdbPageSize, + .file = fobj->f[0], + }; + code = tsdbDataFileRAWReaderOpen(NULL, &config, &dataReader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(reader->dataReaderArr, dataReader); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapRAWReadFileSetCloseReader(STsdbSnapRAWReader* reader) { + int32_t code = 0; + int32_t lino = 0; + + TARRAY2_CLEAR(reader->dataReaderArr, tsdbDataFileRAWReaderClose); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapRAWReadFileSetOpenIter(STsdbSnapRAWReader* reader) { + int32_t code = 0; + int32_t lino = 0; + + reader->dataIter->count = TARRAY2_SIZE(reader->dataReaderArr); + reader->dataIter->idx = 0; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapRAWReadFileSetCloseIter(STsdbSnapRAWReader* reader) { + reader->dataIter->count = 0; + reader->dataIter->idx = 0; + return 0; +} + +static int64_t tsdbSnapRAWReadPeek(SDataFileRAWReader* reader) { + int64_t size = TMIN(reader->config->file.size - reader->ctx->offset, TSDB_SNAP_DATA_PAYLOAD_SIZE); + return size; +} + +static SDataFileRAWReader* tsdbSnapRAWReaderIterNext(STsdbSnapRAWReader* reader) { + ASSERT(reader->dataIter->idx <= reader->dataIter->count); + + while (reader->dataIter->idx < reader->dataIter->count) { + SDataFileRAWReader* dataReader = TARRAY2_GET(reader->dataReaderArr, reader->dataIter->idx); + ASSERT(dataReader); + if (dataReader->ctx->offset < dataReader->config->file.size) { + return dataReader; + } + reader->dataIter->idx++; + } + return NULL; +} + +static int32_t tsdbSnapRAWReadNext(STsdbSnapRAWReader* reader, SSnapDataHdr** ppData) { + int32_t code = 0; + int32_t lino = 0; + int8_t type = reader->type; + ppData[0] = NULL; + + SDataFileRAWReader* dataReader = tsdbSnapRAWReaderIterNext(reader); + if (dataReader == NULL) { + return 0; + } + + // prepare + int64_t dataLength = tsdbSnapRAWReadPeek(dataReader); + ASSERT(dataLength > 0); + + void* pBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + sizeof(STsdbDataRAWBlockHeader) + dataLength); + if (pBuf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + SSnapDataHdr* pHdr = pBuf; + pHdr->type = type; + pHdr->size = sizeof(STsdbDataRAWBlockHeader) + dataLength; + + // read + STsdbDataRAWBlockHeader* pBlock = (void*)pHdr->data; + pBlock->offset = dataReader->ctx->offset; + pBlock->dataLength = dataLength; + + code = tsdbDataFileRAWReadBlockData(dataReader, pBlock); + TSDB_CHECK_CODE(code, lino, _exit); + + // finish + dataReader->ctx->offset += pBlock->dataLength; + ASSERT(dataReader->ctx->offset <= dataReader->config->file.size); + ppData[0] = pBuf; + +_exit: + if (code) { + taosMemoryFree(pBuf); + pBuf = NULL; + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapRAWReadData(STsdbSnapRAWReader* reader, uint8_t** ppData) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbSnapRAWReadNext(reader, (SSnapDataHdr**)ppData); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapRAWReadBegin(STsdbSnapRAWReader* reader) { + int32_t code = 0; + int32_t lino = 0; + + ASSERT(reader->ctx->fset == NULL); + + if (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) { + reader->ctx->fset = TARRAY2_GET(reader->fsetArr, reader->ctx->fsetArrIdx++); + reader->ctx->isDataDone = false; + + code = tsdbSnapRAWReadFileSetOpenReader(reader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSnapRAWReadFileSetOpenIter(reader); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapRAWReadEnd(STsdbSnapRAWReader* reader) { + tsdbSnapRAWReadFileSetCloseIter(reader); + tsdbSnapRAWReadFileSetCloseReader(reader); + reader->ctx->fset = NULL; + return 0; +} + +int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* reader, uint8_t** data) { + int32_t code = 0; + int32_t lino = 0; + + data[0] = NULL; + + for (;;) { + if (reader->ctx->fset == NULL) { + code = tsdbSnapRAWReadBegin(reader); + TSDB_CHECK_CODE(code, lino, _exit); + + if (reader->ctx->fset == NULL) { + break; + } + } + + if (!reader->ctx->isDataDone) { + code = tsdbSnapRAWReadData(reader, data); + TSDB_CHECK_CODE(code, lino, _exit); + if (data[0]) { + goto _exit; + } else { + reader->ctx->isDataDone = true; + } + } + + code = tsdbSnapRAWReadEnd(reader); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } else { + tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__); + } + return code; +} + +// writer +struct STsdbSnapRAWWriter { + STsdb* tsdb; + int64_t sver; + int64_t ever; + int32_t minutes; + int8_t precision; + int32_t minRow; + int32_t maxRow; + int8_t cmprAlg; + int64_t commitID; + int32_t szPage; + int64_t compactVersion; + int64_t now; + + TFileSetArray* fsetArr; + TFileOpArray fopArr[1]; + + struct { + bool fsetWriteBegin; + int32_t fid; + STFileSet* fset; + SDiskID did; + int64_t cid; + int64_t level; + + // writer + SFSetRAWWriter* fsetWriter; + } ctx[1]; +}; + +int32_t tsdbSnapRAWWriterOpen(STsdb* pTsdb, int64_t ever, STsdbSnapRAWWriter** writer) { + int32_t code = 0; + int32_t lino = 0; + + // start to write + writer[0] = taosMemoryCalloc(1, sizeof(*writer[0])); + if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + writer[0]->tsdb = pTsdb; + writer[0]->ever = ever; + writer[0]->minutes = pTsdb->keepCfg.days; + writer[0]->precision = pTsdb->keepCfg.precision; + writer[0]->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; + writer[0]->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; + writer[0]->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; + writer[0]->commitID = tsdbFSAllocEid(pTsdb->pFS); + writer[0]->szPage = pTsdb->pVnode->config.tsdbPageSize; + writer[0]->compactVersion = INT64_MAX; + writer[0]->now = taosGetTimestampMs(); + + code = tsdbFSCreateCopySnapshot(pTsdb->pFS, &writer[0]->fsetArr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done, sver:0, ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, ever); + } + return code; +} + +static int32_t tsdbSnapRAWWriteFileSetOpenIter(STsdbSnapRAWWriter* writer) { + int32_t code = 0; + int32_t lino = 0; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapRAWWriteFileSetCloseIter(STsdbSnapRAWWriter* writer) { return 0; } + +static int32_t tsdbSnapRAWWriteFileSetOpenWriter(STsdbSnapRAWWriter* writer) { + int32_t code = 0; + int32_t lino = 0; + + SFSetRAWWriterConfig config = { + .tsdb = writer->tsdb, + .szPage = writer->szPage, + .fid = writer->ctx->fid, + .cid = writer->commitID, + .did = writer->ctx->did, + .level = writer->ctx->level, + }; + + code = tsdbFSetRAWWriterOpen(&config, &writer->ctx->fsetWriter); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapRAWWriteFileSetCloseWriter(STsdbSnapRAWWriter* writer) { + return tsdbFSetRAWWriterClose(&writer->ctx->fsetWriter, 0, writer->fopArr); +} + +static int32_t tsdbSnapRAWWriteFileSetBegin(STsdbSnapRAWWriter* writer, int32_t fid) { + int32_t code = 0; + int32_t lino = 0; + + ASSERT(writer->ctx->fsetWriteBegin == false); + + STFileSet* fset = &(STFileSet){.fid = fid}; + + writer->ctx->fid = fid; + STFileSet** fsetPtr = TARRAY2_SEARCH(writer->fsetArr, &fset, tsdbTFileSetCmprFn, TD_EQ); + writer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr; + + int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec()); + if (tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did)) { + code = TSDB_CODE_NO_AVAIL_DISK; + TSDB_CHECK_CODE(code, lino, _exit); + } + tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did); + + code = tsdbSnapRAWWriteFileSetOpenWriter(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->level = level; + writer->ctx->fsetWriteBegin = true; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapRAWWriteFileSetEnd(STsdbSnapRAWWriter* writer) { + if (!writer->ctx->fsetWriteBegin) return 0; + + int32_t code = 0; + int32_t lino = 0; + + // close write + code = tsdbSnapRAWWriteFileSetCloseWriter(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->fsetWriteBegin = false; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbSnapRAWWriterPrepareClose(STsdbSnapRAWWriter* writer) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbSnapRAWWriteFileSetEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } else { + tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__); + } + return code; +} + +int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** writer, int8_t rollback) { + if (writer[0] == NULL) return 0; + + int32_t code = 0; + int32_t lino = 0; + + STsdb* tsdb = writer[0]->tsdb; + + if (rollback) { + code = tsdbFSEditAbort(writer[0]->tsdb->pFS); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + taosThreadMutexLock(&writer[0]->tsdb->mutex); + + code = tsdbFSEditCommit(writer[0]->tsdb->pFS); + if (code) { + taosThreadMutexUnlock(&writer[0]->tsdb->mutex); + TSDB_CHECK_CODE(code, lino, _exit); + } + + writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL; + + taosThreadMutexUnlock(&writer[0]->tsdb->mutex); + } + + TARRAY2_DESTROY(writer[0]->fopArr, NULL); + tsdbFSDestroyCopySnapshot(&writer[0]->fsetArr); + + taosMemoryFree(writer[0]); + writer[0] = NULL; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } else { + tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__); + } + return code; +} + +static int32_t tsdbSnapRAWWriteTimeSeriesData(STsdbSnapRAWWriter* writer, STsdbDataRAWBlockHeader* bHdr) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbFSetRAWWriteBlockData(writer->ctx->fsetWriter, bHdr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapRAWWriteData(STsdbSnapRAWWriter* writer, SSnapDataHdr* hdr) { + int32_t code = 0; + int32_t lino = 0; + + STsdbDataRAWBlockHeader* bHdr = (void*)hdr->data; + int32_t fid = bHdr->file.fid; + if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) { + code = tsdbSnapRAWWriteFileSetEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSnapRAWWriteFileSetBegin(writer, fid); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbSnapRAWWriteTimeSeriesData(writer, bHdr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbSnapRAWWrite(STsdbSnapRAWWriter* writer, SSnapDataHdr* hdr) { + ASSERT(hdr->type == SNAP_DATA_RAW); + + int32_t code = 0; + int32_t lino = 0; + + code = tsdbSnapRAWWriteData(writer, hdr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64, + TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size); + } else { + tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__, + hdr->type, hdr->index, hdr->size); + } + return code; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 34b508388f..ed1dcc64c9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -16,6 +16,26 @@ #include "tsdb.h" #include "vnd.h" +static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TFileSetRangeArray **ppRanges) { + int32_t code = -1; + STsdbFSetPartList *pList = tsdbFSetPartListCreate(); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _out; + } + if (tDeserializeTsdbFSetPartList(buf, bufLen, pList) < 0) { + terrno = TSDB_CODE_INVALID_DATA_FMT; + goto _out; + } + if (tsdbFSetPartListToRangeDiff(pList, ppRanges) < 0) { + goto _out; + } + code = 0; +_out: + tsdbFSetPartListDestroy(&pList); + return code; +} + // SVSnapReader ======================================================== struct SVSnapReader { SVnode *pVnode; @@ -29,8 +49,12 @@ struct SVSnapReader { SMetaSnapReader *pMetaReader; // tsdb int8_t tsdbDone; - TSnapRangeArray *pRanges; + TFileSetRangeArray *pRanges; STsdbSnapReader *pTsdbReader; + // tsdb raw + int8_t tsdbRAWDone; + STsdbSnapRAWReader *pTsdbRAWReader; + // tq int8_t tqHandleDone; STqSnapReader *pTqSnapReader; @@ -45,31 +69,11 @@ struct SVSnapReader { SStreamStateReader *pStreamStateReader; // rsma int8_t rsmaDone; - TSnapRangeArray *pRsmaRanges[TSDB_RETENTION_L2]; + TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2]; SRSmaSnapReader *pRsmaReader; }; -static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TSnapRangeArray **ppRanges) { - int32_t code = -1; - STsdbSnapPartList *pList = tsdbSnapPartListCreate(); - if (pList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _out; - } - if (tDeserializeTsdbSnapPartList(buf, bufLen, pList) < 0) { - terrno = TSDB_CODE_INVALID_DATA_FMT; - goto _out; - } - if (tsdbSnapPartListToRangeDiff(pList, ppRanges) < 0) { - goto _out; - } - code = 0; -_out: - tsdbSnapPartListDestroy(&pList); - return code; -} - -static TSnapRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) { +static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) { ASSERTS(sizeof(pReader->pRsmaRanges) / sizeof(pReader->pRsmaRanges[0]) == 2, "Unexpected array size"); switch (tsdbTyp) { case SNAP_DATA_TSDB: @@ -83,37 +87,66 @@ static TSnapRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int } } -static int32_t vnodeSnapReaderDoSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) { +static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) { SVnode *pVnode = pReader->pVnode; int32_t code = -1; if (pParam->data) { + // decode SSyncTLV *datHead = (void *)pParam->data; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { terrno = TSDB_CODE_INVALID_DATA_FMT; goto _out; } - TSnapRangeArray **ppRanges = NULL; - int32_t offset = 0; + STsdbRepOpts tsdbOpts = {0}; + TFileSetRangeArray **ppRanges = NULL; + int32_t offset = 0; while (offset + sizeof(SSyncTLV) < datHead->len) { SSyncTLV *subField = (void *)(datHead->val + offset); offset += sizeof(SSyncTLV) + subField->len; void *buf = subField->val; int32_t bufLen = subField->len; - ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ); - if (ppRanges == NULL) { - vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ); - goto _out; - } - if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) { - vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); - goto _out; + + switch (subField->typ) { + case SNAP_DATA_TSDB: + case SNAP_DATA_RSMA1: + case SNAP_DATA_RSMA2: { + ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ); + if (ppRanges == NULL) { + vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ); + goto _out; + } + if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) { + vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + } break; + case SNAP_DATA_RAW: { + if (tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts) < 0) { + vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + } break; + default: + vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); + goto _out; } } - } + // toggle snap replication mode + vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format); + if (pReader->sver == 0 && tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) { + pReader->tsdbDone = true; + } else { + pReader->tsdbRAWDone = true; + } + + ASSERT(pReader->tsdbDone != pReader->tsdbRAWDone); + vInfo("vgId:%d, vnode snap writer enabled replication mode: %s", TD_VID(pVnode), + (pReader->tsdbDone ? "raw" : "normal")); + } code = 0; _out: return code; @@ -135,7 +168,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader pReader->ever = ever; // snapshot info - if (vnodeSnapReaderDoSnapInfo(pReader, pParam) < 0) { + if (vnodeSnapReaderDealWithSnapInfo(pReader, pParam) < 0) { goto _err; } @@ -152,9 +185,9 @@ _err: static void vnodeSnapReaderDestroyTsdbRanges(SVSnapReader *pReader) { int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2}; for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) { - TSnapRangeArray **ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, tsdbTyps[j]); + TFileSetRangeArray **ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, tsdbTyps[j]); if (ppRanges == NULL) continue; - tsdbSnapRangeArrayDestroy(ppRanges); + tsdbTFileSetRangeArrayDestroy(ppRanges); } } @@ -170,6 +203,10 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) { tsdbSnapReaderClose(&pReader->pTsdbReader); } + if (pReader->pTsdbRAWReader) { + tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader); + } + if (pReader->pMetaReader) { metaSnapReaderClose(&pReader->pMetaReader); } @@ -285,6 +322,28 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } } + if (!pReader->tsdbRAWDone) { + // open if not + if (pReader->pTsdbRAWReader == NULL) { + ASSERT(pReader->sver == 0); + code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader); + if (code) goto _err; + } + + code = tsdbSnapRAWRead(pReader->pTsdbRAWReader, ppData); + if (code) { + goto _err; + } else { + if (*ppData) { + goto _exit; + } else { + pReader->tsdbRAWDone = 1; + code = tsdbSnapRAWReaderClose(&pReader->pTsdbRAWReader); + if (code) goto _err; + } + } + } + // TQ ================ vInfo("vgId:%d tq transform start", vgId); if (!pReader->tqHandleDone) { @@ -455,8 +514,10 @@ struct SVSnapWriter { // meta SMetaSnapWriter *pMetaSnapWriter; // tsdb - TSnapRangeArray *pRanges; + TFileSetRangeArray *pRanges; STsdbSnapWriter *pTsdbSnapWriter; + // tsdb raw + STsdbSnapRAWWriter *pTsdbSnapRAWWriter; // tq STqSnapWriter *pTqSnapWriter; STqOffsetWriter *pTqOffsetWriter; @@ -465,11 +526,11 @@ struct SVSnapWriter { SStreamTaskWriter *pStreamTaskWriter; SStreamStateWriter *pStreamStateWriter; // rsma - TSnapRangeArray *pRsmaRanges[TSDB_RETENTION_L2]; + TFileSetRangeArray *pRsmaRanges[TSDB_RETENTION_L2]; SRSmaSnapWriter *pRsmaSnapWriter; }; -TSnapRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) { +TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) { ASSERTS(sizeof(pWriter->pRsmaRanges) / sizeof(pWriter->pRsmaRanges[0]) == 2, "Unexpected array size"); switch (tsdbTyp) { case SNAP_DATA_TSDB: @@ -483,7 +544,7 @@ TSnapRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t ts } } -static int32_t vnodeSnapWriterDoSnapInfo(SVSnapWriter *pWriter, SSnapshotParam *pParam) { +static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotParam *pParam) { SVnode *pVnode = pWriter->pVnode; int32_t code = -1; @@ -494,7 +555,8 @@ static int32_t vnodeSnapWriterDoSnapInfo(SVSnapWriter *pWriter, SSnapshotParam * goto _out; } - TSnapRangeArray **ppRanges = NULL; + STsdbRepOpts tsdbOpts = {0}; + TFileSetRangeArray **ppRanges = NULL; int32_t offset = 0; while (offset + sizeof(SSyncTLV) < datHead->len) { @@ -502,16 +564,34 @@ static int32_t vnodeSnapWriterDoSnapInfo(SVSnapWriter *pWriter, SSnapshotParam * offset += sizeof(SSyncTLV) + subField->len; void *buf = subField->val; int32_t bufLen = subField->len; - ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ); - if (ppRanges == NULL) { - vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ); - goto _out; - } - if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) { - vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); - goto _out; + + switch (subField->typ) { + case SNAP_DATA_TSDB: + case SNAP_DATA_RSMA1: + case SNAP_DATA_RSMA2: { + ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ); + if (ppRanges == NULL) { + vError("vgId:%d, unexpected subfield type in snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ); + goto _out; + } + if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) { + vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + } break; + case SNAP_DATA_RAW: { + if (tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts) < 0) { + vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + } break; + default: + vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); + goto _out; } } + + vInfo("vgId:%d, vnode snap writer supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format); } code = 0; @@ -558,7 +638,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter pWriter->commitID = ++pVnode->state.commitID; // snapshot info - if (vnodeSnapWriterDoSnapInfo(pWriter, pParam) < 0) { + if (vnodeSnapWriterDealWithSnapInfo(pWriter, pParam) < 0) { goto _err; } @@ -576,9 +656,9 @@ _err: static void vnodeSnapWriterDestroyTsdbRanges(SVSnapWriter *pWriter) { int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2}; for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) { - TSnapRangeArray **ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, tsdbTyps[j]); + TFileSetRangeArray **ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, tsdbTyps[j]); if (ppRanges == NULL) continue; - tsdbSnapRangeArrayDestroy(ppRanges); + tsdbTFileSetRangeArrayDestroy(ppRanges); } } @@ -593,6 +673,10 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter); } + if (pWriter->pTsdbSnapRAWWriter) { + tsdbSnapRAWWriterPrepareClose(pWriter->pTsdbSnapRAWWriter); + } + if (pWriter->pRsmaSnapWriter) { rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter); } @@ -629,6 +713,11 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (code) goto _exit; } + if (pWriter->pTsdbSnapRAWWriter) { + code = tsdbSnapRAWWriterClose(&pWriter->pTsdbSnapRAWWriter, rollback); + if (code) goto _exit; + } + if (pWriter->pTqSnapWriter) { code = tqSnapWriterClose(&pWriter->pTqSnapWriter, rollback); if (code) goto _exit; @@ -752,6 +841,17 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr); if (code) goto _err; } break; + case SNAP_DATA_RAW: { + // tsdb + if (pWriter->pTsdbSnapRAWWriter == NULL) { + ASSERT(pWriter->sver == 0); + code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter); + if (code) goto _err; + } + + code = tsdbSnapRAWWrite(pWriter->pTsdbSnapRAWWriter, pHdr); + if (code) goto _err; + } break; case SNAP_DATA_TQ_HANDLE: { // tq handle if (pWriter->pTqSnapWriter == NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 817d5124a2..5871a60c9e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -804,7 +804,7 @@ int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) { } if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { - code = tsdbSnapGetDetails(pVnode, pSnap); + code = tsdbSnapPrepDescription(pVnode, pSnap); } return code; } diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 04456b2454..ecd2b5163e 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -56,6 +56,10 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode); int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); +int32_t syncSnapSendMsg(SSyncSnapshotSender* pSender, int32_t seq, void* pBlock, int32_t len, int32_t typ); +int32_t syncSnapSendRsp(SSyncSnapshotReceiver* pReceiver, SyncSnapshotSend* pMsg, void* pBlock, int32_t len, + int32_t typ, int32_t code); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 95952c960e..93e81fd8e2 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -23,8 +23,6 @@ #include "syncReplication.h" #include "syncUtil.h" -int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t len, int32_t typ); - static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { taosThreadMutexLock(&pBuf->mutex); for (int64_t i = pBuf->start; i < pBuf->end; ++i) { @@ -123,6 +121,11 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { if (pSender->pSndBuf) { syncSnapBufferDestroy(&pSender->pSndBuf); } + + if (pSender->snapshotParam.data) { + taosMemoryFree(pSender->snapshotParam.data); + pSender->snapshotParam.data = NULL; + } // free sender taosMemoryFree(pSender); } @@ -153,7 +156,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pSender->lastSendTime = taosGetTimestampMs(); pSender->finish = false; - // Get full snapshot info + // Get snapshot info SSyncNode *pSyncNode = pSender->pSyncNode; SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT}; if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) { @@ -161,11 +164,10 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { goto _out; } - int dataLen = 0; void *pData = snapInfo.data; - int32_t type = 0; + int32_t type = (pData) ? snapInfo.type : 0; + int32_t dataLen = 0; if (pData) { - type = snapInfo.type; SSyncTLV *datHead = pData; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) { sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ); @@ -347,9 +349,6 @@ _out:; return code; } -// return 0, start ok -// return 1, last snapshot finish ok -// return -1, error int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); if (pSender == NULL) { @@ -380,6 +379,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 0; } +// receiver SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) { bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) && (pSyncNode->pFsm->FpSnapshotDoWrite != NULL); @@ -509,8 +509,6 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId)); } -// just set start = false -// FpSnapshotStopWrite should not be called void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter); @@ -531,7 +529,6 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { syncSnapBufferReset(pReceiver->pRcvBuf); } -// when recv last snapshot block, apply data into snapshot static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { int32_t code = 0; if (pReceiver->pWriter != NULL) { @@ -590,8 +587,6 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap return 0; } -// apply data block -// update progress static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { if (pMsg->seq != pReceiver->ack + 1) { sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq); @@ -644,6 +639,50 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { return snapStart; } +static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver, + SyncSnapshotSend *pMsg, SSnapshot *pInfo) { + ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT); + int32_t code = 0; + + // copy snap info from leader + void *data = taosMemoryCalloc(1, pMsg->dataLen); + if (data == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; + goto _out; + } + pInfo->data = data; + data = NULL; + memcpy(pInfo->data, pMsg->data, pMsg->dataLen); + + // exchange snap info + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo); + SSyncTLV *datHead = pInfo->data; + if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { + sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ); + code = TSDB_CODE_INVALID_DATA_FMT; + goto _out; + } + int32_t dataLen = sizeof(SSyncTLV) + datHead->len; + + // save exchanged snap info + SSnapshotParam *pParam = &pReceiver->snapshotParam; + data = taosMemoryRealloc(pParam->data, dataLen); + if (data == NULL) { + sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId, + strerror(errno), dataLen); + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; + goto _out; + } + pParam->data = data; + data = NULL; + memcpy(pParam->data, pInfo->data, dataLen); + +_out: + return code; +} + static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; int64_t timeNow = taosGetTimestampMs(); @@ -686,77 +725,27 @@ _START_RECEIVER: snapshotReceiverStop(pReceiver); } - snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender + snapshotReceiverStart(pReceiver, pMsg); -_SEND_REPLY: - // build msg - ; // make complier happy +_SEND_REPLY:; SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY}; int32_t dataLen = 0; - if (pMsg->dataLen > 0) { - void *data = taosMemoryCalloc(1, pMsg->dataLen); - if (data == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = terrno; + if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) { + if (syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo) != 0) { goto _out; } - memcpy(data, pMsg->data, pMsg->dataLen); - snapInfo.data = data; - data = NULL; - pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo); - SSyncTLV *datHead = snapInfo.data; - if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { - sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ); - code = TSDB_CODE_INVALID_DATA_FMT; - goto _out; - } dataLen = sizeof(SSyncTLV) + datHead->len; } - SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSendRsp(&rpcMsg, dataLen, pSyncNode->vgId) != 0) { - sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr()); + // send response + int32_t type = (snapInfo.data) ? snapInfo.type : 0; + if (syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, code) != 0) { code = terrno; goto _out; } - SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; - pRspMsg->srcId = pSyncNode->myRaftId; - pRspMsg->destId = pMsg->srcId; - pRspMsg->term = raftStoreGetTerm(pSyncNode); - pRspMsg->lastIndex = pMsg->lastIndex; - pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->startTime = pMsg->startTime; - pRspMsg->ack = pMsg->seq; // receiver maybe already closed - pRspMsg->code = code; - pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode); - - if (snapInfo.data) { - pRspMsg->payloadType = snapInfo.type; - memcpy(pRspMsg->data, snapInfo.data, dataLen); - - // save snapshot info - SSnapshotParam *pParam = &pReceiver->snapshotParam; - void *data = taosMemoryRealloc(pParam->data, dataLen); - if (data == NULL) { - sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId, - strerror(errno), dataLen); - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = terrno; - goto _out; - } - pParam->data = data; - memcpy(pParam->data, snapInfo.data, dataLen); - } - - // send msg - if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { - sRError(pReceiver, "failed to send resp since %s", terrstr()); - code = terrno; - } - _out: if (snapInfo.data) { taosMemoryFree(snapInfo.data); @@ -793,38 +782,20 @@ _SEND_REPLY: code = terrno; } - // build msg - SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId) != 0) { - sRError(pReceiver, "failed to build snapshot receiver resp since %s", terrstr()); - return -1; - } - - SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; - pRspMsg->srcId = pSyncNode->myRaftId; - pRspMsg->destId = pMsg->srcId; - pRspMsg->term = raftStoreGetTerm(pSyncNode); - pRspMsg->lastIndex = pMsg->lastIndex; - pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->startTime = pMsg->startTime; - pRspMsg->ack = pReceiver->ack; // receiver maybe already closed - pRspMsg->code = code; - pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; - - // send msg - if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { - sRError(pReceiver, "failed to send snapshot receiver resp since %s", terrstr()); + // send response + if (syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code) != 0) { return -1; } return code; } -static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, int32_t code) { +int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen, + int32_t type, int32_t code) { SSyncNode *pSyncNode = pReceiver->pSyncNode; // build msg SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) { + if (syncBuildSnapshotSendRsp(&rpcMsg, blockLen, pSyncNode->vgId)) { sRError(pReceiver, "failed to build snapshot receiver resp since %s", terrstr()); return -1; } @@ -832,13 +803,18 @@ static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSen SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; - pRspMsg->term = raftStoreGetTerm(pSyncNode); + pRspMsg->term = pMsg->term; pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pMsg->startTime; pRspMsg->ack = pMsg->seq; pRspMsg->code = code; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; + pRspMsg->payloadType = type; + + if (pBlock != NULL && blockLen > 0) { + memcpy(pRspMsg->data, pBlock, blockLen); + } // send msg if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { @@ -872,7 +848,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot ppMsg[0] = NULL; pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end); } else if (pMsg->seq < pRcvBuf->start) { - syncSnapSendRsp(pReceiver, pMsg, code); + syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code); goto _out; } @@ -892,7 +868,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot } } pRcvBuf->start = seq + 1; - syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], code); + syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code); pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]); pRcvBuf->entries[seq % pRcvBuf->size] = NULL; if (code) goto _out; @@ -915,7 +891,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) { terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr()); - return syncSnapSendRsp(pReceiver, pMsg, terrno); + return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, terrno); } return syncSnapBufferRecv(pReceiver, ppMsg); @@ -971,26 +947,6 @@ _SEND_REPLY:; return code; } -// receiver on message -// -// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP -// if receiver already start -// if sender.start-time > receiver.start-time, restart receiver(reply snapshot start) -// if sender.start-time = receiver.start-time, maybe duplicate msg -// if sender.start-time < receiver.start-time, ignore -// else -// waiting for clock match -// start receiver(reply snapshot start) -// -// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN -// a. create writer with -// -// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig) -// -// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close -// -// condition 5, got data, update ack -// int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont; SyncSnapshotSend *pMsg = ppMsg[0]; @@ -1074,6 +1030,32 @@ _out:; return code; } +static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { + ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY); + + SSyncTLV *datHead = (void *)pMsg->data; + if (datHead->typ != pMsg->payloadType) { + sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ); + terrno = TSDB_CODE_INVALID_DATA_FMT; + return -1; + } + int32_t dataLen = sizeof(SSyncTLV) + datHead->len; + + SSnapshotParam *pParam = &pSender->snapshotParam; + void *data = taosMemoryRealloc(pParam->data, dataLen); + if (data == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(data, pMsg->data, dataLen); + + pParam->data = data; + data = NULL; + sSInfo(pSender, "data of snapshot param. len: %d", datHead->len); + return 0; +} + +// sender static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { SSnapshot snapshot = {0}; pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); @@ -1090,14 +1072,9 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend // start reader if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { - SSyncTLV *datHead = (void *)pMsg->data; - if (datHead->typ != pMsg->payloadType) { - sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ); - terrno = TSDB_CODE_INVALID_DATA_FMT; + if (syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg) != 0) { return -1; } - pSender->snapshotParam.data = (void *)pMsg->data; - sSInfo(pSender, "data of snapshot param. len: %d", datHead->len); } int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader); @@ -1131,7 +1108,7 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp goto _out; } - if (pSender->pReader == NULL || pSender->finish) { + if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) { code = terrno = TSDB_CODE_SYN_INTERNAL_ERROR; goto _out; } @@ -1182,12 +1159,6 @@ _out: return code; } -// sender on message -// -// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender -// condition 2 sender receives ack, set seq = ack + 1, send msg from seq -// condition 3 sender receives error msg, just print error log -// int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont; SyncSnapshotRsp *pMsg = ppMsg[0]; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 06847c081c..2ce56af946 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -487,3 +487,13 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl sNInfo(pSyncNode, "send sync-request-vote-reply to dnode:%d {term:%" PRId64 ", grant:%d}, %s", DID(&pMsg->destId), pMsg->term, pMsg->voteGranted, s); } + +int32_t syncSnapInfoDataRealloc(SSnapshot* pSnap, int32_t size) { + void* data = taosMemoryRealloc(pSnap->data, size); + if (data == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pSnap->data = data; + return 0; +}