feat: impl tsdb snapshot reader and writer for raw files
This commit is contained in:
parent
a4c504169d
commit
6c419423de
|
@ -65,6 +65,8 @@ typedef struct SMetaSnapReader SMetaSnapReader;
|
|||
typedef struct SMetaSnapWriter SMetaSnapWriter;
|
||||
typedef struct STsdbSnapReader STsdbSnapReader;
|
||||
typedef struct STsdbSnapWriter STsdbSnapWriter;
|
||||
typedef struct STsdbSnapRAWReader STsdbSnapRAWReader;
|
||||
typedef struct STsdbSnapRAWWriter STsdbSnapRAWWriter;
|
||||
typedef struct STqSnapReader STqSnapReader;
|
||||
typedef struct STqSnapWriter STqSnapWriter;
|
||||
typedef struct STqOffsetReader STqOffsetReader;
|
||||
|
@ -313,6 +315,15 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRang
|
|||
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr);
|
||||
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter);
|
||||
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
|
||||
// STsdbSnapRAWReader ========================================
|
||||
int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader);
|
||||
int32_t tsdbSnapRAWReaderClose(STsdbSnapRAWReader** ppReader);
|
||||
int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* pReader, uint8_t** ppData);
|
||||
// STsdbSnapRAWWriter ========================================
|
||||
int32_t tsdbSnapRAWWriterOpen(STsdb* pTsdb, int64_t ever, STsdbSnapRAWWriter** ppWriter);
|
||||
int32_t tsdbSnapRAWWrite(STsdbSnapRAWWriter* pWriter, SSnapDataHdr* pHdr);
|
||||
int32_t tsdbSnapRAWWriterPrepareClose(STsdbSnapRAWWriter* pWriter);
|
||||
int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** ppWriter, int8_t rollback);
|
||||
// STqSnapshotReader ==
|
||||
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader);
|
||||
int32_t tqSnapReaderClose(STqSnapReader** ppReader);
|
||||
|
|
|
@ -0,0 +1,212 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tsdbDataFileRAW.h"
|
||||
|
||||
// SDataFileRAWReader =============================================
|
||||
int32_t tsdbDataFileRAWReaderOpen(const char *fname, const SDataFileRAWReaderConfig *config,
|
||||
SDataFileRAWReader **reader) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
reader[0] = taosMemoryCalloc(1, sizeof(SDataFileRAWReader));
|
||||
if (reader[0] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
reader[0]->config[0] = config[0];
|
||||
|
||||
if (fname) {
|
||||
if (fname) {
|
||||
code = tsdbOpenFile(fname, config->tsdb, TD_FILE_READ, &reader[0]->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
} else {
|
||||
char fname1[TSDB_FILENAME_LEN];
|
||||
tsdbTFileName(config->tsdb, &config->file, fname1);
|
||||
code = tsdbOpenFile(fname1, config->tsdb, TD_FILE_READ, &reader[0]->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDataFileRAWReaderClose(SDataFileRAWReader **reader) {
|
||||
if (reader[0] == NULL) return 0;
|
||||
|
||||
if (reader[0]->fd) {
|
||||
tsdbCloseFile(&reader[0]->fd);
|
||||
}
|
||||
|
||||
taosMemoryFree(reader[0]);
|
||||
reader[0] = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbDataFileRAWReadBlockData(SDataFileRAWReader *reader, STsdbDataRAWBlockHeader *bHdr) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
bHdr->file.type = reader->config->file.type;
|
||||
bHdr->file.fid = reader->config->file.fid;
|
||||
bHdr->file.cid = reader->config->file.cid;
|
||||
bHdr->file.size = reader->config->file.size;
|
||||
bHdr->file.minVer = reader->config->file.minVer;
|
||||
bHdr->file.maxVer = reader->config->file.maxVer;
|
||||
bHdr->file.stt->level = reader->config->file.stt->level;
|
||||
|
||||
int64_t size = TMIN(bHdr->dataLength, reader->config->file.size - reader->ctx->offset);
|
||||
ASSERT(size > 0);
|
||||
bHdr->dataLength = 0;
|
||||
bHdr->offset = reader->ctx->offset;
|
||||
|
||||
code = tsdbReadFile(reader->fd, bHdr->offset, bHdr->data, size);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
bHdr->dataLength = size;
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
// SDataFileRAWWriter =============================================
|
||||
int32_t tsdbDataFileRAWWriterOpen(const SDataFileRAWWriterConfig *config, SDataFileRAWWriter **writer) {
|
||||
writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
|
||||
if (!writer[0]) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
writer[0]->config[0] = config[0];
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileRAWWriterCloseAbort(SDataFileRAWWriter *writer) {
|
||||
ASSERT(0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileRAWWriterDoClose(SDataFileRAWWriter *writer) { return 0; }
|
||||
|
||||
int32_t tsdbDataFileRAWWriterDoOpen(SDataFileRAWWriter *writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
writer->file = writer->config->file;
|
||||
writer->ctx->offset = 0;
|
||||
|
||||
writer->ctx->opened = true;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFileOpArray *opArr) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
STFileOp op;
|
||||
|
||||
op = (STFileOp){
|
||||
.optype = TSDB_FOP_CREATE,
|
||||
.fid = writer->config->fid,
|
||||
.nf = writer->file,
|
||||
};
|
||||
code = TARRAY2_APPEND(opArr, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (writer->fd) {
|
||||
code = tsdbFsyncFile(writer->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
tsdbCloseFile(&writer->fd);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbDataFileRAWWriterOpenDataFD(SDataFileRAWWriter *writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
int32_t flag = TD_FILE_READ | TD_FILE_WRITE;
|
||||
|
||||
if (writer->file.size == 0) {
|
||||
flag |= (TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
}
|
||||
|
||||
tsdbTFileName(writer->config->tsdb, &writer->file, fname);
|
||||
code = tsdbOpenFile(fname, writer->config->tsdb, flag, &writer->fd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDataFileRAWWriterClose(SDataFileRAWWriter **writer, bool abort, TFileOpArray *opArr) {
|
||||
if (writer[0] == NULL) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
if (writer[0]->ctx->opened) {
|
||||
if (abort) {
|
||||
code = tsdbDataFileRAWWriterCloseAbort(writer[0]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
code = tsdbDataFileRAWWriterCloseCommit(writer[0], opArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
tsdbDataFileRAWWriterDoClose(writer[0]);
|
||||
}
|
||||
taosMemoryFree(writer[0]);
|
||||
writer[0] = NULL;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer[0]->config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbDataFileRAWWriteBlockData(SDataFileRAWWriter *writer, const STsdbDataRAWBlockHeader *pDataBlock) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
code = tsdbWriteFile(writer->fd, writer->ctx->offset, (const uint8_t *)pDataBlock->data, pDataBlock->dataLength);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
writer->file.size += pDataBlock->dataLength;
|
||||
writer->ctx->offset += pDataBlock->dataLength;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
|
@ -0,0 +1,127 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tarray2.h"
|
||||
#include "tsdbDef.h"
|
||||
#include "tsdbFSet2.h"
|
||||
#include "tsdbFile2.h"
|
||||
#include "tsdbUtil2.h"
|
||||
|
||||
#ifndef _TSDB_DATA_FILE_RAW_H
|
||||
#define _TSDB_DATA_FILE_RAW_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define TSDB_SNAP_RAW_PAYLOAD_SIZE (4096 * 1024)
|
||||
#if 0
|
||||
struct SDataRAWBlock {
|
||||
int8_t *data;
|
||||
int64_t size;
|
||||
};
|
||||
|
||||
int32_t tsdbDataRAWBlockReset(SDataRAWBlock *pBlock);
|
||||
int32_t tsdbDataRAWBlockAlloc(SDataRawBlock *pBlock);
|
||||
void tsdbDataRAWBlockFree(SDataRAWBlock *pBlock);
|
||||
#endif
|
||||
|
||||
// STsdbDataRAWBlockHeader =======================================
|
||||
typedef struct STsdbDataRAWBlockHeader {
|
||||
struct {
|
||||
int32_t type;
|
||||
int64_t fid;
|
||||
int64_t cid;
|
||||
int64_t size;
|
||||
int64_t minVer;
|
||||
int64_t maxVer;
|
||||
union {
|
||||
struct {
|
||||
int32_t level;
|
||||
} stt[1];
|
||||
};
|
||||
} file;
|
||||
|
||||
int64_t offset;
|
||||
int64_t dataLength;
|
||||
uint8_t data[0];
|
||||
} STsdbDataRAWBlockHeader;
|
||||
|
||||
// SDataFileRAWReader =============================================
|
||||
typedef struct SDataFileRAWReaderConfig {
|
||||
STsdb *tsdb;
|
||||
int32_t szPage;
|
||||
|
||||
STFile file;
|
||||
} SDataFileRAWReaderConfig;
|
||||
|
||||
typedef struct SDataFileRAWReader {
|
||||
SDataFileRAWReaderConfig config[1];
|
||||
|
||||
struct {
|
||||
bool opened;
|
||||
int64_t offset;
|
||||
} ctx[1];
|
||||
|
||||
STsdbFD *fd;
|
||||
} SDataFileRAWReader;
|
||||
|
||||
typedef TARRAY2(SDataFileRAWReader *) SDataFileRAWReaderArray;
|
||||
|
||||
int32_t tsdbDataFileRAWReaderOpen(const char *fname, const SDataFileRAWReaderConfig *config,
|
||||
SDataFileRAWReader **reader);
|
||||
int32_t tsdbDataFileRAWReaderClose(SDataFileRAWReader **reader);
|
||||
|
||||
int32_t tsdbDataFileRAWReadBlockData(SDataFileRAWReader *reader, STsdbDataRAWBlockHeader *bHdr);
|
||||
|
||||
// SDataFileRAWWriter =============================================
|
||||
typedef struct SDataFileRAWWriterConfig {
|
||||
STsdb *tsdb;
|
||||
int32_t szPage;
|
||||
|
||||
SDiskID did;
|
||||
int64_t fid;
|
||||
int64_t cid;
|
||||
int32_t level;
|
||||
|
||||
STFile file;
|
||||
} SDataFileRAWWriterConfig;
|
||||
|
||||
typedef struct SDataFileRAWWriter {
|
||||
SDataFileRAWWriterConfig config[1];
|
||||
|
||||
struct {
|
||||
bool opened;
|
||||
int64_t offset;
|
||||
} ctx[1];
|
||||
|
||||
STFile file;
|
||||
STsdbFD *fd;
|
||||
} SDataFileRAWWriter;
|
||||
|
||||
typedef struct SDataFileRAWWriter SDataFileRAWWriter;
|
||||
|
||||
int32_t tsdbDataFileRAWWriterOpen(const SDataFileRAWWriterConfig *config, SDataFileRAWWriter **writer);
|
||||
int32_t tsdbDataFileRAWWriterClose(SDataFileRAWWriter **writer, bool abort, TFileOpArray *opArr);
|
||||
|
||||
int32_t tsdbDataFileRAWWriterDoOpen(SDataFileRAWWriter *writer);
|
||||
int32_t tsdbDataFileRAWWriteBlockData(SDataFileRAWWriter *writer, const STsdbDataRAWBlockHeader *bHdr);
|
||||
int32_t tsdbDataFileRAWFlush(SDataFileRAWWriter *writer);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TSDB_DATA_FILE_RAW_H*/
|
|
@ -0,0 +1,173 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tsdbFSetRAW.h"
|
||||
|
||||
// SFSetRAWWriter ==================================================
|
||||
typedef struct SFSetRAWWriter {
|
||||
SFSetRAWWriterConfig config[1];
|
||||
|
||||
struct {
|
||||
TFileOpArray fopArr[1];
|
||||
STFile file;
|
||||
int64_t offset;
|
||||
} ctx[1];
|
||||
|
||||
// writer
|
||||
SDataFileRAWWriter *dataWriter;
|
||||
} SFSetRAWWriter;
|
||||
|
||||
int32_t tsdbFSetRAWWriterOpen(SFSetRAWWriterConfig *config, SFSetRAWWriter **writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
writer[0] = taosMemoryCalloc(1, sizeof(SFSetRAWWriter));
|
||||
if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
writer[0]->config[0] = config[0];
|
||||
|
||||
TARRAY2_INIT(writer[0]->ctx->fopArr);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbFSetRAWWriterFinish(SFSetRAWWriter *writer, TFileOpArray *fopArr) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdb *tsdb = writer->config->tsdb;
|
||||
|
||||
STFileOp op;
|
||||
TARRAY2_FOREACH(writer->ctx->fopArr, op) {
|
||||
code = TARRAY2_APPEND(fopArr, op);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
TARRAY2_CLEAR(writer->ctx->fopArr, NULL);
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbFSetRAWWriteFileDataBegin(SFSetRAWWriter *writer, STsdbDataRAWBlockHeader *bHdr) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SDataFileRAWWriterConfig config = {
|
||||
.tsdb = writer->config->tsdb,
|
||||
.szPage = writer->config->szPage,
|
||||
.did = writer->config->did,
|
||||
.cid = writer->config->cid,
|
||||
.level = writer->config->level,
|
||||
|
||||
.file =
|
||||
{
|
||||
.type = bHdr->file.type,
|
||||
.did = writer->config->did,
|
||||
.cid = writer->config->cid,
|
||||
.size = bHdr->file.size,
|
||||
.minVer = bHdr->file.minVer,
|
||||
.maxVer = bHdr->file.maxVer,
|
||||
.stt = {{
|
||||
.level = bHdr->file.stt->level,
|
||||
}},
|
||||
},
|
||||
};
|
||||
|
||||
code = tsdbDataFileRAWWriterOpen(&config, &writer->dataWriter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbFSetRAWWriteFileDataEnd(SFSetRAWWriter *writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
code = tsdbDataFileRAWWriterClose(&writer->dataWriter, false, writer->ctx->fopArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSetRAWWriterClose(SFSetRAWWriter **writer, bool abort, TFileOpArray *fopArr) {
|
||||
if (writer[0] == NULL) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdb *tsdb = writer[0]->config->tsdb;
|
||||
|
||||
// end
|
||||
code = tsdbFSetRAWWriteFileDataEnd(writer[0]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbDataFileRAWWriterClose(&writer[0]->dataWriter, abort, writer[0]->ctx->fopArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFSetRAWWriterFinish(writer[0], fopArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
// free
|
||||
TARRAY2_DESTROY(writer[0]->ctx->fopArr, NULL);
|
||||
taosMemoryFree(writer[0]);
|
||||
writer[0] = NULL;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSetRAWWriteBlockData(SFSetRAWWriter *writer, STsdbDataRAWBlockHeader *bHdr) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
ASSERT(writer->ctx->offset >= 0 && writer->ctx->offset <= writer->ctx->file.size);
|
||||
|
||||
if (writer->ctx->offset == writer->ctx->file.size) {
|
||||
code = tsdbFSetRAWWriteFileDataEnd(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFSetRAWWriteFileDataBegin(writer, bHdr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbDataFileRAWWriteBlockData(writer->dataWriter, bHdr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
writer->ctx->offset += bHdr->dataLength;
|
||||
ASSERT(writer->ctx->offset == writer->dataWriter->ctx->offset);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tsdbDataFileRAW.h"
|
||||
|
||||
#ifndef _TSDB_FSET_RAW_H
|
||||
#define _TSDB_FSET_RAW_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SFSetRAWWriterConfig {
|
||||
STsdb *tsdb;
|
||||
int32_t szPage;
|
||||
|
||||
SDiskID did;
|
||||
int64_t fid;
|
||||
int64_t cid;
|
||||
int32_t level;
|
||||
} SFSetRAWWriterConfig;
|
||||
|
||||
typedef struct SFSetRAWWriter SFSetRAWWriter;
|
||||
|
||||
int32_t tsdbFSetRAWWriterOpen(SFSetRAWWriterConfig *config, SFSetRAWWriter **writer);
|
||||
int32_t tsdbFSetRAWWriterClose(SFSetRAWWriter **writer, bool abort, TFileOpArray *fopArr);
|
||||
int32_t tsdbFSetRAWWriteBlockData(SFSetRAWWriter *writer, STsdbDataRAWBlockHeader *bHdr);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TSDB_FSET_RAW_H*/
|
|
@ -0,0 +1,586 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tsdb.h"
|
||||
#include "tsdbDataFileRAW.h"
|
||||
#include "tsdbFS2.h"
|
||||
#include "tsdbFSetRAW.h"
|
||||
|
||||
// reader
|
||||
|
||||
typedef struct SDataFileRAWReaderIter {
|
||||
int32_t count;
|
||||
int32_t idx;
|
||||
int64_t offset;
|
||||
int64_t size;
|
||||
} SDataFileRAWReaderIter;
|
||||
|
||||
typedef struct STsdbSnapRAWReader {
|
||||
STsdb* tsdb;
|
||||
int64_t ever;
|
||||
int8_t type;
|
||||
|
||||
TFileSetArray* fsetArr;
|
||||
|
||||
// context
|
||||
struct {
|
||||
int32_t fsetArrIdx;
|
||||
STFileSet* fset;
|
||||
bool isDataDone;
|
||||
} ctx[1];
|
||||
|
||||
// reader
|
||||
SDataFileRAWReaderArray dataReaderArr[1];
|
||||
|
||||
// iter
|
||||
SDataFileRAWReaderIter dataIter[1];
|
||||
} STsdbSnapRAWReader;
|
||||
|
||||
int32_t tsdbSnapRAWReaderOpen(STsdb* tsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** reader) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
reader[0] = taosMemoryCalloc(1, sizeof(STsdbSnapRAWReader));
|
||||
if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
reader[0]->tsdb = tsdb;
|
||||
reader[0]->ever = ever;
|
||||
reader[0]->type = type;
|
||||
|
||||
code = tsdbFSCreateRefSnapshot(tsdb->pFS, &reader[0]->fsetArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
|
||||
__func__, lino, tstrerror(code), 0, ever, type);
|
||||
tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr);
|
||||
taosMemoryFree(reader[0]);
|
||||
reader[0] = NULL;
|
||||
} else {
|
||||
tsdbInfo("vgId:%d tsdb snapshot reader opened. sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), 0,
|
||||
ever, type);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbSnapRAWReaderClose(STsdbSnapRAWReader** reader) {
|
||||
if (reader[0] == NULL) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdb* tsdb = reader[0]->tsdb;
|
||||
|
||||
tsdbFSDestroyRefSnapshot(&reader[0]->fsetArr);
|
||||
taosMemoryFree(reader[0]);
|
||||
reader[0] = NULL;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWReadFileSetOpenReader(STsdbSnapRAWReader* reader) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
// data
|
||||
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
|
||||
if (reader->ctx->fset->farr[ftype] == NULL) {
|
||||
continue;
|
||||
}
|
||||
STFileObj* fobj = reader->ctx->fset->farr[ftype];
|
||||
SDataFileRAWReader* dataReader;
|
||||
SDataFileRAWReaderConfig config = {
|
||||
.tsdb = reader->tsdb,
|
||||
.szPage = reader->tsdb->pVnode->config.tsdbPageSize,
|
||||
.file = fobj->f[0],
|
||||
};
|
||||
code = tsdbDataFileRAWReaderOpen(NULL, &config, &dataReader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = TARRAY2_APPEND(reader->dataReaderArr, dataReader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// stt
|
||||
SSttLvl* lvl;
|
||||
TARRAY2_FOREACH(reader->ctx->fset->lvlArr, lvl) {
|
||||
STFileObj* fobj;
|
||||
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
||||
SDataFileRAWReader* dataReader;
|
||||
SDataFileRAWReaderConfig config = {
|
||||
.tsdb = reader->tsdb,
|
||||
.szPage = reader->tsdb->pVnode->config.tsdbPageSize,
|
||||
.file = fobj->f[0],
|
||||
};
|
||||
code = tsdbDataFileRAWReaderOpen(NULL, &config, &dataReader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = TARRAY2_APPEND(reader->dataReaderArr, dataReader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWReadFileSetCloseReader(STsdbSnapRAWReader* reader) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
TARRAY2_CLEAR(reader->dataReaderArr, tsdbDataFileRAWReaderClose);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWReadFileSetOpenIter(STsdbSnapRAWReader* reader) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
reader->dataIter->count = TARRAY2_SIZE(reader->dataReaderArr);
|
||||
reader->dataIter->idx = -1;
|
||||
reader->dataIter->offset = 0;
|
||||
reader->dataIter->size = 0;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWReadFileSetCloseIter(STsdbSnapRAWReader* reader) {
|
||||
reader->dataIter->count = 0;
|
||||
reader->dataIter->idx = 0;
|
||||
reader->dataIter->offset = 0;
|
||||
reader->dataIter->size = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWReadNext(STsdbSnapRAWReader* reader, STsdbDataRAWBlockHeader* bHdr) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
ASSERT(reader->dataIter->offset <= reader->dataIter->size);
|
||||
ASSERT(reader->dataIter->idx <= reader->dataIter->count);
|
||||
|
||||
if (reader->dataIter->offset == reader->dataIter->size && reader->dataIter->idx < reader->dataIter->count) {
|
||||
reader->dataIter->idx++;
|
||||
}
|
||||
if (reader->dataIter->idx == reader->dataIter->count) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
SDataFileRAWReader* dataReader = TARRAY2_GET(reader->dataReaderArr, reader->dataIter->idx);
|
||||
code = tsdbDataFileRAWReadBlockData(dataReader, bHdr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
reader->dataIter->offset += bHdr->dataLength;
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWReadData(STsdbSnapRAWReader* reader, SSnapDataHdr** data) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
void* pBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + sizeof(STsdbDataRAWBlockHeader) + TSDB_SNAP_RAW_PAYLOAD_SIZE);
|
||||
if (pBuf == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
SSnapDataHdr* pHdr = pBuf;
|
||||
pHdr->type = reader->type;
|
||||
STsdbDataRAWBlockHeader* pData = (void*)pHdr->data;
|
||||
pData->dataLength = TSDB_SNAP_RAW_PAYLOAD_SIZE;
|
||||
|
||||
code = tsdbSnapRAWReadNext(reader, pData);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
ASSERT(pData->dataLength > 0 && pData->dataLength <= TSDB_SNAP_RAW_PAYLOAD_SIZE);
|
||||
pHdr->size = sizeof(STsdbDataRAWBlockHeader) + pData->dataLength;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
taosMemoryFree(pBuf);
|
||||
pBuf = NULL;
|
||||
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
|
||||
}
|
||||
data[0] = pBuf;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWReadBegin(STsdbSnapRAWReader* reader) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
ASSERT(reader->ctx->fset == NULL);
|
||||
|
||||
if (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) {
|
||||
reader->ctx->fset = TARRAY2_GET(reader->fsetArr, reader->ctx->fsetArrIdx++);
|
||||
reader->ctx->isDataDone = false;
|
||||
|
||||
code = tsdbSnapRAWReadFileSetOpenReader(reader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbSnapRAWReadFileSetOpenIter(reader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWReadEnd(STsdbSnapRAWReader* reader) {
|
||||
tsdbSnapRAWReadFileSetCloseIter(reader);
|
||||
tsdbSnapRAWReadFileSetCloseReader(reader);
|
||||
reader->ctx->fset = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbSnapRAWRead(STsdbSnapRAWReader* reader, uint8_t** data) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
data[0] = NULL;
|
||||
|
||||
for (;;) {
|
||||
if (reader->ctx->fset == NULL) {
|
||||
code = tsdbSnapRAWReadBegin(reader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (reader->ctx->fset == NULL) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!reader->ctx->isDataDone) {
|
||||
code = tsdbSnapRAWReadData(reader, (SSnapDataHdr**)data);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
if (data[0]) {
|
||||
goto _exit;
|
||||
} else {
|
||||
reader->ctx->isDataDone = true;
|
||||
}
|
||||
}
|
||||
|
||||
code = tsdbSnapRAWReadEnd(reader);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(reader->tsdb->pVnode), code, lino);
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done", TD_VID(reader->tsdb->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
// writer
|
||||
struct STsdbSnapRAWWriter {
|
||||
STsdb* tsdb;
|
||||
int64_t sver;
|
||||
int64_t ever;
|
||||
int32_t minutes;
|
||||
int8_t precision;
|
||||
int32_t minRow;
|
||||
int32_t maxRow;
|
||||
int8_t cmprAlg;
|
||||
int64_t commitID;
|
||||
int32_t szPage;
|
||||
int64_t compactVersion;
|
||||
int64_t now;
|
||||
|
||||
TFileSetArray* fsetArr;
|
||||
TFileOpArray fopArr[1];
|
||||
|
||||
struct {
|
||||
bool fsetWriteBegin;
|
||||
int32_t fid;
|
||||
STFileSet* fset;
|
||||
SDiskID did;
|
||||
int64_t cid;
|
||||
int64_t level;
|
||||
|
||||
// writer
|
||||
SFSetRAWWriter* fsetWriter;
|
||||
} ctx[1];
|
||||
};
|
||||
|
||||
int32_t tsdbSnapRAWWriterOpen(STsdb* pTsdb, int64_t ever, STsdbSnapRAWWriter** writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
// disable background tasks
|
||||
tsdbFSDisableBgTask(pTsdb->pFS);
|
||||
|
||||
// start to write
|
||||
writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
|
||||
if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
writer[0]->tsdb = pTsdb;
|
||||
writer[0]->ever = ever;
|
||||
writer[0]->minutes = pTsdb->keepCfg.days;
|
||||
writer[0]->precision = pTsdb->keepCfg.precision;
|
||||
writer[0]->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
|
||||
writer[0]->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
||||
writer[0]->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
||||
writer[0]->commitID = tsdbFSAllocEid(pTsdb->pFS);
|
||||
writer[0]->szPage = pTsdb->pVnode->config.tsdbPageSize;
|
||||
writer[0]->compactVersion = INT64_MAX;
|
||||
writer[0]->now = taosGetTimestampMs();
|
||||
|
||||
code = tsdbFSCreateCopySnapshot(pTsdb->pFS, &writer[0]->fsetArr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64, TD_VID(pTsdb->pVnode), __func__, 0, ever);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWWriteFileSetOpenIter(STsdbSnapRAWWriter* writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWWriteFileSetCloseIter(STsdbSnapRAWWriter* writer) { return 0; }
|
||||
|
||||
static int32_t tsdbSnapRAWWriteFileSetOpenWriter(STsdbSnapRAWWriter* writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
SFSetRAWWriterConfig config = {
|
||||
.tsdb = writer->tsdb,
|
||||
.szPage = writer->szPage,
|
||||
.fid = writer->ctx->fid,
|
||||
.cid = writer->commitID,
|
||||
.did = writer->ctx->did,
|
||||
.level = writer->ctx->level,
|
||||
};
|
||||
|
||||
code = tsdbFSetRAWWriterOpen(&config, &writer->ctx->fsetWriter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWWriteFileSetCloseWriter(STsdbSnapRAWWriter* writer) {
|
||||
return tsdbFSetRAWWriterClose(&writer->ctx->fsetWriter, 0, writer->fopArr);
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWWriteFileSetBegin(STsdbSnapRAWWriter* writer, int32_t fid) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
ASSERT(writer->ctx->fsetWriteBegin == false);
|
||||
|
||||
STFileSet* fset = &(STFileSet){.fid = fid};
|
||||
|
||||
writer->ctx->fid = fid;
|
||||
STFileSet** fsetPtr = TARRAY2_SEARCH(writer->fsetArr, &fset, tsdbTFileSetCmprFn, TD_EQ);
|
||||
writer->ctx->fset = (fsetPtr == NULL) ? NULL : *fsetPtr;
|
||||
|
||||
int32_t level = tsdbFidLevel(fid, &writer->tsdb->keepCfg, taosGetTimestampSec());
|
||||
if (tfsAllocDisk(writer->tsdb->pVnode->pTfs, level, &writer->ctx->did)) {
|
||||
code = TSDB_CODE_NO_AVAIL_DISK;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
tfsMkdirRecurAt(writer->tsdb->pVnode->pTfs, writer->tsdb->path, writer->ctx->did);
|
||||
|
||||
code = tsdbSnapRAWWriteFileSetOpenWriter(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
writer->ctx->level = level;
|
||||
writer->ctx->fsetWriteBegin = true;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWWriteFileSetEnd(STsdbSnapRAWWriter* writer) {
|
||||
if (!writer->ctx->fsetWriteBegin) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
// close write
|
||||
code = tsdbSnapRAWWriteFileSetCloseWriter(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
writer->ctx->fsetWriteBegin = false;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbSnapRAWWriterPrepareClose(STsdbSnapRAWWriter* writer) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
code = tsdbSnapRAWWriteFileSetEnd(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done", TD_VID(writer->tsdb->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** writer, int8_t rollback) {
|
||||
if (writer[0] == NULL) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdb* tsdb = writer[0]->tsdb;
|
||||
|
||||
if (rollback) {
|
||||
code = tsdbFSEditAbort(writer[0]->tsdb->pFS);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
taosThreadMutexLock(&writer[0]->tsdb->mutex);
|
||||
|
||||
code = tsdbFSEditCommit(writer[0]->tsdb->pFS);
|
||||
if (code) {
|
||||
taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
|
||||
|
||||
taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
|
||||
}
|
||||
tsdbFSEnableBgTask(tsdb->pFS);
|
||||
|
||||
TARRAY2_DESTROY(writer[0]->fopArr, NULL);
|
||||
tsdbFSDestroyCopySnapshot(&writer[0]->fsetArr);
|
||||
|
||||
taosMemoryFree(writer[0]);
|
||||
writer[0] = NULL;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||
} else {
|
||||
tsdbInfo("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWWriteTimeSeriesData(STsdbSnapRAWWriter* writer, STsdbDataRAWBlockHeader* bHdr) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
code = tsdbFSetRAWWriteBlockData(writer->ctx->fsetWriter, bHdr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapRAWWriteData(STsdbSnapRAWWriter* writer, SSnapDataHdr* hdr) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdbDataRAWBlockHeader* bHdr = (void*)hdr->data;
|
||||
int32_t fid = bHdr->file.fid;
|
||||
if (!writer->ctx->fsetWriteBegin || fid != writer->ctx->fid) {
|
||||
code = tsdbSnapRAWWriteFileSetEnd(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbSnapRAWWriteFileSetBegin(writer, fid);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tsdbSnapRAWWriteTimeSeriesData(writer, bHdr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbSnapRAWWrite(STsdbSnapRAWWriter* writer, SSnapDataHdr* hdr) {
|
||||
ASSERT(hdr->type == SNAP_DATA_RAW);
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
code = tsdbSnapRAWWriteData(writer, hdr);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s, type:%d index:%" PRId64 " size:%" PRId64,
|
||||
TD_VID(writer->tsdb->pVnode), __func__, lino, tstrerror(code), hdr->type, hdr->index, hdr->size);
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done, type:%d index:%" PRId64 " size:%" PRId64, TD_VID(writer->tsdb->pVnode), __func__,
|
||||
hdr->type, hdr->index, hdr->size);
|
||||
}
|
||||
return code;
|
||||
}
|
|
@ -51,6 +51,10 @@ struct SVSnapReader {
|
|||
int8_t tsdbDone;
|
||||
TFileSetRangeArray *pRanges;
|
||||
STsdbSnapReader *pTsdbReader;
|
||||
// tsdb raw
|
||||
int8_t tsdbRawDone;
|
||||
STsdbSnapRAWReader *pTsdbRawReader;
|
||||
|
||||
// tq
|
||||
int8_t tqHandleDone;
|
||||
STqSnapReader *pTqSnapReader;
|
||||
|
@ -467,6 +471,8 @@ struct SVSnapWriter {
|
|||
// tsdb
|
||||
TFileSetRangeArray *pRanges;
|
||||
STsdbSnapWriter *pTsdbSnapWriter;
|
||||
// tsdb raw
|
||||
STsdbSnapRAWWriter *pTsdbSnapRAWWriter;
|
||||
// tq
|
||||
STqSnapWriter *pTqSnapWriter;
|
||||
STqOffsetWriter *pTqOffsetWriter;
|
||||
|
@ -772,6 +778,17 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
|||
code = tsdbSnapWrite(pWriter->pTsdbSnapWriter, pHdr);
|
||||
if (code) goto _err;
|
||||
} break;
|
||||
case SNAP_DATA_RAW: {
|
||||
// tsdb
|
||||
if (pWriter->pTsdbSnapRAWWriter == NULL) {
|
||||
ASSERT(pWriter->sver == 0);
|
||||
code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
code = tsdbSnapRAWWrite(pWriter->pTsdbSnapRAWWriter, pHdr);
|
||||
if (code) goto _err;
|
||||
} break;
|
||||
case SNAP_DATA_TQ_HANDLE: {
|
||||
// tq handle
|
||||
if (pWriter->pTqSnapWriter == NULL) {
|
||||
|
|
Loading…
Reference in New Issue