From 7e45878581e0a06ca2c19eefc122cf11f8a026b9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Jul 2022 10:03:18 +0000 Subject: [PATCH] more vnode snapshot --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 342 +++++++++++---------- 1 file changed, 182 insertions(+), 160 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index d9a67ce487..932cf9f9ae 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -488,52 +488,42 @@ _err: return code; } -static int32_t tsdbSnapWriteTableDataAhead(STsdbSnapWriter* pWriter, TABLEID id) { +static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) { int32_t code = 0; - if (pWriter->pDataFReader == NULL) goto _exit; + code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL); + if (code) goto _err; - while (true) { - if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break; + // SBlockData + SBlock block; + tMapDataReset(&pWriter->mBlockW); + for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { + tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock); - SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); - int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); + if (block.last) { + code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); + if (code) goto _err; - if (c >= 0) break; - - pWriter->iBlockIdx++; - - code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock, NULL); - if (code) goto _err; - - SBlock block; - tMapDataReset(&pWriter->mBlockW); - for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { - tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock); - - if (block.last) { - code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); - if (code) goto _err; - - tBlockReset(&block); - block.last = 1; - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, - pWriter->cmprAlg); - if (code) goto _err; - } - - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + tBlockReset(&block); + block.last = 1; + code = + tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, pWriter->cmprAlg); if (code) goto _err; } - SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; - code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx); + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); if (code) goto _err; + } - if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + // SBlock + SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; + code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, &blockIdx); + if (code) goto _err; + + // SBlockIdx + if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } _exit: @@ -543,14 +533,135 @@ _err: return code; } -static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { +static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { + int32_t code = 0; + SBlockData* pBlockData = &pWriter->bData; + int32_t iRow = 0; + TSDBROW row; + TSDBROW* pRow = &row; + + // correct schema + code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData); + if (code) goto _err; + + // loop to merge + *pRow = tsdbRowFromBlockData(pBlockData, iRow); + while (true) { + if (pRow == NULL) break; + + if (pWriter->pBlockData) { + ASSERT(pWriter->iRow < pWriter->pBlockData->nRow); + + int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow)); + + ASSERT(c); + + if (c < 0) { + code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); + if (code) goto _err; + + iRow++; + if (iRow < pWriter->pBlockData->nRow) { + *pRow = tsdbRowFromBlockData(pBlockData, iRow); + } else { + pRow = NULL; + } + } else if (c > 0) { + code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL); + if (code) goto _err; + + pWriter->iRow++; + if (pWriter->iRow >= pWriter->pBlockData->nRow) { + pWriter->pBlockData = NULL; + } + } + } else { + TSDBKEY key = TSDBROW_KEY(pRow); + + while (true) { + if (pWriter->iBlock >= pWriter->mBlock.nItem) break; + + SBlock block; + int32_t c; + + tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); + + if (block.last) { + pWriter->pBlockData = &pWriter->bDataR; + + code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL); + if (code) goto _err; + pWriter->iRow = 0; + + pWriter->iBlock++; + break; + } + + c = tsdbKeyCmprFn(&block.maxKey, &key); + + ASSERT(c); + + if (c > 0) break; + + if (pWriter->bDataW.nRow) { + pWriter->blockW.last = 0; + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + &pWriter->blockW, pWriter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + if (code) goto _err; + + tBlockReset(&pWriter->blockW); + tBlockDataClearData(&pWriter->bDataW); + } + + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + if (code) goto _err; + + pWriter->iBlock++; + } + + if (pWriter->pBlockData) continue; + + code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); + if (code) goto _err; + + iRow++; + if (iRow < pBlockData->nRow) { + *pRow = tsdbRowFromBlockData(pBlockData, iRow); + } else { + pRow = NULL; + } + } + + _check_write: + if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue; + + _write_block: + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdx, &pWriter->blockW, + pWriter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + if (code) goto _err; + + tBlockReset(&pWriter->blockW); + tBlockDataClearData(&pWriter->bDataW); + } + + return code; + +_err: + return code; +} + +static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { int32_t code = 0; SBlockData* pBlockData = &pWriter->bData; TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData); TSDBKEY keyLast = tBlockDataLastKey(pBlockData); - // TABLE ==================================== - // end last table write if should if (pWriter->pBlockIdxW) { int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); @@ -569,23 +680,33 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { // start new table data write if need if (pWriter->pBlockIdxW == NULL) { // write table data ahead - code = tsdbSnapWriteTableDataAhead(pWriter, id); - if (code) goto _err; + while (true) { + if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break; + + SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); + int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); + + if (c >= 0) break; + + code = tsdbSnapMoveWriteTableData(pWriter, pBlockIdx); + if (code) goto _err; + + pWriter->iBlockIdx++; + } // reader if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { ASSERT(pWriter->pDataFReader); - pWriter->pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock); - int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdx, &id); - if (c) { - ASSERT(c > 0); - pWriter->pBlockIdx = NULL; - } else { + SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlock); + int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); + + ASSERT(c >= 0); + + if (c == 0) { + pWriter->pBlockIdx = pBlockIdx; pWriter->iBlockIdx++; } - } else { - pWriter->pBlockIdx = NULL; } if (pWriter->pBlockIdx) { @@ -611,107 +732,8 @@ static int32_t tsdbSnapWriteDataImpl(STsdbSnapWriter* pWriter, TABLEID id) { ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid); ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid)); - // BLOCK ==================================== - int32_t iRow = 0; - TSDBROW* pRow = &tsdbRowFromBlockData(pBlockData, iRow); - while (true) { - if (pRow == NULL) break; - - if (pWriter->pBlockData) { - ASSERT(pWriter->iRow < pWriter->pBlockData->nRow); - - int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow)); - - ASSERT(c); - - if (c < 0) { - code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); - if (code) goto _err; - - iRow++; - if (iRow < pWriter->pBlockData->nRow) { - pRow = &tsdbRowFromBlockData(pBlockData, iRow); - } else { - pRow = NULL; - } - } else if (c > 0) { - code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL); - if (code) goto _err; - - pWriter->iRow++; - if (pWriter->iRow >= pWriter->pBlockData->nRow) { - pWriter->pBlockData = NULL; - } - } - } else { - SBlock block; - - if (pWriter->iBlock < pWriter->mBlock.nItem) { - tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); - int32_t c; - - c = tsdbKeyCmprFn(&block.maxKey, &TSDBROW_KEY(pRow)); - - ASSERT(c); - - if (c < 0) { - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); - if (code) goto _err; - - pWriter->iBlock++; - continue; - } - - c = tsdbKeyCmprFn(&block.minKey, &TSDBROW_KEY(pRow)); - - ASSERT(c); - - if (c > 0) { - code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); - if (code) goto _err; - - iRow++; - if (iRow < pWriter->pBlockData->nRow) { - pRow = &tsdbRowFromBlockData(pBlockData, iRow); - } else { - pRow = NULL; - } - - goto _check_write; - } - - pWriter->pBlockData = &pWriter->bDataR; - code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL); - if (code) goto _err; - - pWriter->iRow = 0; - } - - code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); - if (code) goto _err; - - iRow++; - if (iRow < pWriter->pBlockData->nRow) { - pRow = &tsdbRowFromBlockData(pBlockData, iRow); - } else { - pRow = NULL; - } - } - - _check_write: - if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue; - - _write_block: - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdx, &pWriter->blockW, - pWriter->cmprAlg); - if (code) goto _err; - - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); - if (code) goto _err; - - tBlockReset(&pWriter->blockW); - tBlockDataClearData(&pWriter->bDataW); - } + code = tsdbSnapWriteTableDataImpl(pWriter); + if (code) goto _err; _exit: return code; @@ -723,11 +745,10 @@ _err: } static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - TABLEID id = *(TABLEID*)(&pHdr[1]); - int64_t n; + int32_t code = 0; + STsdb* pTsdb = pWriter->pTsdb; + TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); + int64_t n; // decode SBlockData* pBlockData = &pWriter->bData; @@ -742,13 +763,13 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision)); if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) { // end last file data write if need - code = tsdbSnapWriteDataEnd(pWriter); // todo + code = tsdbSnapWriteDataEnd(pWriter); if (code) goto _err; pWriter->fid = fid; // read - SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ); // todo: check nState is valid + SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ); if (pSet) { code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); if (code) goto _err; @@ -763,8 +784,9 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 pWriter->pBlockIdx = NULL; tMapDataReset(&pWriter->mBlock); pWriter->iBlock = 0; - tBlockDataReset(&pWriter->bDataR); + pWriter->pBlockData = NULL; pWriter->iRow = 0; + tBlockDataReset(&pWriter->bDataR); // write SDFileSet wSet; @@ -789,12 +811,12 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 if (code) goto _err; taosArrayClear(pWriter->aBlockIdxW); - pWriter->pBlockIdxW = NULL; tMapDataReset(&pWriter->mBlockW); + pWriter->pBlockIdxW = NULL; tBlockDataReset(&pWriter->bDataW); } - code = tsdbSnapWriteDataImpl(pWriter, id); + code = tsdbSnapWriteTableData(pWriter, id); if (code) goto _err; tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d",