From fff4a96bdf94a0f8adfafca456d9c5a71b7a7286 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 19 Jan 2023 22:18:39 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 160 ++++++++++++--------- 1 file changed, 94 insertions(+), 66 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 5747584d26..310c3de5aa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1105,7 +1105,9 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; int32_t lino = 0; - // TODO: do write the row, refer line 1593 + // write a NULL row to end current table data write + code = tsdbSnapWriteTableRow(pWriter, NULL); + TSDB_CHECK_CODE(code, lino, _exit); if (pWriter->bData.nRow > 0) { if (pWriter->bData.nRow < pWriter->minRow) { @@ -1120,7 +1122,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { } } - tBlockDataReset(&pWriter->bData); + tBlockDataClear(&pWriter->bData); } else { code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg); TSDB_CHECK_CODE(code, lino, _exit); @@ -1134,6 +1136,9 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { TSDB_CHECK_CODE(code, lino, _exit); } + pBlockIdx->suid = pWriter->tbid.suid; + pBlockIdx->uid = pWriter->tbid.uid; + code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mDataBlk, pBlockIdx); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1247,9 +1252,14 @@ static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) { ASSERT(pWriter->pDataFWriter); - code = tsdbSnapWriteTableData(pWriter, NULL /* TODO */); + code = tsdbSnapWriteTableData(pWriter, NULL); TSDB_CHECK_CODE(code, lino, _exit); + if (pWriter->sData.nRow) { + code = tsdbWriteSttBlock(pWriter->pDataFWriter, &pWriter->sData, pWriter->aSttBlk, pWriter->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } + // do file-level updates code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aSttBlk); TSDB_CHECK_CODE(code, lino, _exit); @@ -1271,8 +1281,12 @@ static int32_t tsdbSnapWriteFileDataEnd(STsdbSnapWriter* pWriter) { TSDB_CHECK_CODE(code, lino, _exit); } - // TODO: do clear sources - {} + // clear sources + while (pWriter->iterList) { + STsdbDataIter2* pIter = pWriter->iterList; + pWriter->iterList = pIter->next; + tsdbCloseDataIter2(pIter); + } _exit: if (code) { @@ -1349,7 +1363,7 @@ _exit: return code; } -static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) { +static int32_t tsdbSnapWriteTableRowImpl(STsdbSnapWriter* pWriter, TSDBROW* pRow) { int32_t code = 0; int32_t lino = 0; @@ -1368,6 +1382,79 @@ _exit: return code; } +static int32_t tsdbSnapWriteTableRow(STsdbSnapWriter* pWriter, TSDBROW* pRow) { + int32_t code = 0; + int32_t lino = 0; + + // TODO: take pRow == NULL into account + + if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow && + pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) { + code = tsdbSnapWriteTableRowImpl(pWriter, pRow); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + for (;;) { + while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) { + TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow); + + int32_t c = tsdbRowCmprFn(pRow, &row); + if (c < 0) { + code = tsdbSnapWriteTableRowImpl(pWriter, pRow); + TSDB_CHECK_CODE(code, lino, _exit); + goto _exit; + } else if (c > 0) { + code = tsdbSnapWriteTableRowImpl(pWriter, pRow); + TSDB_CHECK_CODE(code, lino, _exit); + + pWriter->pDIter->dIter.iRow++; + } else { + ASSERT(0); + } + } + + for (;;) { + if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) { + code = tsdbSnapWriteTableRowImpl(pWriter, pRow); + TSDB_CHECK_CODE(code, lino, _exit); + goto _exit; + } + + // FIXME: Here can be slow, use array instead + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk); + + int32_t c = tDataBlkCmprFn(&dataBlk, &(SDataBlk){.minKey = TSDBROW_KEY(pRow), .maxKey = TSDBROW_KEY(pRow)}); + if (c > 0) { + code = tsdbSnapWriteTableRowImpl(pWriter, pRow); + TSDB_CHECK_CODE(code, lino, _exit); + goto _exit; + } else if (c < 0) { + if (pWriter->bData.nRow > 0) { + code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } + + tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk); + pWriter->pDIter->dIter.iDataBlk++; + } else { + code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData); + TSDB_CHECK_CODE(code, lino, _exit); + + pWriter->pDIter->dIter.iRow = 0; + pWriter->pDIter->dIter.iDataBlk++; + break; + } + } + } + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowInfo) { int32_t code = 0; int32_t lino = 0; @@ -1384,68 +1471,9 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, SRowInfo* pRowIn } // end with a NULL row - if (pRowInfo == NULL) goto _exit; - - // do write the row - if (pWriter->pDIter == NULL || (pWriter->pDIter->dIter.iRow >= pWriter->pDIter->dIter.bData.nRow && - pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem)) { + if (pRowInfo) { code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row); TSDB_CHECK_CODE(code, lino, _exit); - } else { - for (;;) { - while (pWriter->pDIter->dIter.iRow < pWriter->pDIter->dIter.bData.nRow) { - TSDBROW row = tsdbRowFromBlockData(&pWriter->pDIter->dIter.bData, pWriter->pDIter->dIter.iRow); - - int32_t c = tsdbRowCmprFn(&pRowInfo->row, &row); - if (c < 0) { - code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row); - TSDB_CHECK_CODE(code, lino, _exit); - goto _exit; - } else if (c > 0) { - code = tsdbSnapWriteTableRow(pWriter, &row); - TSDB_CHECK_CODE(code, lino, _exit); - - pWriter->pDIter->dIter.iRow++; - } else { - ASSERT(0); - } - } - - for (;;) { - if (pWriter->pDIter->dIter.iDataBlk >= pWriter->pDIter->dIter.mDataBlk.nItem) { - code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row); - TSDB_CHECK_CODE(code, lino, _exit); - goto _exit; - } - - // FIXME: Here can be slow, use array instead - SDataBlk dataBlk; - tMapDataGetItemByIdx(&pWriter->pDIter->dIter.mDataBlk, pWriter->pDIter->dIter.iDataBlk, &dataBlk, tGetDataBlk); - - int32_t c = tDataBlkCmprFn( - &dataBlk, &(SDataBlk){.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)}); - if (c > 0) { - code = tsdbSnapWriteTableRow(pWriter, &pRowInfo->row); - TSDB_CHECK_CODE(code, lino, _exit); - goto _exit; - } else if (c < 0) { - if (pWriter->bData.nRow > 0) { - code = tsdbWriteDataBlock(pWriter->pDataFWriter, &pWriter->bData, &pWriter->mDataBlk, pWriter->cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); - } - - tMapDataPutItem(&pWriter->pDIter->dIter.mDataBlk, &dataBlk, tPutDataBlk); - pWriter->pDIter->dIter.iDataBlk++; - } else { - code = tsdbReadDataBlockEx(pWriter->pDataFReader, &dataBlk, &pWriter->pDIter->dIter.bData); - TSDB_CHECK_CODE(code, lino, _exit); - - pWriter->pDIter->dIter.iRow = 0; - pWriter->pDIter->dIter.iDataBlk++; - break; - } - } - } } _exit: