diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 0adc2e4d56..fe6e9da15f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1307,7 +1307,7 @@ _err: return code; } -static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { +static int32_t tsdbSnapWriteWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) { int32_t code = 0; SBlockData* pBlockData = &pWriter->bData; @@ -1315,19 +1315,6 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow); TSDBKEY key = TSDBROW_KEY(&row); - // End last table data write if need - if (id.suid != pWriter->id.suid || id.uid != pWriter->id.uid) { - code = tsdbSnapWriteTableDataEnd(pWriter); - if (code) goto _err; - } - - // Start new table data write if need - if (pWriter->id.suid == 0 && pWriter->id.uid == 0) { - code = tsdbSnapWriteTableDataStart(pWriter, &id); - if (code) goto _err; - } - - // Merge with .data file data if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) { _merge_block: // merge with data block in row @@ -1347,10 +1334,15 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { } if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { - // TODO: commit to the block + code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, + pWriter->cmprAlg); + if (code) goto _err; } - if (c < 0) goto _exit; + if (c < 0) { + *done = 1; + goto _exit; + } } // merge with dataBlk in whole @@ -1374,6 +1366,9 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { pWriter->cmprAlg); if (code) goto _err; } + + *done = 1; + goto _exit; } else { code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData); if (code) goto _err; @@ -1386,7 +1381,20 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { if (code) goto _err; } - // Append to the .stt data block (todo: check if need to set/reload sst block) +_exit: + return code; + +_err: + return code; +} + +static int32_t tsdbSnapWriteWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) { + int32_t code = 0; + + SBlockData* pBlockData = &pWriter->bData; + TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]}; + TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow); + code = tBlockDataAppendRow(&pWriter->dWriter.sData, &row, NULL, id.uid); if (code) goto _err; @@ -1403,6 +1411,41 @@ _err: return code; } +static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { + int32_t code = 0; + + SBlockData* pBlockData = &pWriter->bData; + TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]}; + + // End last table data write if need + if (id.suid != pWriter->id.suid || id.uid != pWriter->id.uid) { + code = tsdbSnapWriteTableDataEnd(pWriter); + if (code) goto _err; + } + + // Start new table data write if need + if (pWriter->id.suid == 0 && pWriter->id.uid == 0) { + code = tsdbSnapWriteTableDataStart(pWriter, &id); + if (code) goto _err; + } + + // Merge with .data file data + int8_t done = 0; + code = tsdbSnapWriteWriteToDataFile(pWriter, iRow, &done); + if (code) goto _err; + + // Append to the .stt data block (todo: check if need to set/reload sst block) + if (!done) { + code = tsdbSnapWriteWriteToSttFile(pWriter, iRow); + if (code) goto _err; + } +_exit: + return code; + +_err: + return code; +} + static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; STsdb* pTsdb = pWriter->pTsdb;