From b2c147be7204b0b0f86f6a0497fbfe10437a84cf Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 13 Jun 2022 13:13:37 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 38 +++++++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbUtil.c | 4 +++ 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 8aabd6dcd8..46dd2e1c89 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -27,6 +27,8 @@ struct SCommitter { /* commit data */ int32_t minutes; int8_t precision; + int32_t minRow; + int32_t maxRow; TSKEY nextCommitKey; // commit file data int32_t commitFid; @@ -580,18 +582,14 @@ static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter) { int32_t code = 0; // old - pCommitter->oBlock.flag = 0; - pCommitter->oBlock.nItem = 0; - pCommitter->oBlock.nData = 0; + tMapDataReset(&pCommitter->oBlock); if (pCommitter->pBlockIdx) { code = tsdbReadBlock(pCommitter->pReader, &pCommitter->oBlock, NULL); if (code) goto _err; } // new - pCommitter->nBlock.flag = 0; - pCommitter->nBlock.nItem = 0; - pCommitter->nBlock.nData = 0; + tMapDataReset(&pCommitter->nBlock); _err: return code; @@ -599,6 +597,7 @@ _err: static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) { int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; STbDataIter *pIter = NULL; int32_t iBlock = 0; int32_t nBlock = pCommitter->nBlock.nItem; @@ -606,19 +605,42 @@ static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) { SBlock block; TSDBROW *pRow; TSDBROW row; + int32_t iRow = 0; + STSchema *pTSchema = NULL; if (pCommitter->pTbData) { - code = tsdbTbDataIterCreate(pCommitter->pTbData, NULL, 0, &pIter); + code = tsdbTbDataIterCreate(pCommitter->pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, &pIter); if (code) goto _err; } + // merge loop for (;;) { - /* code */ + tsdbTbDataIterGet(pIter, pRow); + + code = tsdbColDataBlockAppend(&pCommitter->nColDataBlock, pRow, pTSchema); + if (code) goto _err; + + if (pCommitter->nColDataBlock.nRow >= pCommitter->maxRow) { + code = tsdbWriteColDataBlock(pCommitter->pWriter, &pCommitter->nColDataBlock, NULL); + if (code) goto _err; + + tsdbColDataBlockReset(&pCommitter->nColDataBlock); + } + + if (!tsdbTbDataIterNext(pIter)) break; } + if (pCommitter->nColDataBlock.nRow) { + code = tsdbWriteColDataBlock(pCommitter->pWriter, &pCommitter->nColDataBlock, NULL); + if (code) goto _err; + } + + tsdbTbDataIterDestroy(pIter); return code; _err: + tsdbError("vgId:%d commit table data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbTbDataIterDestroy(pIter); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 15782256f8..e378d7dd5a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -732,6 +732,10 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * } // SColDataBlock ====================================================== +void tsdbColDataBlockReset(SColDataBlock *pColDataBlock) { + // TODO +} + int32_t tsdbColDataBlockAppend(SColDataBlock *pColDataBlock, TSDBROW *pRow, STSchema *pTSchema) { int32_t code = 0; int32_t nRow = pColDataBlock->nRow;