From 98384b1a4e6099d3e5105ee3818e616b61dc6fad Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 26 Apr 2023 17:17:32 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 68 +++++++++++++-------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 315181ce10..c49bfdfc46 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -16,10 +16,17 @@ #include "dev.h" typedef struct { - STsdb *pTsdb; + STsdb *pTsdb; + SFileSet *pSet; + // reader SSttFileReader *pSttFReader; - SSttFileWriter *pSttFWriter; + int32_t nSttFSegReader; + SSttFSegReader *aSttFSegReader; + + // writer + SSttFileWriter *pSttFWriter; + SDataFileWriter *pDataFWriter; SArray *aFileOp; // SArray } SMerger; @@ -41,12 +48,38 @@ static int32_t tsdbFileSetMerge(struct SFileSet *pFileSet) { // TODO return 0; } -static int32_t tsdbOpenMerger(STsdb *pTsdb, SMerger *merger) { - ASSERTS(0, "TODO: not implemented yet"); + +static int32_t tsdbOpenMerger(STsdb *pTsdb, SMerger *pMerger) { + pMerger->pTsdb = pTsdb; // TODO return 0; } +static int32_t tsdbDestroyMerger(SMerger *pMerger) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbCloseMerger(SMerger *pMerger) { + int32_t code = 0; + int32_t lino; + + STsdb *pTsdb = pMerger->pTsdb; + + code = tsdbFileSystemEditBegin(pTsdb->pFS, pMerger->aFileOp, TSDB_FS_EDIT_MERGE); + TSDB_CHECK_CODE(code, lino, _exit) + +_exit: + if (code) { + tsdbFileSystemEditAbort(pTsdb->pFS, TSDB_FS_EDIT_MERGE); + } else { + tsdbFileSystemEditCommit(pTsdb->pFS, TSDB_FS_EDIT_MERGE); + } + tsdbDestroyMerger(pMerger); + return code; +} + int32_t tsdbMerge(STsdb *pTsdb) { int32_t code = 0; int32_t lino; @@ -55,13 +88,9 @@ int32_t tsdbMerge(STsdb *pTsdb) { goto _exit; } - // do merge - SMerger merger = {0}; - - TSDB_CHECK_CODE( // - code = tsdbOpenMerger(pTsdb, &merger), // - lino, // - _exit); + SMerger pMerger = {0}; + code = tsdbOpenMerger(pTsdb, &pMerger); + TSDB_CHECK_CODE(code, lino, _exit) for (int32_t i = 0; i < taosArrayGetSize(pTsdb->pFS->aFileSet); i++) { struct SFileSet *pFileSet = taosArrayGet(pTsdb->pFS->aFileSet, i); @@ -69,21 +98,12 @@ int32_t tsdbMerge(STsdb *pTsdb) { continue; } - TSDB_CHECK_CODE( // - code = tsdbFileSetMerge(pFileSet), // - lino, // - _exit); + code = tsdbFileSetMerge(pFileSet); + TSDB_CHECK_CODE(code, lino, _exit) } - TSDB_CHECK_CODE( // - code = tsdbFileSystemEditBegin(pTsdb->pFS, merger.aFileOp, TSDB_FS_EDIT_MERGE), // - lino, // - _exit); - - TSDB_CHECK_CODE( // - code = tsdbFileSystemEditCommit(pTsdb->pFS, TSDB_FS_EDIT_MERGE), // - lino, // - _exit); + code = tsdbCloseMerger(&pMerger); + TSDB_CHECK_CODE(code, lino, _exit) _exit: if (code) {