From 8626d3e691d8b5a4bbfdec139de94772a17325a7 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 30 May 2023 10:43:44 +0800 Subject: [PATCH] more code --- .../dnode/vnode/src/tsdb/dev/inc/tsdbIter.h | 43 ++++++ source/dnode/vnode/src/tsdb/dev/tsdbIter.c | 129 ++++++++++++++++++ source/dnode/vnode/src/tsdb/dev/tsdbMerge.c | 92 ++++++------- 3 files changed, 215 insertions(+), 49 deletions(-) create mode 100644 source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h create mode 100644 source/dnode/vnode/src/tsdb/dev/tsdbIter.c diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h new file mode 100644 index 0000000000..d6b643e199 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbIter.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "trbtree.h" +#include "tsdbDef.h" + +#ifndef _TSDB_ITER_H_ +#define _TSDB_ITER_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SIterMerger SIterMerger; +typedef struct STsdbIter STsdbIter; +typedef TARRAY2(STsdbIter *) TTsdbIterArray; + +// STsdbIter =============== + +// SIterMerger =============== +int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger); +int32_t tsdbIterMergerClear(SIterMerger **merger); +int32_t tsdbIterMergerNext(SIterMerger *merger); +SRowInfo *tsdbIterMergerGet(SIterMerger *merger); +int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid); + +#ifdef __cplusplus +} +#endif + +#endif /*_TSDB_ITER_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbIter.c b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c new file mode 100644 index 0000000000..f2f45747e8 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/dev/tsdbIter.c @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "inc/tsdbIter.h" + +// STsdbIter ================ +struct STsdbIter { + struct { + bool noMoreData; + } ctx[1]; + SRowInfo row[1]; + SRBTreeNode node[1]; + SBlockData bData[1]; + int32_t iRow; + // TODO +}; + +int32_t tsdbIterNext(STsdbIter *iter) { + // TODO + return 0; +} + +static int32_t tsdbIterSkipTableData(STsdbIter *iter) { + // TODO + return 0; +} + +static int32_t tsdbIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { + STsdbIter *iter1 = TCONTAINER_OF(n1, STsdbIter, node); + STsdbIter *iter2 = TCONTAINER_OF(n2, STsdbIter, node); + return tRowInfoCmprFn(&iter1->row, &iter2->row); +} + +// SIterMerger ================ +struct SIterMerger { + STsdbIter *iter; + SRBTree iterTree[1]; +}; + +int32_t tsdbIterMergerInit(const TTsdbIterArray *iterArray, SIterMerger **merger) { + STsdbIter *iter; + + merger[0] = taosMemoryCalloc(1, sizeof(*merger[0])); + if (!merger[0]) return TSDB_CODE_OUT_OF_MEMORY; + + tRBTreeCreate(merger[0]->iterTree, tsdbIterCmprFn); + TARRAY2_FOREACH(iterArray, iter) { + if (iter->ctx->noMoreData) continue; + SRBTreeNode *node = tRBTreePut(merger[0]->iterTree, iter->node); + ASSERT(node); + } + + return tsdbIterMergerNext(merger[0]); +} + +int32_t tsdbIterMergerClear(SIterMerger **merger) { + taosMemoryFree(merger[0]); + return 0; +} + +int32_t tsdbIterMergerNext(SIterMerger *merger) { + int32_t code; + int32_t c; + SRBTreeNode *node; + + if (merger->iter) { + code = tsdbIterNext(merger->iter); + if (code) return code; + + if (merger->iter->ctx->noMoreData) { + merger->iter = NULL; + } else if ((node = tRBTreeMin(merger->iterTree))) { + c = tsdbIterCmprFn(merger->iter->node, node); + ASSERT(c); + if (c > 0) { + node = tRBTreePut(merger->iterTree, merger->iter->node); + merger->iter = NULL; + ASSERT(node); + } + } + } + + if (!merger->iter && (node = tRBTreeDropMin(merger->iterTree))) { + merger->iter = TCONTAINER_OF(node, STsdbIter, node); + } + + return 0; +} + +SRowInfo *tsdbIterMergerGet(SIterMerger *merger) { return merger->iter ? merger->iter->row : NULL; } + +int32_t tsdbIterMergerSkipTableData(SIterMerger *merger, const TABLEID *tbid) { + int32_t code; + int32_t c; + SRBTreeNode *node; + + while (merger->iter && tbid->suid == merger->iter->row->suid && tbid->uid == merger->iter->row->uid) { + int32_t code = tsdbIterSkipTableData(merger->iter); + if (code) return code; + + if (merger->iter->ctx->noMoreData) { + merger->iter = NULL; + c = tsdbIterCmprFn(merger->iter->node, node); + ASSERT(c); + if (c > 0) { + node = tRBTreePut(merger->iterTree, merger->iter->node); + merger->iter = NULL; + ASSERT(node); + } + } + + if (!merger->iter && (node = tRBTreeDropMin(merger->iterTree))) { + merger->iter = TCONTAINER_OF(node, STsdbIter, node); + } + } + return 0; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c index 081cef46b1..79931df44f 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbMerge.c @@ -15,19 +15,17 @@ #include "inc/tsdbMerge.h" -typedef struct { - bool launched; - bool toData; - int32_t level; - STFileSet *fset; - SRowInfo *pRowInfo; - SBlockData bData; -} SMergeCtx; - typedef struct { STsdb *tsdb; // context - SMergeCtx ctx; + struct { + bool opened; + bool toData; + int32_t level; + STFileSet *fset; + SRowInfo *row; + SBlockData bData; + } ctx[1]; // config int32_t maxRow; int32_t minRow; @@ -48,7 +46,7 @@ typedef struct { } SMerger; static int32_t tsdbMergerOpen(SMerger *merger) { - merger->ctx.launched = true; + merger->ctx->opened = true; TARRAY2_INIT(&merger->fopArr); return 0; } @@ -84,20 +82,20 @@ static int32_t tsdbMergeNextRow(SMerger *merger) { } static int32_t tsdbMergeToDataWriteTSDataBlock(SMerger *merger) { - if (merger->ctx.bData.nRow == 0) return 0; + if (merger->ctx->bData.nRow == 0) return 0; int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(merger->tsdb->pVnode); - if (merger->ctx.bData.nRow >= merger->minRow) { - // code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx.bData); + if (merger->ctx->bData.nRow >= merger->minRow) { + // code = tsdbDataFWriteTSDataBlock(merger->dataWriter, &merger->ctx->bData); // TSDB_CHECK_CODE(code, lino, _exit); } else { - code = tsdbSttFWriteTSDataBlock(merger->sttWriter, &merger->ctx.bData); + code = tsdbSttFWriteTSDataBlock(merger->sttWriter, &merger->ctx->bData); TSDB_CHECK_CODE(code, lino, _exit); } - tBlockDataReset(&merger->ctx.bData); + tBlockDataReset(&merger->ctx->bData); _exit: if (code) { @@ -114,28 +112,28 @@ static int32_t tsdbMergeToData(SMerger *merger) { code = tsdbMergeNextRow(merger); TSDB_CHECK_CODE(code, lino, _exit); - if (!merger->ctx.pRowInfo) { + if (!merger->ctx->row) { code = tsdbMergeToDataWriteTSDataBlock(merger); TSDB_CHECK_CODE(code, lino, _exit); break; } - if (!TABLE_SAME_SCHEMA(merger->ctx.bData.suid, merger->ctx.bData.suid, merger->ctx.pRowInfo->suid, - merger->ctx.pRowInfo->uid)) { + if (!TABLE_SAME_SCHEMA(merger->ctx->bData.suid, merger->ctx->bData.suid, merger->ctx->row->suid, + merger->ctx->row->uid)) { code = tsdbMergeToDataWriteTSDataBlock(merger); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbUpdateSkmTb(merger->tsdb, (TABLEID *)merger->ctx.pRowInfo, &merger->skmTb); + code = tsdbUpdateSkmTb(merger->tsdb, (TABLEID *)merger->ctx->row, &merger->skmTb); TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataInit(&merger->ctx.bData, (TABLEID *)merger->ctx.pRowInfo, merger->skmTb.pTSchema, NULL, 0); + code = tBlockDataInit(&merger->ctx->bData, (TABLEID *)merger->ctx->row, merger->skmTb.pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); } - code = tBlockDataAppendRow(&merger->ctx.bData, &merger->ctx.pRowInfo->row, NULL, merger->ctx.pRowInfo->uid); + code = tBlockDataAppendRow(&merger->ctx->bData, &merger->ctx->row->row, NULL, merger->ctx->row->uid); TSDB_CHECK_CODE(code, lino, _exit); - if (merger->ctx.bData.nRow >= merger->maxRow) { + if (merger->ctx->bData.nRow >= merger->maxRow) { code = tsdbMergeToDataWriteTSDataBlock(merger); TSDB_CHECK_CODE(code, lino, _exit); } @@ -157,9 +155,9 @@ static int32_t tsdbMergeToUpperLevel(SMerger *merger) { code = tsdbMergeNextRow(merger); TSDB_CHECK_CODE(code, lino, _exit); - if (!merger->ctx.pRowInfo) break; + if (!merger->ctx->row) break; - code = tsdbSttFWriteTSData(merger->sttWriter, merger->ctx.pRowInfo); + code = tsdbSttFWriteTSData(merger->sttWriter, merger->ctx->row); TSDB_CHECK_CODE(code, lino, _exit); } @@ -174,27 +172,27 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(merger->tsdb->pVnode); - STFileSet *fset = merger->ctx.fset; + STFileSet *fset = merger->ctx->fset; // prepare the merger file set SSttLvl *lvl; STFileObj *fobj; - merger->ctx.toData = true; - merger->ctx.level = 0; + merger->ctx->toData = true; + merger->ctx->level = 0; TARRAY2_FOREACH(&fset->lvlArr, lvl) { - if (lvl->level != merger->ctx.level) { + if (lvl->level != merger->ctx->level) { lvl = NULL; break; } fobj = TARRAY2_GET(&lvl->farr, 0); if (fobj->f.stt->nseg < merger->tsdb->pVnode->config.sttTrigger) { - merger->ctx.toData = false; + merger->ctx->toData = false; break; } else { ASSERT(lvl->level == 0 || TARRAY2_SIZE(&lvl->farr) == 1); - merger->ctx.level++; + merger->ctx->level++; // open the reader SSttFileReader *reader; @@ -249,7 +247,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { .fid = fset->fid, .cid = merger->cid, .size = 0, - .stt = {{.level = merger->ctx.level, .nseg = 0}}, + .stt = {{.level = merger->ctx->level, .nseg = 0}}, }, }; code = tsdbSttFWriterOpen(&config, &merger->sttWriter); @@ -257,7 +255,7 @@ static int32_t tsdbMergeFileSetBegin(SMerger *merger) { } // open data file writer - if (merger->ctx.toData) { + if (merger->ctx->toData) { SDataFileWriterConfig config = { .tsdb = merger->tsdb, // TODO @@ -286,7 +284,7 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) { TSDB_CHECK_CODE(code, lino, _exit); } - if (merger->ctx.toData) { + if (merger->ctx->toData) { // code = tsdbDataFWriterClose(); // TSDB_CHECK_CODE(code, lino, _exit); } @@ -301,18 +299,18 @@ static int32_t tsdbMergeFileSet(SMerger *merger, STFileSet *fset) { int32_t code = 0; int32_t lino = 0; - if (merger->ctx.launched == false) { + if (merger->ctx->opened == false) { code = tsdbMergerOpen(merger); TSDB_CHECK_CODE(code, lino, _exit); } - merger->ctx.fset = fset; + merger->ctx->fset = fset; code = tsdbMergeFileSetBegin(merger); TSDB_CHECK_CODE(code, lino, _exit); // do merge - if (merger->ctx.toData) { + if (merger->ctx->toData) { code = tsdbMergeToData(merger); TSDB_CHECK_CODE(code, lino, _exit); } else { @@ -343,13 +341,9 @@ int32_t tsdbMerge(STsdb *tsdb) { STFileObj *fobj; int32_t sttTrigger = vnode->config.sttTrigger; - SMerger merger = { - .tsdb = tsdb, - .ctx = - { - .launched = false, - }, - }; + SMerger merger[1]; + merger->tsdb = tsdb; + merger->ctx->opened = false; // loop to merge each file set TARRAY2_FOREACH(&fs->cstate, fset) { @@ -363,21 +357,21 @@ int32_t tsdbMerge(STsdb *tsdb) { fobj = TARRAY2_GET(&lvl0->farr, 0); if (fobj->f.stt->nseg >= sttTrigger) { - code = tsdbMergeFileSet(&merger, fset); + code = tsdbMergeFileSet(merger, fset); TSDB_CHECK_CODE(code, lino, _exit); } } // end the merge - if (merger.ctx.launched) { - code = tsdbMergerClose(&merger); + if (merger->ctx->opened) { + code = tsdbMergerClose(merger); TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); - } else if (merger.ctx.launched) { + TSDB_ERROR_LOG(vid, lino, code); + } else if (merger->ctx->opened) { tsdbDebug("vgId:%d %s done", vid, __func__); } return 0;