From 416d4ac6ad2ba0cc9a4862c7cecc85b7bce7c245 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 7 Sep 2022 17:56:40 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 74 ++++++++++------------ source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 59 +++++++++++++++-- 2 files changed, 88 insertions(+), 45 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 52da3a47d2..cc4495e68f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -551,13 +551,12 @@ _err: return code; } -static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { - int32_t code = 0; - SBlockData *pBlockData = &pCommitter->dWriter.bData; - SDataBlk dataBlk; +int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) { + int32_t code = 0; - ASSERT(pBlockData->nRow > 0); + if (pBlockData->nRow == 0) return code; + SDataBlk dataBlk; tDataBlkReset(&dataBlk); // info @@ -585,13 +584,12 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { // write dataBlk.nSubBlock++; - code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1], - ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, - pCommitter->cmprAlg, 0); + code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1], + ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0); if (code) goto _err; // put SDataBlk - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &dataBlk, tPutDataBlk); + code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk); if (code) goto _err; // clear @@ -600,16 +598,15 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { return code; _err: - tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } -static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { - int32_t code = 0; - SSttBlk sstBlk; - SBlockData *pBlockData = &pCommitter->dWriter.bDatal; +int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) { + int32_t code = 0; + SSttBlk sstBlk; - ASSERT(pBlockData->nRow > 0); + if (pBlockData->nRow == 0) return code; // info sstBlk.suid = pBlockData->suid; @@ -628,11 +625,11 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; // write - code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &sstBlk.bInfo, NULL, pCommitter->cmprAlg, 1); + code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1); if (code) goto _err; // push SSttBlk - if (taosArrayPush(pCommitter->dWriter.aSttBlk, &sstBlk) == NULL) { + if (taosArrayPush(aSttBlk, &sstBlk) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -643,7 +640,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { return code; _err: - tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } @@ -1120,15 +1117,14 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) } if (pBlockData->nRow >= pCommitter->maxRow) { - code = tsdbCommitDataBlock(pCommitter); + code = + tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); if (code) goto _err; } } - if (pBlockData->nRow) { - code = tsdbCommitDataBlock(pCommitter); - if (code) goto _err; - } + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); + if (code) goto _err; return code; @@ -1189,7 +1185,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) } if (pBDataW->nRow >= pCommitter->maxRow) { - code = tsdbCommitDataBlock(pCommitter); + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); if (code) goto _err; } } @@ -1206,15 +1202,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) } if (pBDataW->nRow >= pCommitter->maxRow) { - code = tsdbCommitDataBlock(pCommitter); + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); if (code) goto _err; } } - if (pBDataW->nRow) { - code = tsdbCommitDataBlock(pCommitter); - if (code) goto _err; - } + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); + if (code) goto _err; return code; @@ -1302,10 +1296,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { SBlockData *pBDatal = &pCommitter->dWriter.bDatal; if (pBDatal->suid || pBDatal->uid) { if ((pBDatal->suid != id.suid) || (id.suid == 0)) { - if (pBDatal->nRow) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _exit; - } + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); + if (code) goto _exit; tBlockDataReset(pBDatal); } } @@ -1337,7 +1329,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { if (code) goto _err; if (pBDatal->nRow >= pCommitter->maxRow) { - code = tsdbCommitLastBlock(pCommitter); + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); if (code) goto _err; } } @@ -1389,10 +1381,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { if (pBData->nRow >= pCommitter->maxRow) { if (pCommitter->toLastOnly) { - code = tsdbCommitLastBlock(pCommitter); + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); if (code) goto _err; } else { - code = tsdbCommitDataBlock(pCommitter); + code = + tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); if (code) goto _err; } } @@ -1400,7 +1393,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { if (!pCommitter->toLastOnly && pBData->nRow) { if (pBData->nRow > pCommitter->minRow) { - code = tsdbCommitDataBlock(pCommitter); + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); if (code) goto _err; } else { code = tsdbAppendLastBlock(pCommitter); @@ -1466,10 +1459,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { code = tsdbMoveCommitData(pCommitter, id); if (code) goto _err; - if (pCommitter->dWriter.bDatal.nRow > 0) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; - } + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, + pCommitter->cmprAlg); + if (code) goto _err; return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 081be8290d..20328a6b90 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1074,6 +1074,9 @@ _err: } #endif +extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg); +extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg); + static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) { int32_t code = 0; @@ -1179,19 +1182,67 @@ static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) { ASSERT(pWriter->dWriter.pWriter); - // (todo) + // todo: end current commit table + for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) { + TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow); + code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, pWriter->dReader.pBlockIdx->uid); + if (code) goto _err; + } + + for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) { + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk); + + code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk); + if (code) goto _err; + } + + if (pWriter->dWriter.mDataBlk.nItem) { + SBlockIdx blockIdx = *pWriter->dReader.pBlockIdx; + code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx); + if (code) goto _err; + + if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } // copy remain table data TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX}; code = tsdbSnapWriteCopyData(pWriter, &id); - if (code) goto _exit; + if (code) goto _err; - if (pWriter->dWriter.sData.nRow > 0) { - // TODO: write the last block + code = + tsdbWriteSttBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.sData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg); + if (code) goto _err; + + // Indices + code = tsdbWriteBlockIdx(pWriter->dWriter.pWriter, pWriter->dWriter.aBlockIdx); + if (code) goto _err; + + code = tsdbWriteSttBlk(pWriter->dWriter.pWriter, pWriter->dWriter.aSttBlk); + if (code) goto _err; + + code = tsdbUpdateDFileSetHeader(pWriter->dWriter.pWriter); + if (code) goto _err; + + code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->dWriter.pWriter->wSet); + if (code) goto _err; + + code = tsdbDataFWriterClose(&pWriter->dWriter.pWriter, 1); + if (code) goto _err; + + if (pWriter->dReader.pReader) { + code = tsdbDataFReaderClose(&pWriter->dReader.pReader); + if (code) goto _err; } _exit: return code; + +_err: + return code; } static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) {