From e6fb46c072532bfc97f35660bd5b2733a9462ff9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 6 Jun 2023 16:42:51 +0800 Subject: [PATCH] more code --- .../vnode/src/tsdb/dev/inc/tsdbSttFileRW.h | 1 + source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 71 +++++++++++++++---- .../dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c | 4 +- 3 files changed, 60 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h index 83468bf621..8526a1143a 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h @@ -64,6 +64,7 @@ int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArr int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row); int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData); int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *record); +bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer); struct SSttFileWriterConfig { STsdb *tsdb; diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 8c1e2c2d0b..3c2d4320e9 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -48,8 +48,10 @@ typedef struct { // reader TSttFileReaderArray sttReaderArr[1]; // iter - TTsdbIterArray iterArr[1]; - SIterMerger *iterMerger; + TTsdbIterArray dataIterArr[1]; + SIterMerger *dataIterMerger; + TTsdbIterArray tombIterArr[1]; + SIterMerger *tombIterMerger; // writer SSttFileWriter *sttWriter; SDataFileWriter *dataWriter; @@ -86,12 +88,12 @@ static int32_t tsdbMergerClose(SMerger *merger) { ASSERT(merger->dataWriter == NULL); ASSERT(merger->sttWriter == NULL); - ASSERT(merger->iterMerger == NULL); - ASSERT(TARRAY2_SIZE(merger->iterArr) == 0); + ASSERT(merger->dataIterMerger == NULL); + ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0); ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0); // clear the merge - TARRAY2_FREE(merger->iterArr); + TARRAY2_FREE(merger->dataIterArr); TARRAY2_FREE(merger->sttReaderArr); TARRAY2_FREE(merger->fopArr); for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->bData); i++) { @@ -196,7 +198,8 @@ static int32_t tsdbMergeToDataLevel(SMerger *merger) { int32_t code = 0; int32_t lino = 0; - for (SRowInfo *row; (row = tsdbIterMergerGet(merger->iterMerger)) != NULL;) { + // data + for (SRowInfo *row; (row = tsdbIterMergerGet(merger->dataIterMerger)) != NULL;) { if (row->uid != merger->ctx->tbid->uid) { code = tsdbMergeToDataTableEnd(merger); TSDB_CHECK_CODE(code, lino, _exit); @@ -234,13 +237,28 @@ static int32_t tsdbMergeToDataLevel(SMerger *merger) { TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbIterMergerNext(merger->iterMerger); + code = tsdbIterMergerNext(merger->dataIterMerger); TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbMergeToDataTableEnd(merger); TSDB_CHECK_CODE(code, lino, _exit); + // tomb + STombRecord *record; + while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger))) { + if (tsdbSttFileWriterIsOpened(merger->sttWriter)) { + code = tsdbSttFileWriteTombRecord(merger->sttWriter, record); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbDataFileWriteTombRecord(merger->dataWriter, record); + TSDB_CHECK_CODE(code, lino, _exit); + } + + code = tsdbIterMergerNext(merger->tombIterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + } + _exit: if (code) { TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code); @@ -253,12 +271,23 @@ static int32_t tsdbMergeToUpperLevel(SMerger *merger) { int32_t lino = 0; int32_t vid = TD_VID(merger->tsdb->pVnode); + // data SRowInfo *row; - while ((row = tsdbIterMergerGet(merger->iterMerger))) { + while ((row = tsdbIterMergerGet(merger->dataIterMerger))) { code = tsdbSttFileWriteTSData(merger->sttWriter, row); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbIterMergerNext(merger->iterMerger); + code = tsdbIterMergerNext(merger->dataIterMerger); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // tomb + STombRecord *record; + while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger))) { + code = tsdbSttFileWriteTombRecord(merger->sttWriter, record); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerNext(merger->tombIterMerger); TSDB_CHECK_CODE(code, lino, _exit); } @@ -352,15 +381,25 @@ static int32_t tsdbMergeFileSetBeginOpenIter(SMerger *merger) { .sttReader = segReader, }}; + // data iter code = tsdbIterOpen(config, &iter); TSDB_CHECK_CODE(code, lino, _exit); + code = TARRAY2_APPEND(merger->dataIterArr, iter); + TSDB_CHECK_CODE(code, lino, _exit); - code = TARRAY2_APPEND(merger->iterArr, iter); + // tomb iter + config->type = TSDB_ITER_TYPE_STT_TOMB; + code = tsdbIterOpen(config, &iter); + TSDB_CHECK_CODE(code, lino, _exit); + code = TARRAY2_APPEND(merger->tombIterArr, iter); TSDB_CHECK_CODE(code, lino, _exit); } } - code = tsdbIterMergerOpen(merger->iterArr, &merger->iterMerger, false); + code = tsdbIterMergerOpen(merger->dataIterArr, &merger->dataIterMerger, false); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbIterMergerOpen(merger->tombIterArr, &merger->tombIterMerger, true); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -464,8 +503,8 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { int32_t lino = 0; ASSERT(TARRAY2_SIZE(merger->sttReaderArr) == 0); - ASSERT(TARRAY2_SIZE(merger->iterArr) == 0); - ASSERT(merger->iterMerger == NULL); + ASSERT(TARRAY2_SIZE(merger->dataIterArr) == 0); + ASSERT(merger->dataIterMerger == NULL); ASSERT(merger->sttWriter == NULL); ASSERT(merger->dataWriter == NULL); @@ -516,8 +555,10 @@ _exit: } static int32_t tsdbMergeFileSetEndCloseIter(SMerger *merger) { - tsdbIterMergerClose(&merger->iterMerger); - TARRAY2_CLEAR(merger->iterArr, tsdbIterClose); + tsdbIterMergerClose(&merger->tombIterMerger); + TARRAY2_CLEAR(merger->tombIterArr, tsdbIterClose); + tsdbIterMergerClose(&merger->dataIterMerger); + TARRAY2_CLEAR(merger->dataIterArr, tsdbIterClose); return 0; } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c index 3f0acfc949..68af6a6af3 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c @@ -890,4 +890,6 @@ _exit: TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; -} \ No newline at end of file +} + +bool tsdbSttFileWriterIsOpened(SSttFileWriter *writer) { return writer->ctx->opened; } \ No newline at end of file