From 9720f0549193c887a88c1ef4e048aff71df7c766 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 6 Jul 2022 06:59:53 +0000 Subject: [PATCH] more tsdb snapshow writer --- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 73 +++++++++++++++++++++- 2 files changed, 72 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 0f65a536e0..eec9c0f312 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -64,6 +64,7 @@ typedef struct SRowIter SRowIter; typedef struct STsdbFS STsdbFS; typedef struct SRowMerger SRowMerger; typedef struct STsdbFSState STsdbFSState; +typedef struct STsdbSnapHdr STsdbSnapHdr; #define TSDB_MAX_SUBBLOCKS 8 #define TSDB_FHDR_SIZE 512 diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index dc2e006d05..133a519aa2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -308,8 +308,11 @@ struct STsdbSnapWriter { STsdb* pTsdb; int64_t sver; int64_t ever; + // config + int32_t minutes; + int8_t precision; // for data file - int32_t iDFileSet; + int32_t fid; SDataFWriter* pDataFWriter; // for del file SDelFWriter* pDelFWriter; @@ -327,6 +330,47 @@ static int32_t tsdbSnapCommit(STsdbSnapWriter* pWriter) { return code; } +static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int32_t code = 0; + STsdb* pTsdb = pWriter->pTsdb; + int64_t suid = 0; // todo + int64_t uid = 0; // todo + int64_t skey; // todo + int64_t ekey; // todo + + int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision); + ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision)); + if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) { + if (pWriter->pDataFWriter) { + // finish current file and close the SDataFWriter + } + + pWriter->fid = fid; + SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid); + SDFileSet wSet = {0}; + if (pSet == NULL) { + wSet = (SDFileSet){0}; // todo + } else { + wSet = (SDFileSet){0}; // todo + } + + code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet); + if (code) goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d tsdb snapshot write data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int32_t code = 0; + // TODO + return code; +} + int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { int32_t code = 0; STsdbSnapWriter* pWriter = NULL; @@ -374,6 +418,31 @@ _err: int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; - // TODO + int8_t type = pData[0]; + + // ts data + if (type == 0) { + code = tsdbSnapWriteData(pWriter, pData + 1, nData - 1); + if (code) goto _err; + } else { + if (pWriter->pDataFWriter) { + // commit the remain data of the FSet (todo) + + // close and update the file + code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1); + if (code) goto _err; + } + } + + // del data + if (type == 1) { + code = tsdbSnapWriteDel(pWriter, pData + 1, nData - 1); + if (code) goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d tsdb snapshow write failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; }