feat: support snap replication by file blocks

This commit is contained in:
Benguang Zhao 2023-11-24 20:04:39 +08:00
parent 1dc9019baa
commit f136266972
3 changed files with 57 additions and 25 deletions

View File

@ -83,12 +83,26 @@ _exit:
} }
// SDataFileRAWWriter ============================================= // SDataFileRAWWriter =============================================
int32_t tsdbDataFileRAWWriterOpen(const SDataFileRAWWriterConfig *config, SDataFileRAWWriter **writer) { int32_t tsdbDataFileRAWWriterOpen(const SDataFileRAWWriterConfig *config, SDataFileRAWWriter **ppWriter) {
writer[0] = taosMemoryCalloc(1, sizeof(*writer[0])); int32_t code = 0;
if (!writer[0]) return TSDB_CODE_OUT_OF_MEMORY; int32_t lino = 0;
writer[0]->config[0] = config[0]; SDataFileRAWWriter *writer = taosMemoryCalloc(1, sizeof(SDataFileRAWWriter));
return 0; if (!writer) return TSDB_CODE_OUT_OF_MEMORY;
writer->config[0] = config[0];
code = tsdbDataFileRAWWriterDoOpen(writer);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
taosMemoryFree(writer);
writer = NULL;
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
}
ppWriter[0] = writer;
return code;
} }
static int32_t tsdbDataFileRAWWriterCloseAbort(SDataFileRAWWriter *writer) { static int32_t tsdbDataFileRAWWriterCloseAbort(SDataFileRAWWriter *writer) {
@ -98,28 +112,13 @@ static int32_t tsdbDataFileRAWWriterCloseAbort(SDataFileRAWWriter *writer) {
static int32_t tsdbDataFileRAWWriterDoClose(SDataFileRAWWriter *writer) { return 0; } static int32_t tsdbDataFileRAWWriterDoClose(SDataFileRAWWriter *writer) { return 0; }
int32_t tsdbDataFileRAWWriterDoOpen(SDataFileRAWWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
writer->file = writer->config->file;
writer->ctx->offset = 0;
writer->ctx->opened = true;
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFileOpArray *opArr) { static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFileOpArray *opArr) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
STFileOp op; ASSERT(writer->ctx->offset == writer->file.size);
ASSERT(writer->config->fid == writer->file.fid);
op = (STFileOp){ STFileOp op = (STFileOp){
.optype = TSDB_FOP_CREATE, .optype = TSDB_FOP_CREATE,
.fid = writer->config->fid, .fid = writer->config->fid,
.nf = writer->file, .nf = writer->file,
@ -147,7 +146,7 @@ static int32_t tsdbDataFileRAWWriterOpenDataFD(SDataFileRAWWriter *writer) {
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
int32_t flag = TD_FILE_READ | TD_FILE_WRITE; int32_t flag = TD_FILE_READ | TD_FILE_WRITE;
if (writer->file.size == 0) { if (writer->ctx->offset == 0) {
flag |= (TD_FILE_CREATE | TD_FILE_TRUNC); flag |= (TD_FILE_CREATE | TD_FILE_TRUNC);
} }
@ -162,6 +161,24 @@ _exit:
return code; return code;
} }
int32_t tsdbDataFileRAWWriterDoOpen(SDataFileRAWWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
writer->file = writer->config->file;
writer->ctx->offset = 0;
code = tsdbDataFileRAWWriterOpenDataFD(writer);
TSDB_CHECK_CODE(code, lino, _exit);
writer->ctx->opened = true;
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code);
}
return code;
}
int32_t tsdbDataFileRAWWriterClose(SDataFileRAWWriter **writer, bool abort, TFileOpArray *opArr) { int32_t tsdbDataFileRAWWriterClose(SDataFileRAWWriter **writer, bool abort, TFileOpArray *opArr) {
if (writer[0] == NULL) return 0; if (writer[0] == NULL) return 0;
@ -195,7 +212,6 @@ int32_t tsdbDataFileRAWWriteBlockData(SDataFileRAWWriter *writer, const STsdbDat
code = tsdbWriteFile(writer->fd, writer->ctx->offset, (const uint8_t *)pDataBlock->data, pDataBlock->dataLength); code = tsdbWriteFile(writer->fd, writer->ctx->offset, (const uint8_t *)pDataBlock->data, pDataBlock->dataLength);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
writer->file.size += pDataBlock->dataLength;
writer->ctx->offset += pDataBlock->dataLength; writer->ctx->offset += pDataBlock->dataLength;
_exit: _exit:

View File

@ -74,6 +74,7 @@ static int32_t tsdbFSetRAWWriteFileDataBegin(SFSetRAWWriter *writer, STsdbDataRA
SDataFileRAWWriterConfig config = { SDataFileRAWWriterConfig config = {
.tsdb = writer->config->tsdb, .tsdb = writer->config->tsdb,
.szPage = writer->config->szPage, .szPage = writer->config->szPage,
.fid = bHdr->file.fid,
.did = writer->config->did, .did = writer->config->did,
.cid = writer->config->cid, .cid = writer->config->cid,
.level = writer->config->level, .level = writer->config->level,
@ -81,6 +82,7 @@ static int32_t tsdbFSetRAWWriteFileDataBegin(SFSetRAWWriter *writer, STsdbDataRA
.file = .file =
{ {
.type = bHdr->file.type, .type = bHdr->file.type,
.fid = bHdr->file.fid,
.did = writer->config->did, .did = writer->config->did,
.cid = writer->config->cid, .cid = writer->config->cid,
.size = bHdr->file.size, .size = bHdr->file.size,
@ -92,6 +94,9 @@ static int32_t tsdbFSetRAWWriteFileDataBegin(SFSetRAWWriter *writer, STsdbDataRA
}, },
}; };
writer->ctx->offset = 0;
writer->ctx->file = config.file;
code = tsdbDataFileRAWWriterOpen(&config, &writer->dataWriter); code = tsdbDataFileRAWWriterOpen(&config, &writer->dataWriter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -277,6 +277,8 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
} }
// TSDB ============== // TSDB ==============
pReader->tsdbDone = true;
if (!pReader->tsdbDone) { if (!pReader->tsdbDone) {
// open if not // open if not
if (pReader->pTsdbReader == NULL) { if (pReader->pTsdbReader == NULL) {
@ -641,6 +643,10 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter); tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter);
} }
if (pWriter->pTsdbSnapRAWWriter) {
tsdbSnapRAWWriterPrepareClose(pWriter->pTsdbSnapRAWWriter);
}
if (pWriter->pRsmaSnapWriter) { if (pWriter->pRsmaSnapWriter) {
rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter); rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter);
} }
@ -677,6 +683,11 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (code) goto _exit; if (code) goto _exit;
} }
if (pWriter->pTsdbSnapRAWWriter) {
code = tsdbSnapRAWWriterClose(&pWriter->pTsdbSnapRAWWriter, rollback);
if (code) goto _exit;
}
if (pWriter->pTqSnapWriter) { if (pWriter->pTqSnapWriter) {
code = tqSnapWriterClose(&pWriter->pTqSnapWriter, rollback); code = tqSnapWriterClose(&pWriter->pTqSnapWriter, rollback);
if (code) goto _exit; if (code) goto _exit;