From 8ff1eb41d1a17614bdc4382444a82d1e83c0e4ae Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 23 Sep 2022 17:13:14 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/tsdb/tsdbCommit.c | 58 +++++++++++++++++++----- 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 36aa5c0804..618364d20f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -698,6 +698,7 @@ struct SDiskDataBuilder { int32_t nBuilder; SArray *aBuilder; // SArray uint8_t *aBuf[2]; + SDiskData dd; }; int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 0ff9ae5085..70251933d5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -649,6 +649,45 @@ _exit: return code; } +static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilder, SArray *aSttBlk) { + int32_t code = 0; + int32_t lino = 0; + + if (pBuilder->nRow == 0) return code; + + SSttBlk sttBlk = {.suid = pBuilder->suid, + .minUid = 0, // todo + .maxUid = 0, // todo + .minKey = 0, // todo + .maxKey = 0, // todo + .minVer = 0, // todo + .maxVer = 0, // todo + .nRow = pBuilder->nRow}; + + // gnrt + code = tGnrtDiskData(pBuilder, &pBuilder->dd); + TSDB_CHECK_CODE(code, lino, _exit); + + // write + // code = tsdbWriteDiskData(pWriter, &pBuilder->dd); + TSDB_CHECK_CODE(code, lino, _exit); + + // push + if (taosArrayPush(aSttBlk, &sttBlk) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // clear (todo) + // tDiskDataBuilderClear(pBuilder); + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { int32_t code = 0; int32_t lino = 0; @@ -1315,24 +1354,22 @@ _exit: return code; } -static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { +static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) { int32_t code = 0; int32_t lino = 0; SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder; if (pBuilder->suid || pBuilder->uid) { if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) { - // code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, - // pCommitter->cmprAlg); // todo + code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pBuilder, pCommitter->dWriter.aSttBlk); TSDB_CHECK_CODE(code, lino, _exit); - // tBlockDataReset(pBDatal); } } if (!pBuilder->suid && !pBuilder->uid) { ASSERT(pCommitter->skmTable.suid == id.suid); ASSERT(pCommitter->skmTable.uid == id.uid); - // code = tBlockDataInit(pBDatal, id.suid, id.suid ? 0 : id.uid, pCommitter->skmTable.pTSchema); todo + code = tDiskDataBuilderInit(pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1351,7 +1388,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { SBlockData *pBData = &pCommitter->dWriter.bData; TABLEID id = {.suid = pBData->suid, .uid = pBData->uid}; - code = tsdbInitLastBlockIfNeed(pCommitter, id); + code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id); TSDB_CHECK_CODE(code, lino, _exit); for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) { @@ -1361,8 +1398,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { TSDB_CHECK_CODE(code, lino, _exit); if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) { - code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, NULL /*TODO */, pCommitter->dWriter.aSttBlk, - pCommitter->cmprAlg); + code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -1409,8 +1445,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { } if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) { - // code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, - // pCommitter->cmprAlg); (todo) + code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -1515,8 +1550,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { code = tsdbMoveCommitData(pCommitter, id); TSDB_CHECK_CODE(code, lino, _exit); - // code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, - // pCommitter->cmprAlg); + code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk); TSDB_CHECK_CODE(code, lino, _exit); _exit: