From b554559889adec4047d7242089aada54bccb5ae1 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Jul 2022 09:36:23 +0000 Subject: [PATCH] more vnode snapshot --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 118 +++++++++++++++++++++ 1 file changed, 118 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 5c5e81cd49..51514f1257 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -489,9 +489,66 @@ _err: return code; } +static int32_t tsdbSnapWriteTableDataAhead(STsdbSnapWriter* pWriter, TABLEID id) { + int32_t code = 0; + + if (pWriter->pDataFReader == NULL) goto _exit; + + while (true) { + if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break; + + SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); + int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); + + if (c >= 0) break; + + pWriter->iBlockIdx++; + + code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL); + if (code) goto _err; + + SBlock block; + tMapDataReset(&pWriter->mBlockW); + for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { + tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock); + + if (block.last) { + code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); + if (code) goto _err; + + tBlockReset(&block); + block.last = 1; + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, + pWriter->cmprAlg); + if (code) goto _err; + } + + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + if (code) goto _err; + } + + SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; + code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx); + if (code) goto _err; + + if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + +_exit: + return code; + +_err: + return code; +} + static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { int32_t code = 0; SBlockData* pBlockData = &pWriter->bData; + TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData); + TSDBKEY keyLast = tBlockDataLastKey(pBlockData); if (pWriter->pDataFReader == NULL) { // no old data @@ -551,6 +608,67 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { } } } else { + // has old data + + // TABLE ================================================== + // end last table data if id not same (todo) + if (pWriter->pBlockIdxW) { + int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); + if (c < 0) { + } else if (c > 0) { + ASSERT(0); + } + } + + // start new table data if need (todo) + if (pWriter->pBlockIdxW == NULL) { + // commit table data ahead + code = tsdbSnapWriteTableDataAhead(pWriter, id); + if (code) goto _err; + + // reader + if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { + pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); + int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id); + if (c) { + pWriter->pBlockIdx = NULL; + } + } else { + pWriter->pBlockIdx = NULL; + } + + if (pWriter->pBlockIdx) { + code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlockW, NULL); + if (code) goto _err; + + pWriter->iBlock = 0; + } + + // writer + pWriter->pBlockIdxW = &pWriter->blockIdxW; + pWriter->pBlockIdxW->suid = id.suid; + pWriter->pBlockIdxW->uid = id.uid; + + tBlockReset(&pWriter->blockW); + tBlockDataReset(&pWriter->bDataW); + tMapDataReset(&pWriter->mBlockW); + } + + // BLOCK ================================================== + // write block ahead + while (true) { + if (pWriter->iBlock >= pWriter->mBlock.nItem) break; + + SBlock block; + tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); + + if (tsdbKeyCmprFn(&block.maxKey, &keyFirst) >= 0) break; + + pWriter->iBlock++; + + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + if (code) goto _err; + } } return code;