From c7d1c88fbcc69490bcfe14ac640fd0d2558c8676 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 18 May 2023 15:06:21 +0800 Subject: [PATCH] more code --- include/util/trbtree.h | 2 +- .../dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h | 8 +- .../src/tsdb/dev/inc/tsdbSttFReaderWriter.h | 2 +- source/dnode/vnode/src/tsdb/dev/tsdbCommit.c | 113 +++++++++++++----- source/dnode/vnode/src/tsdb/dev/tsdbFS.c | 2 + source/dnode/vnode/src/tsdb/dev/tsdbFSet.c | 6 + source/util/src/trbtree.c | 2 +- 7 files changed, 97 insertions(+), 38 deletions(-) diff --git a/include/util/trbtree.h b/include/util/trbtree.h index dedf9c811f..8353a91f0a 100644 --- a/include/util/trbtree.h +++ b/include/util/trbtree.h @@ -39,7 +39,7 @@ void tRBTreeDrop(SRBTree *pTree, SRBTreeNode *z); SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey); SRBTreeNode *tRBTreeDropMin(SRBTree *pTree); SRBTreeNode *tRBTreeDropMax(SRBTree *pTree); -SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode); +SRBTreeNode *tRBTreeGet(const SRBTree *pTree, const SRBTreeNode *pKeyNode); // SRBTreeIter ============================================= #define tRBTreeIterCreate(tree, ascend) \ diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h index 5a744a3de9..7c3d6a0088 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbFSet.h @@ -24,6 +24,7 @@ extern "C" { typedef struct STFileSet STFileSet; typedef struct STFileOp STFileOp; +typedef struct SSttLvl SSttLvl; typedef enum { TSDB_FOP_NONE = 0, @@ -39,9 +40,10 @@ int32_t tsdbFileSetInit(STFileSet *pSet, int32_t fid); int32_t tsdbFileSetInitEx(const STFileSet *fset1, STFileSet *fset2); int32_t tsdbFileSetClear(STFileSet *pSet); int32_t tsdbFileSetEdit(STFileSet *fset, const STFileOp *op); - int32_t tsdbFSetCmprFn(const STFileSet *pSet1, const STFileSet *pSet2); +const SSttLvl *tsdbFileSetGetLvl(const STFileSet *fset, int32_t level); + struct STFileOp { tsdb_fop_t op; int32_t fid; @@ -49,12 +51,12 @@ struct STFileOp { STFile nState; // new file state }; -typedef struct SSttLvl { +struct SSttLvl { int32_t level; // level int32_t nstt; // number of .stt files on this level SRBTree sttTree; // .stt file tree, sorted by cid SRBTreeNode rbtn; -} SSttLvl; +}; struct STFileSet { int32_t fid; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h index 0c8c0a6557..6d71508363 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFReaderWriter.h @@ -61,13 +61,13 @@ int32_t tsdbSttFWriteDLData(SSttFileWriter *pWriter, TABLEID *tbid, SDelData *pD struct SSttFileWriterConfig { STsdb *pTsdb; - STFile file; int32_t maxRow; int32_t szPage; int8_t cmprAlg; SSkmInfo *pSkmTb; SSkmInfo *pSkmRow; uint8_t **aBuf; + STFile file; }; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index d5bbaa169b..4c328d472b 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -37,57 +37,106 @@ typedef struct { int32_t expLevel; TSKEY minKey; TSKEY maxKey; - const STFileSet *pFileSet; + const STFileSet *fset; // writer SSttFileWriter *pWriter; } SCommitter; -static int32_t open_committer_writer(SCommitter *pCommitter) { +static int32_t open_writer_with_new_stt(SCommitter *pCommitter) { int32_t code = 0; int32_t lino = 0; STsdb *pTsdb = pCommitter->pTsdb; - int32_t vid = TD_VID(pTsdb->pVnode); + SVnode *pVnode = pTsdb->pVnode; + int32_t vid = TD_VID(pVnode); - SSttFileWriterConfig config = { - .pTsdb = pCommitter->pTsdb, - .maxRow = pCommitter->maxRow, - .szPage = pTsdb->pVnode->config.tsdbPageSize, - .cmprAlg = pCommitter->cmprAlg, - .pSkmTb = NULL, - .pSkmRow = NULL, - .aBuf = NULL, - }; + SSttFileWriterConfig config; + SDiskID did; - if (pCommitter->pFileSet) { - // TODO - ASSERT(0); - } else { - config.file.type = TSDB_FTYPE_STT; - - if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &config.file.did) < 0) { - code = TSDB_CODE_FS_NO_VALID_DISK; - TSDB_CHECK_CODE(code, lino, _exit); - } - - config.file.fid = pCommitter->fid; - config.file.cid = pCommitter->eid; - config.file.size = 0; - config.file.stt.level = 0; - config.file.stt.nseg = 0; - - tsdbTFileInit(pTsdb, &config.file); + if (tfsAllocDisk(pVnode->pTfs, pCommitter->expLevel, &did) < 0) { + code = TSDB_CODE_FS_NO_VALID_DISK; + TSDB_CHECK_CODE(code, lino, _exit); } + config.pTsdb = pTsdb; + config.maxRow = pCommitter->maxRow; + config.szPage = pVnode->config.tsdbPageSize; + config.cmprAlg = pCommitter->cmprAlg; + config.pSkmTb = NULL; + config.pSkmRow = NULL; + config.aBuf = NULL; + config.file.type = TSDB_FTYPE_STT; + config.file.did = did; + config.file.fid = pCommitter->fid; + config.file.cid = pCommitter->eid; + config.file.size = 0; + config.file.stt.level = 0; + config.file.stt.nseg = 0; + tsdbTFileInit(pTsdb, &config.file); + code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", vid, __func__, lino, tstrerror(code), pCommitter->fid); + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s success", vid, __func__); } return code; } +static int32_t open_writer_with_exist_stt(SCommitter *pCommitter, const STFile *pFile) { + int32_t code = 0; + int32_t lino = 0; + STsdb *pTsdb = pCommitter->pTsdb; + SVnode *pVnode = pTsdb->pVnode; + int32_t vid = TD_VID(pVnode); + + SSttFileWriterConfig config = { + // + .pTsdb = pTsdb, + .maxRow = pCommitter->maxRow, + .szPage = pVnode->config.tsdbPageSize, + .cmprAlg = pCommitter->cmprAlg, + .pSkmTb = NULL, + .pSkmRow = NULL, + .aBuf = NULL, + .file = *pFile // + }; + + code = tsdbSttFWriterOpen(&config, &pCommitter->pWriter); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + } else { + tsdbDebug("vgId:%d %s success", vid, __func__); + } + return code; +} +static int32_t open_committer_writer(SCommitter *pCommitter) { + if (!pCommitter->fset) { + return open_writer_with_new_stt(pCommitter); + } + + const SSttLvl *lvl0 = tsdbFileSetGetLvl(pCommitter->fset, 0); + if (lvl0 == NULL) { + return open_writer_with_new_stt(pCommitter); + } + + SRBTreeNode *node = tRBTreeMax(&lvl0->sttTree); + if (node == NULL) { + return open_writer_with_new_stt(pCommitter); + } else { + STFileObj *fobj = TCONTAINER_OF(node, STFileObj, rbtn); + if (fobj->f.stt.nseg >= pCommitter->sttTrigger) { + return open_writer_with_new_stt(pCommitter); + } else { + return open_writer_with_exist_stt(pCommitter, &fobj->f); + } + } +} static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, TABLEID *tbid, TSDBROW *pRow) { int32_t code = 0; @@ -211,7 +260,7 @@ static int32_t commit_fset_start(SCommitter *pCommitter) { pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pTsdb->keepCfg, taosGetTimestampSec()); pCommitter->nextKey = TSKEY_MAX; - tsdbFSGetFSet(pTsdb->pFS, pCommitter->fid, &pCommitter->pFileSet); + tsdbFSGetFSet(pTsdb->pFS, pCommitter->fid, &pCommitter->fset); tsdbDebug("vgId:%d %s done, fid:%d minKey:%" PRId64 " maxKey:%" PRId64 " expLevel:%d", vid, __func__, pCommitter->fid, pCommitter->minKey, pCommitter->maxKey, pCommitter->expLevel); diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c index 74b817a231..b99d25be8a 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFS.c @@ -284,6 +284,7 @@ static int32_t apply_commit(STFileSystem *fs) { if (code) return code; i1++; n1++; + i2++; } else { // edit code = apply_commit_upd_fset(fs, fset1, fset2); @@ -302,6 +303,7 @@ static int32_t apply_commit(STFileSystem *fs) { if (code) return code; i1++; n1++; + i2++; } } diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c index b0c1e0718a..0725e4ae56 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbFSet.c @@ -295,3 +295,9 @@ int32_t tsdbFileSetClear(STFileSet *pSet) { // TODO return 0; } + +const SSttLvl *tsdbFileSetGetLvl(const STFileSet *fset, int32_t level) { + SSttLvl tlvl = {.level = level}; + SRBTreeNode *node = tRBTreeGet(&fset->lvlTree, &tlvl.rbtn); + return node ? TCONTAINER_OF(node, SSttLvl, rbtn) : NULL; +} \ No newline at end of file diff --git a/source/util/src/trbtree.c b/source/util/src/trbtree.c index 576cbf5633..e1000f7bc1 100644 --- a/source/util/src/trbtree.c +++ b/source/util/src/trbtree.c @@ -443,7 +443,7 @@ SRBTreeNode *tRBTreeDropMax(SRBTree *pTree) { return pNode; } -SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode) { +SRBTreeNode *tRBTreeGet(const SRBTree *pTree, const SRBTreeNode *pKeyNode) { SRBTreeNode *pNode = pTree->root; while (pNode != pTree->NIL) {