From 6c419423de9c2b7f3319da17e8d70994e851fbd2 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 20 Nov 2023 20:19:46 +0800 Subject: [PATCH] feat: impl tsdb snapshot reader and writer for raw files --- source/dnode/vnode/src/inc/vnodeInt.h | 11 + source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c | 212 +++++++ source/dnode/vnode/src/tsdb/tsdbDataFileRAW.h | 127 ++++ source/dnode/vnode/src/tsdb/tsdbFSetRAW.c | 173 ++++++ source/dnode/vnode/src/tsdb/tsdbFSetRAW.h | 45 ++ source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c | 586 ++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeSnapshot.c | 17 + 7 files changed, 1171 insertions(+) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2c051ea642..50a28357e5 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -65,6 +65,8 @@ typedef struct SMetaSnapReader SMetaSnapReader; typedef struct SMetaSnapWriter SMetaSnapWriter; typedef struct STsdbSnapReader STsdbSnapReader; typedef struct STsdbSnapWriter STsdbSnapWriter; +typedef struct STsdbSnapRAWReader STsdbSnapRAWReader; +typedef struct STsdbSnapRAWWriter STsdbSnapRAWWriter; typedef struct STqSnapReader STqSnapReader; typedef struct STqSnapWriter STqSnapWriter; typedef struct STqOffsetReader STqOffsetReader; @@ -313,6 +315,15 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRang int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr); int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); +// STsdbSnapRAWReader ======================================== +int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader); +int32_t tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader); +int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData); +// STsdbSnapRAWWriter ======================================== +int32_t tsdbSnapRAWWriterOpen(STsdb* pTsdb, int64_t ever, STsdbSnapRAWWriter** ppWriter); +int32_t tsdbSnapRAWWrite(STsdbSnapRAWWriter* pWriter, SSnapDataHdr* pHdr); +int32_t tsdbSnapRAWWriterPrepareClose(STsdbSnapRAWWriter* pWriter); +int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** ppWriter, int8_t rollback); // STqSnapshotReader == int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader); int32_t tqSnapReaderClose(STqSnapReader** ppReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c index e69de29bb2..0c3d890966 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbDataFileRAW.h" + +// SDataFileRAWReader ============================================= +int32_t tsdbDataFileRAWReaderOpen(const char *fname, const SDataFileRAWReaderConfig *config, + SDataFileRAWReader **reader) { + int32_t code = 0; + int32_t lino = 0; + + reader[0] = taosMemoryCalloc(1, sizeof(SDataFileRAWReader)); + if (reader[0] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + reader[0]->config[0] = config[0]; + + if (fname) { + if (fname) { + code = tsdbOpenFile(fname, config->tsdb, TD_FILE_READ, &reader[0]->fd); + TSDB_CHECK_CODE(code, lino, _exit); + } + } else { + char fname1[TSDB_FILENAME_LEN]; + tsdbTFileName(config->tsdb, &config->file, fname1); + code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileRAWReaderClose(SDataFileRAWReader **reader) { + if (reader[0] == NULL) return 0; + + if (reader[0]->fd) { + tsdbCloseFile(&reader[0]->fd); + } + + taosMemoryFree(reader[0]); + reader[0] = NULL; + return 0; +} + +int32_t tsdbDataFileRAWReadBlockData(SDataFileRAWReader *reader, STsdbDataRAWBlockHeader *bHdr) { + int32_t code = 0; + int32_t lino = 0; + + bHdr->file.type = reader->config->file.type; + bHdr->file.fid = reader->config->file.fid; + bHdr->file.cid = reader->config->file.cid; + bHdr->file.size = reader->config->file.size; + bHdr->file.minVer = reader->config->file.minVer; + bHdr->file.maxVer = reader->config->file.maxVer; + bHdr->file.stt->level = reader->config->file.stt->level; + + int64_t size = TMIN(bHdr->dataLength, reader->config->file.size - reader->ctx->offset); + ASSERT(size > 0); + bHdr->dataLength = 0; + bHdr->offset = reader->ctx->offset; + + code = tsdbReadFile(reader->fd, bHdr->offset, bHdr->data, size); + TSDB_CHECK_CODE(code, lino, _exit); + + bHdr->dataLength = size; +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); + } + return code; +} + +// SDataFileRAWWriter ============================================= +int32_t tsdbDataFileRAWWriterOpen(const SDataFileRAWWriterConfig *config, SDataFileRAWWriter **writer) { + writer[0] = taosMemoryCalloc(1, sizeof(*writer[0])); + if (!writer[0]) return TSDB_CODE_OUT_OF_MEMORY; + + writer[0]->config[0] = config[0]; + return 0; +} + +static int32_t tsdbDataFileRAWWriterCloseAbort(SDataFileRAWWriter *writer) { + ASSERT(0); + return 0; +} + +static int32_t tsdbDataFileRAWWriterDoClose(SDataFileRAWWriter *writer) { return 0; } + +int32_t tsdbDataFileRAWWriterDoOpen(SDataFileRAWWriter *writer) { + int32_t code = 0; + int32_t lino = 0; + + writer->file = writer->config->file; + writer->ctx->offset = 0; + + writer->ctx->opened = true; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFileOpArray *opArr) { + int32_t code = 0; + int32_t lino = 0; + STFileOp op; + + op = (STFileOp){ + .optype = TSDB_FOP_CREATE, + .fid = writer->config->fid, + .nf = writer->file, + }; + code = TARRAY2_APPEND(opArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + + if (writer->fd) { + code = tsdbFsyncFile(writer->fd); + TSDB_CHECK_CODE(code, lino, _exit); + tsdbCloseFile(&writer->fd); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDataFileRAWWriterOpenDataFD(SDataFileRAWWriter *writer) { + int32_t code = 0; + int32_t lino = 0; + + char fname[TSDB_FILENAME_LEN]; + int32_t flag = TD_FILE_READ | TD_FILE_WRITE; + + if (writer->file.size == 0) { + flag |= (TD_FILE_CREATE | TD_FILE_TRUNC); + } + + tsdbTFileName(writer->config->tsdb, &writer->file, fname); + code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileRAWWriterClose(SDataFileRAWWriter **writer, bool abort, TFileOpArray *opArr) { + if (writer[0] == NULL) return 0; + + int32_t code = 0; + int32_t lino = 0; + + if (writer[0]->ctx->opened) { + if (abort) { + code = tsdbDataFileRAWWriterCloseAbort(writer[0]); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbDataFileRAWWriterCloseCommit(writer[0], opArr); + TSDB_CHECK_CODE(code, lino, _exit); + } + tsdbDataFileRAWWriterDoClose(writer[0]); + } + taosMemoryFree(writer[0]); + writer[0] = NULL; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer[0]->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbDataFileRAWWriteBlockData(SDataFileRAWWriter *writer, const STsdbDataRAWBlockHeader *pDataBlock) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbWriteFile(writer->fd, writer->ctx->offset, (const uint8_t *)pDataBlock->data, pDataBlock->dataLength); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->file.size += pDataBlock->dataLength; + writer->ctx->offset += pDataBlock->dataLength; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.h b/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.h index e69de29bb2..49f80b0be5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.h +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.h @@ -0,0 +1,127 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tarray2.h" +#include "tsdbDef.h" +#include "tsdbFSet2.h" +#include "tsdbFile2.h" +#include "tsdbUtil2.h" + +#ifndef _TSDB_DATA_FILE_RAW_H +#define _TSDB_DATA_FILE_RAW_H + +#ifdef __cplusplus +extern "C" { +#endif + +#define TSDB_SNAP_RAW_PAYLOAD_SIZE (4096 * 1024) +#if 0 +struct SDataRAWBlock { + int8_t *data; + int64_t size; +}; + +int32_t tsdbDataRAWBlockReset(SDataRAWBlock *pBlock); +int32_t tsdbDataRAWBlockAlloc(SDataRawBlock *pBlock); +void tsdbDataRAWBlockFree(SDataRAWBlock *pBlock); +#endif + +// STsdbDataRAWBlockHeader ======================================= +typedef struct STsdbDataRAWBlockHeader { + struct { + int32_t type; + int64_t fid; + int64_t cid; + int64_t size; + int64_t minVer; + int64_t maxVer; + union { + struct { + int32_t level; + } stt[1]; + }; + } file; + + int64_t offset; + int64_t dataLength; + uint8_t data[0]; +} STsdbDataRAWBlockHeader; + +// SDataFileRAWReader ============================================= +typedef struct SDataFileRAWReaderConfig { + STsdb *tsdb; + int32_t szPage; + + STFile file; +} SDataFileRAWReaderConfig; + +typedef struct SDataFileRAWReader { + SDataFileRAWReaderConfig config[1]; + + struct { + bool opened; + int64_t offset; + } ctx[1]; + + STsdbFD *fd; +} SDataFileRAWReader; + +typedef TARRAY2(SDataFileRAWReader *) SDataFileRAWReaderArray; + +int32_t tsdbDataFileRAWReaderOpen(const char *fname, const SDataFileRAWReaderConfig *config, + SDataFileRAWReader **reader); +int32_t tsdbDataFileRAWReaderClose(SDataFileRAWReader **reader); + +int32_t tsdbDataFileRAWReadBlockData(SDataFileRAWReader *reader, STsdbDataRAWBlockHeader *bHdr); + +// SDataFileRAWWriter ============================================= +typedef struct SDataFileRAWWriterConfig { + STsdb *tsdb; + int32_t szPage; + + SDiskID did; + int64_t fid; + int64_t cid; + int32_t level; + + STFile file; +} SDataFileRAWWriterConfig; + +typedef struct SDataFileRAWWriter { + SDataFileRAWWriterConfig config[1]; + + struct { + bool opened; + int64_t offset; + } ctx[1]; + + STFile file; + STsdbFD *fd; +} SDataFileRAWWriter; + +typedef struct SDataFileRAWWriter SDataFileRAWWriter; + +int32_t tsdbDataFileRAWWriterOpen(const SDataFileRAWWriterConfig *config, SDataFileRAWWriter **writer); +int32_t tsdbDataFileRAWWriterClose(SDataFileRAWWriter **writer, bool abort, TFileOpArray *opArr); + +int32_t tsdbDataFileRAWWriterDoOpen(SDataFileRAWWriter *writer); +int32_t tsdbDataFileRAWWriteBlockData(SDataFileRAWWriter *writer, const STsdbDataRAWBlockHeader *bHdr); +int32_t tsdbDataFileRAWFlush(SDataFileRAWWriter *writer); + +#ifdef __cplusplus +} +#endif + +#endif /*_TSDB_DATA_FILE_RAW_H*/ diff --git a/source/dnode/vnode/src/tsdb/tsdbFSetRAW.c b/source/dnode/vnode/src/tsdb/tsdbFSetRAW.c index e69de29bb2..d9cd419ef9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSetRAW.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSetRAW.c @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbFSetRAW.h" + +// SFSetRAWWriter ================================================== +typedef struct SFSetRAWWriter { + SFSetRAWWriterConfig config[1]; + + struct { + TFileOpArray fopArr[1]; + STFile file; + int64_t offset; + } ctx[1]; + + // writer + SDataFileRAWWriter *dataWriter; +} SFSetRAWWriter; + +int32_t tsdbFSetRAWWriterOpen(SFSetRAWWriterConfig *config, SFSetRAWWriter **writer) { + int32_t code = 0; + int32_t lino = 0; + + writer[0] = taosMemoryCalloc(1, sizeof(SFSetRAWWriter)); + if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + writer[0]->config[0] = config[0]; + + TARRAY2_INIT(writer[0]->ctx->fopArr); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbFSetRAWWriterFinish(SFSetRAWWriter *writer, TFileOpArray *fopArr) { + int32_t code = 0; + int32_t lino = 0; + + STsdb *tsdb = writer->config->tsdb; + + STFileOp op; + TARRAY2_FOREACH(writer->ctx->fopArr, op) { + code = TARRAY2_APPEND(fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + } + + TARRAY2_CLEAR(writer->ctx->fopArr, NULL); +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbFSetRAWWriteFileDataBegin(SFSetRAWWriter *writer, STsdbDataRAWBlockHeader *bHdr) { + int32_t code = 0; + int32_t lino = 0; + + SDataFileRAWWriterConfig config = { + .tsdb = writer->config->tsdb, + .szPage = writer->config->szPage, + .did = writer->config->did, + .cid = writer->config->cid, + .level = writer->config->level, + + .file = + { + .type = bHdr->file.type, + .did = writer->config->did, + .cid = writer->config->cid, + .size = bHdr->file.size, + .minVer = bHdr->file.minVer, + .maxVer = bHdr->file.maxVer, + .stt = {{ + .level = bHdr->file.stt->level, + }}, + }, + }; + + code = tsdbDataFileRAWWriterOpen(&config, &writer->dataWriter); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbFSetRAWWriteFileDataEnd(SFSetRAWWriter *writer) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbDataFileRAWWriterClose(&writer->dataWriter, false, writer->ctx->fopArr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbFSetRAWWriterClose(SFSetRAWWriter **writer, bool abort, TFileOpArray *fopArr) { + if (writer[0] == NULL) return 0; + + int32_t code = 0; + int32_t lino = 0; + + STsdb *tsdb = writer[0]->config->tsdb; + + // end + code = tsdbFSetRAWWriteFileDataEnd(writer[0]); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbDataFileRAWWriterClose(&writer[0]->dataWriter, abort, writer[0]->ctx->fopArr); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFSetRAWWriterFinish(writer[0], fopArr); + TSDB_CHECK_CODE(code, lino, _exit); + // free + TARRAY2_DESTROY(writer[0]->ctx->fopArr, NULL); + taosMemoryFree(writer[0]); + writer[0] = NULL; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbFSetRAWWriteBlockData(SFSetRAWWriter *writer, STsdbDataRAWBlockHeader *bHdr) { + int32_t code = 0; + int32_t lino = 0; + + ASSERT(writer->ctx->offset >= 0 && writer->ctx->offset <= writer->ctx->file.size); + + if (writer->ctx->offset == writer->ctx->file.size) { + code = tsdbFSetRAWWriteFileDataEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFSetRAWWriteFileDataBegin(writer, bHdr); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbDataFileRAWWriteBlockData(writer->dataWriter, bHdr); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->offset += bHdr->dataLength; + ASSERT(writer->ctx->offset == writer->dataWriter->ctx->offset); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); + } + return code; +} diff --git a/source/dnode/vnode/src/tsdb/tsdbFSetRAW.h b/source/dnode/vnode/src/tsdb/tsdbFSetRAW.h index e69de29bb2..205c785e99 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSetRAW.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSetRAW.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbDataFileRAW.h" + +#ifndef _TSDB_FSET_RAW_H +#define _TSDB_FSET_RAW_H + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SFSetRAWWriterConfig { + STsdb *tsdb; + int32_t szPage; + + SDiskID did; + int64_t fid; + int64_t cid; + int32_t level; +} SFSetRAWWriterConfig; + +typedef struct SFSetRAWWriter SFSetRAWWriter; + +int32_t tsdbFSetRAWWriterOpen(SFSetRAWWriterConfig *config, SFSetRAWWriter **writer); +int32_t tsdbFSetRAWWriterClose(SFSetRAWWriter **writer, bool abort, TFileOpArray *fopArr); +int32_t tsdbFSetRAWWriteBlockData(SFSetRAWWriter *writer, STsdbDataRAWBlockHeader *bHdr); + +#ifdef __cplusplus +} +#endif + +#endif /*_TSDB_FSET_RAW_H*/ diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c index e69de29bb2..2bcd00bfec 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c @@ -0,0 +1,586 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdb.h" +#include "tsdbDataFileRAW.h" +#include "tsdbFS2.h" +#include "tsdbFSetRAW.h" + +// reader + +typedef struct SDataFileRAWReaderIter { + int32_t count; + int32_t idx; + int64_t offset; + int64_t size; +} SDataFileRAWReaderIter; + +typedef struct STsdbSnapRAWReader { + STsdb* tsdb; + int64_t ever; + int8_t type; + + TFileSetArray* fsetArr; + + // context + struct { + int32_t fsetArrIdx; + STFileSet* fset; + bool isDataDone; + } ctx[1]; + + // reader + SDataFileRAWReaderArray dataReaderArr[1]; + + // iter + SDataFileRAWReaderIter dataIter[1]; +} STsdbSnapRAWReader; + +int32_t tsdbSnapRAWReaderOpen(STsdb* tsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** reader) { + int32_t code = 0; + int32_t lino = 0; + + reader[0] = taosMemoryCalloc(1, sizeof(STsdbSnapRAWReader)); + if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + reader[0]->tsdb = tsdb; + reader[0]->ever = ever; + reader[0]->type = type; + + code = tsdbFSCreateRefSnapshot(tsdb->pFS, &reader[0]->fsetArr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), + __func__, lino, tstrerror(code), 0, ever, type); + tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr); + taosMemoryFree(reader[0]); + reader[0] = NULL; + } else { + tsdbInfo("vgId:%d tsdb snapshot reader opened. sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), 0, + ever, type); + } + return code; +} + +int32_t tsdbSnapRAWReaderClose(STsdbSnapRAWReader** reader) { + if (reader[0] == NULL) return 0; + + int32_t code = 0; + int32_t lino = 0; + + STsdb* tsdb = reader[0]->tsdb; + + tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr); + taosMemoryFree(reader[0]); + reader[0] = NULL; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } else { + tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__); + } + return code; +} + +static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) { + int32_t code = 0; + int32_t lino = 0; + + // data + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) { + if (reader->ctx->fset->farr[ftype] == NULL) { + continue; + } + STFileObj* fobj = reader->ctx->fset->farr[ftype]; + SDataFileRAWReader* dataReader; + SDataFileRAWReaderConfig config = { + .tsdb = reader->tsdb, + .szPage = reader->tsdb->pVnode->config.tsdbPageSize, + .file = fobj->f[0], + }; + code = tsdbDataFileRAWReaderOpen(NULL, &config, &dataReader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(reader->dataReaderArr, dataReader); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // stt + SSttLvl* lvl; + TARRAY2_FOREACH(reader->ctx->fset->lvlArr, lvl) { + STFileObj* fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + SDataFileRAWReader* dataReader; + SDataFileRAWReaderConfig config = { + .tsdb = reader->tsdb, + .szPage = reader->tsdb->pVnode->config.tsdbPageSize, + .file = fobj->f[0], + }; + code = tsdbDataFileRAWReaderOpen(NULL, &config, &dataReader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = TARRAY2_APPEND(reader->dataReaderArr, dataReader); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapRAWReadFileSetCloseReader(STsdbSnapRAWReader* reader) { + int32_t code = 0; + int32_t lino = 0; + + TARRAY2_CLEAR(reader->dataReaderArr, tsdbDataFileRAWReaderClose); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapRAWReadFileSetOpenIter(STsdbSnapRAWReader* reader) { + int32_t code = 0; + int32_t lino = 0; + + reader->dataIter->count = TARRAY2_SIZE(reader->dataReaderArr); + reader->dataIter->idx = -1; + reader->dataIter->offset = 0; + reader->dataIter->size = 0; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapRAWReadFileSetCloseIter(STsdbSnapRAWReader* reader) { + reader->dataIter->count = 0; + reader->dataIter->idx = 0; + reader->dataIter->offset = 0; + reader->dataIter->size = 0; + return 0; +} + +static int32_t tsdbSnapRAWReadNext(STsdbSnapRAWReader* reader, STsdbDataRAWBlockHeader* bHdr) { + int32_t code = 0; + int32_t lino = 0; + + ASSERT(reader->dataIter->offset <= reader->dataIter->size); + ASSERT(reader->dataIter->idx <= reader->dataIter->count); + + if (reader->dataIter->offset == reader->dataIter->size && reader->dataIter->idx < reader->dataIter->count) { + reader->dataIter->idx++; + } + if (reader->dataIter->idx == reader->dataIter->count) { + return 0; + } + + SDataFileRAWReader* dataReader = TARRAY2_GET(reader->dataReaderArr, reader->dataIter->idx); + code = tsdbDataFileRAWReadBlockData(dataReader, bHdr); + TSDB_CHECK_CODE(code, lino, _exit); + + reader->dataIter->offset += bHdr->dataLength; +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapRAWReadData(STsdbSnapRAWReader* reader, SSnapDataHdr** data) { + int32_t code = 0; + int32_t lino = 0; + + void* pBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + sizeof(STsdbDataRAWBlockHeader) + TSDB_SNAP_RAW_PAYLOAD_SIZE); + if (pBuf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + SSnapDataHdr* pHdr = pBuf; + pHdr->type = reader->type; + STsdbDataRAWBlockHeader* pData = (void*)pHdr->data; + pData->dataLength = TSDB_SNAP_RAW_PAYLOAD_SIZE; + + code = tsdbSnapRAWReadNext(reader, pData); + TSDB_CHECK_CODE(code, lino, _exit); + + ASSERT(pData->dataLength > 0 && pData->dataLength <= TSDB_SNAP_RAW_PAYLOAD_SIZE); + pHdr->size = sizeof(STsdbDataRAWBlockHeader) + pData->dataLength; + +_exit: + if (code) { + taosMemoryFree(pBuf); + pBuf = NULL; + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + data[0] = pBuf; + return code; +} + +static int32_t tsdbSnapRAWReadBegin(STsdbSnapRAWReader* reader) { + int32_t code = 0; + int32_t lino = 0; + + ASSERT(reader->ctx->fset == NULL); + + if (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) { + reader->ctx->fset = TARRAY2_GET(reader->fsetArr, reader->ctx->fsetArrIdx++); + reader->ctx->isDataDone = false; + + code = tsdbSnapRAWReadFileSetOpenReader(reader); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSnapRAWReadFileSetOpenIter(reader); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } + return code; +} + +static int32_t tsdbSnapRAWReadEnd(STsdbSnapRAWReader* reader) { + tsdbSnapRAWReadFileSetCloseIter(reader); + tsdbSnapRAWReadFileSetCloseReader(reader); + reader->ctx->fset = NULL; + return 0; +} + +int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* reader, uint8_t** data) { + int32_t code = 0; + int32_t lino = 0; + + data[0] = NULL; + + for (;;) { + if (reader->ctx->fset == NULL) { + code = tsdbSnapRAWReadBegin(reader); + TSDB_CHECK_CODE(code, lino, _exit); + + if (reader->ctx->fset == NULL) { + break; + } + } + + if (!reader->ctx->isDataDone) { + code = tsdbSnapRAWReadData(reader, (SSnapDataHdr**)data); + TSDB_CHECK_CODE(code, lino, _exit); + if (data[0]) { + goto _exit; + } else { + reader->ctx->isDataDone = true; + } + } + + code = tsdbSnapRAWReadEnd(reader); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino); + } else { + tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__); + } + return code; +} + +// writer +struct STsdbSnapRAWWriter { + STsdb* tsdb; + int64_t sver; + int64_t ever; + int32_t minutes; + int8_t precision; + int32_t minRow; + int32_t maxRow; + int8_t cmprAlg; + int64_t commitID; + int32_t szPage; + int64_t compactVersion; + int64_t now; + + TFileSetArray* fsetArr; + TFileOpArray fopArr[1]; + + struct { + bool fsetWriteBegin; + int32_t fid; + STFileSet* fset; + SDiskID did; + int64_t cid; + int64_t level; + + // writer + SFSetRAWWriter* fsetWriter; + } ctx[1]; +}; + +int32_t tsdbSnapRAWWriterOpen(STsdb* pTsdb, int64_t ever, STsdbSnapRAWWriter** writer) { + int32_t code = 0; + int32_t lino = 0; + + // disable background tasks + tsdbFSDisableBgTask(pTsdb->pFS); + + // start to write + writer[0] = taosMemoryCalloc(1, sizeof(*writer[0])); + if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + writer[0]->tsdb = pTsdb; + writer[0]->ever = ever; + writer[0]->minutes = pTsdb->keepCfg.days; + writer[0]->precision = pTsdb->keepCfg.precision; + writer[0]->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; + writer[0]->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; + writer[0]->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; + writer[0]->commitID = tsdbFSAllocEid(pTsdb->pFS); + writer[0]->szPage = pTsdb->pVnode->config.tsdbPageSize; + writer[0]->compactVersion = INT64_MAX; + writer[0]->now = taosGetTimestampMs(); + + code = tsdbFSCreateCopySnapshot(pTsdb->pFS, &writer[0]->fsetArr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } else { + tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, 0, ever); + } + return code; +} + +static int32_t tsdbSnapRAWWriteFileSetOpenIter(STsdbSnapRAWWriter* writer) { + int32_t code = 0; + int32_t lino = 0; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapRAWWriteFileSetCloseIter(STsdbSnapRAWWriter* writer) { return 0; } + +static int32_t tsdbSnapRAWWriteFileSetOpenWriter(STsdbSnapRAWWriter* writer) { + int32_t code = 0; + int32_t lino = 0; + + SFSetRAWWriterConfig config = { + .tsdb = writer->tsdb, + .szPage = writer->szPage, + .fid = writer->ctx->fid, + .cid = writer->commitID, + .did = writer->ctx->did, + .level = writer->ctx->level, + }; + + code = tsdbFSetRAWWriterOpen(&config, &writer->ctx->fsetWriter); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapRAWWriteFileSetCloseWriter(STsdbSnapRAWWriter* writer) { + return tsdbFSetRAWWriterClose(&writer->ctx->fsetWriter, 0, writer->fopArr); +} + +static int32_t tsdbSnapRAWWriteFileSetBegin(STsdbSnapRAWWriter* writer, int32_t fid) { + int32_t code = 0; + int32_t lino = 0; + + ASSERT(writer->ctx->fsetWriteBegin == false); + + STFileSet* fset = &(STFileSet){.fid = fid}; + + writer->ctx->fid = fid; + STFileSet** fsetPtr = TARRAY2_SEARCH(writer->fsetArr, &fset, tsdbTFileSetCmprFn, TD_EQ); + writer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr; + + int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec()); + if (tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did)) { + code = TSDB_CODE_NO_AVAIL_DISK; + TSDB_CHECK_CODE(code, lino, _exit); + } + tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did); + + code = tsdbSnapRAWWriteFileSetOpenWriter(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->level = level; + writer->ctx->fsetWriteBegin = true; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapRAWWriteFileSetEnd(STsdbSnapRAWWriter* writer) { + if (!writer->ctx->fsetWriteBegin) return 0; + + int32_t code = 0; + int32_t lino = 0; + + // close write + code = tsdbSnapRAWWriteFileSetCloseWriter(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->fsetWriteBegin = false; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbSnapRAWWriterPrepareClose(STsdbSnapRAWWriter* writer) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbSnapRAWWriteFileSetEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } else { + tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__); + } + return code; +} + +int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** writer, int8_t rollback) { + if (writer[0] == NULL) return 0; + + int32_t code = 0; + int32_t lino = 0; + + STsdb* tsdb = writer[0]->tsdb; + + if (rollback) { + code = tsdbFSEditAbort(writer[0]->tsdb->pFS); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + taosThreadMutexLock(&writer[0]->tsdb->mutex); + + code = tsdbFSEditCommit(writer[0]->tsdb->pFS); + if (code) { + taosThreadMutexUnlock(&writer[0]->tsdb->mutex); + TSDB_CHECK_CODE(code, lino, _exit); + } + + writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL; + + taosThreadMutexUnlock(&writer[0]->tsdb->mutex); + } + tsdbFSEnableBgTask(tsdb->pFS); + + TARRAY2_DESTROY(writer[0]->fopArr, NULL); + tsdbFSDestroyCopySnapshot(&writer[0]->fsetArr); + + taosMemoryFree(writer[0]); + writer[0] = NULL; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); + } else { + tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__); + } + return code; +} + +static int32_t tsdbSnapRAWWriteTimeSeriesData(STsdbSnapRAWWriter* writer, STsdbDataRAWBlockHeader* bHdr) { + int32_t code = 0; + int32_t lino = 0; + + code = tsdbFSetRAWWriteBlockData(writer->ctx->fsetWriter, bHdr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbSnapRAWWriteData(STsdbSnapRAWWriter* writer, SSnapDataHdr* hdr) { + int32_t code = 0; + int32_t lino = 0; + + STsdbDataRAWBlockHeader* bHdr = (void*)hdr->data; + int32_t fid = bHdr->file.fid; + if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) { + code = tsdbSnapRAWWriteFileSetEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSnapRAWWriteFileSetBegin(writer, fid); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbSnapRAWWriteTimeSeriesData(writer, bHdr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + +int32_t tsdbSnapRAWWrite(STsdbSnapRAWWriter* writer, SSnapDataHdr* hdr) { + ASSERT(hdr->type == SNAP_DATA_RAW); + + int32_t code = 0; + int32_t lino = 0; + + code = tsdbSnapRAWWriteData(writer, hdr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64, + TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size); + } else { + tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__, + hdr->type, hdr->index, hdr->size); + } + return code; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 21c858709b..1a6b1a7a6c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -51,6 +51,10 @@ struct SVSnapReader { int8_t tsdbDone; TFileSetRangeArray *pRanges; STsdbSnapReader *pTsdbReader; + // tsdb raw + int8_t tsdbRawDone; + STsdbSnapRAWReader *pTsdbRawReader; + // tq int8_t tqHandleDone; STqSnapReader *pTqSnapReader; @@ -467,6 +471,8 @@ struct SVSnapWriter { // tsdb TFileSetRangeArray *pRanges; STsdbSnapWriter *pTsdbSnapWriter; + // tsdb raw + STsdbSnapRAWWriter *pTsdbSnapRAWWriter; // tq STqSnapWriter *pTqSnapWriter; STqOffsetWriter *pTqOffsetWriter; @@ -772,6 +778,17 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr); if (code) goto _err; } break; + case SNAP_DATA_RAW: { + // tsdb + if (pWriter->pTsdbSnapRAWWriter == NULL) { + ASSERT(pWriter->sver == 0); + code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter); + if (code) goto _err; + } + + code = tsdbSnapRAWWrite(pWriter->pTsdbSnapRAWWriter, pHdr); + if (code) goto _err; + } break; case SNAP_DATA_TQ_HANDLE: { // tq handle if (pWriter->pTqSnapWriter == NULL) {