From a85d522e20d9862d2eddd4b43ed469cdcd27400b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 15 Jun 2022 12:28:52 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 72 +++++++++++++++---- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 4 +- 2 files changed, 63 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 0b844e98c6..4468fbe623 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -352,30 +352,78 @@ static FORCE_INLINE bool tsdbCommitIterEnd(SCommitter *pCommitter, STbDataIter * } static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter, STbDataIter *pIter, SBlockIdx *pBlockIdx) { - int32_t code = 0; - TSDBROW *pRow; - int32_t iBlock; - int32_t nBlock; - SBlock *pBlock; - SBlock block; + int32_t code = 0; + int32_t c; + int32_t iBlock; + int32_t nBlock; + SBlock *pBlock; + SBlock block; + SBlockIdx blockIdx; // todo - // start(todo) + // start ================================ + tMapDataReset(&pCommitter->oBlock); + tMapDataReset(&pCommitter->nBlock); + if (pBlockIdx) { + code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlock, NULL); + if (code) goto _err; + } + + // impl =============================== + iBlock = 0; + nBlock = pCommitter->oBlock.nItem; - // impl - pRow = tsdbTbDataIterGet(pIter); if (iBlock < nBlock) { pBlock = █ - tMapDataGetItemByIdx(&pCommitter->oBlock, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->nBlock, iBlock, pBlock, tGetBlock); + } else { + pBlock = NULL; } while (true) { - if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && pBlock == NULL) break; + TSDBROW *pRow = tsdbTbDataIterGet(pIter); + bool iterEnd = ((pRow == NULL) || (pRow->pTSRow->ts > pCommitter->maxKey)); + bool blockEnd = (pBlock == NULL); + + if (iterEnd && blockEnd) break; + + if (!iterEnd && !blockEnd) { + c = tBlockCmprFn(&(SBlock){.maxKey.ts = pRow->pTSRow->ts}, pBlock); + + if (c == 0) { + // merge until pBlock->maxKey + // if (pBlock->last), merge until pCommitter->maxKey + // else merge until pBlock->maxKey + } else if (c < 0) { + // tsdbCommitTableMemData(pIter, pBlock); + // if (pBlock->last), merge until pCommitter->maxKey + // else, commit until pBlock->minKey-1 + } else { + // tsdbCommitTableDiskData(pBlock); + // if (pBlock->last), merge until pCommitter->maxKey + // else, move the block to new one + } + } else if (!iterEnd) { + // no block on disk, commit to last when there are no enough data + // commit memory data to pCommitter->maxKey + // tsdbCommitTableMemData(pIter, NULL); + } else { + // tsdbCommitTableDiskData(pBlock); // only left block + // if (last block ? ) else ? + // tMapDataPutItem(&pCommitter->nBlock, pBlock, tPutBlock); + iBlock++; // get new SBlock + } } - // end + // end =============================== + code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlock, NULL, &blockIdx); + if (code) goto _err; + + code = tMapDataPutItem(&pCommitter->nBlockIdx, &blockIdx, tPutBlockIdx); + if (code) goto _err; return code; _err: + tsdbError("vgId:%d commit table data impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 24412802d4..06c08c17de 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -669,8 +669,10 @@ _err: return code; } -int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx) { +int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pBlockMap, uint8_t **ppBuf, SBlockIdx *pBlockIdx) { int32_t code = 0; + + ASSERT(pBlockMap->nItem > 0); // TODO return code; }