From 255f42e07f1d08b54409c324ed935796fb148c11 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 8 Sep 2022 15:13:58 +0800 Subject: [PATCH] more tsdb snapshot --- source/dnode/vnode/src/inc/tsdb.h | 2 + source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 652 +++++---------------- 2 files changed, 150 insertions(+), 504 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a177527dab..38b5bd24be 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -84,6 +84,8 @@ typedef struct SLDataIter SLDataIter; #define TSDBKEY_MIN ((TSDBKEY){.ts = TSKEY_MIN, .version = VERSION_MIN}) #define TSDBKEY_MAX ((TSDBKEY){.ts = TSKEY_MAX, .version = VERSION_MAX}) +#define TABLE_SAME_SCHEMA(SUID1, UID1, SUID2, UID2) ((SUID1) ? (SUID1) == (SUID2) : (UID1) == (UID2)) + #define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM)) #define LOGIC_TO_FILE_OFFSET(LOFFSET, PAGE) \ ((LOFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (LOFFSET) % PAGE_CONTENT_SIZE(PAGE)) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index fe6e9da15f..502d227121 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -616,11 +616,12 @@ struct STsdbSnapWriter { int8_t cmprAlg; int64_t commitID; uint8_t* aBuf[5]; + // for data file SBlockData bData; - - int32_t fid; - TABLEID id; + int32_t fid; + TABLEID id; + SSkmInfo skmTable; struct { SDataFReader* pReader; SArray* aBlockIdx; @@ -639,7 +640,6 @@ struct STsdbSnapWriter { SBlockData bData; SBlockData sData; } dWriter; - SSkmInfo skmTable; // for del file SDelFReader* pDelFReader; @@ -651,448 +651,28 @@ struct STsdbSnapWriter { }; // SNAP_DATA_TSDB -#if 0 -static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { - int32_t code = 0; - - ASSERT(pWriter->pDataFWriter); - - if (pWriter->pBlockIdxW == NULL) goto _exit; - - // consume remain rows - if (pWriter->pBlockData) { - ASSERT(pWriter->iRow < pWriter->pBlockData->nRow); - while (pWriter->iRow < pWriter->pBlockData->nRow) { - code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL, - 0); // todo - if (code) goto _err; - - if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) { - // pWriter->blockW.last = 0; - // code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - // &pWriter->blockW, pWriter->cmprAlg); - if (code) goto _err; - - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); - if (code) goto _err; - - tDataBlkReset(&pWriter->blockW); - tBlockDataClear(&pWriter->bDataW); - } - - pWriter->iRow++; - } - } - - // write remain data if has - if (pWriter->bDataW.nRow > 0) { - // pWriter->blockW.last = 0; - if (pWriter->bDataW.nRow < pWriter->minRow) { - if (pWriter->iBlock > pWriter->mBlock.nItem) { - // pWriter->blockW.last = 1; - } - } - - // code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - // &pWriter->blockW, pWriter->cmprAlg); - // if (code) goto _err; - - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); - if (code) goto _err; - } - - while (true) { - if (pWriter->iBlock >= pWriter->mBlock.nItem) break; - - SDataBlk block; - tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk); - - // if (block.last) { - // code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); - // if (code) goto _err; - - // tBlockReset(&block); - // block.last = 1; - // code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block, - // pWriter->cmprAlg); - // if (code) goto _err; - // } - - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); - if (code) goto _err; - - pWriter->iBlock++; - } - - // SDataBlk - // code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); - // if (code) goto _err; - - // SBlockIdx - if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - -_exit: - tsdbInfo("vgId:%d, tsdb snapshot write table data end for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); - return code; - -_err: - tsdbError("vgId:%d, tsdb snapshot write table data end for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), - pWriter->pTsdb->path, tstrerror(code)); - return code; -} - -static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) { - int32_t code = 0; - - code = tsdbReadDataBlk(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock); - if (code) goto _err; - - // SBlockData - SDataBlk block; - tMapDataReset(&pWriter->mBlockW); - for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { - tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetDataBlk); - - // if (block.last) { - // code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); - // if (code) goto _err; - - // tBlockReset(&block); - // block.last = 1; - // code = - // tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, - // pWriter->cmprAlg); - // if (code) goto _err; - // } - - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); - if (code) goto _err; - } - - // SDataBlk - SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; - code = tsdbWriteDataBlk(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx); - if (code) goto _err; - - // SBlockIdx - if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - -_exit: - return code; - -_err: - tsdbError("vgId:%d, tsdb snapshot move write table data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), - pWriter->pTsdb->path, tstrerror(code)); - return code; -} - -static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { - int32_t code = 0; - SBlockData* pBlockData = &pWriter->bData; - int32_t iRow = 0; - TSDBROW row; - TSDBROW* pRow = &row; - - // // correct schema - // code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData); - // if (code) goto _err; - - // loop to merge - *pRow = tsdbRowFromBlockData(pBlockData, iRow); - while (true) { - if (pRow == NULL) break; - - if (pWriter->pBlockData) { - ASSERT(pWriter->iRow < pWriter->pBlockData->nRow); - - int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow)); - - ASSERT(c); - - if (c < 0) { - // code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); - // if (code) goto _err; - - iRow++; - if (iRow < pWriter->pBlockData->nRow) { - *pRow = tsdbRowFromBlockData(pBlockData, iRow); - } else { - pRow = NULL; - } - } else if (c > 0) { - // code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), - // NULL); if (code) goto _err; - - pWriter->iRow++; - if (pWriter->iRow >= pWriter->pBlockData->nRow) { - pWriter->pBlockData = NULL; - } - } - } else { - TSDBKEY key = TSDBROW_KEY(pRow); - - while (true) { - if (pWriter->iBlock >= pWriter->mBlock.nItem) break; - - SDataBlk block; - int32_t c; - - tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk); - - // if (block.last) { - // pWriter->pBlockData = &pWriter->bDataR; - - // code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, - // NULL); if (code) goto _err; pWriter->iRow = 0; - - // pWriter->iBlock++; - // break; - // } - - c = tsdbKeyCmprFn(&block.maxKey, &key); - - ASSERT(c); - - if (c < 0) { - if (pWriter->bDataW.nRow) { - // pWriter->blockW.last = 0; - // code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - // &pWriter->blockW, pWriter->cmprAlg); - // if (code) goto _err; - - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); - if (code) goto _err; - - tDataBlkReset(&pWriter->blockW); - tBlockDataClear(&pWriter->bDataW); - } - - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); - if (code) goto _err; - - pWriter->iBlock++; - } else { - c = tsdbKeyCmprFn(&tBlockDataLastKey(pBlockData), &block.minKey); - - ASSERT(c); - - if (c > 0) { - pWriter->pBlockData = &pWriter->bDataR; - // code = - // tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, - // NULL); - // if (code) goto _err; - pWriter->iRow = 0; - - pWriter->iBlock++; - } - break; - } - } - - if (pWriter->pBlockData) continue; - - // code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); - // if (code) goto _err; - - iRow++; - if (iRow < pBlockData->nRow) { - *pRow = tsdbRowFromBlockData(pBlockData, iRow); - } else { - pRow = NULL; - } - } - - _check_write: - if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue; - - _write_block: - // code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - // &pWriter->blockW, pWriter->cmprAlg); - // if (code) goto _err; - - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); - if (code) goto _err; - - tDataBlkReset(&pWriter->blockW); - tBlockDataClear(&pWriter->bDataW); - } - - return code; - -_err: - tsdbError("vgId:%d, vnode snapshot tsdb write table data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), - pWriter->pTsdb->path, tstrerror(code)); - return code; -} - -static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { - int32_t code = 0; - SBlockData* pBlockData = &pWriter->bData; - TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData); - TSDBKEY keyLast = tBlockDataLastKey(pBlockData); - - // end last table write if should - if (pWriter->pBlockIdxW) { - int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); - if (c < 0) { - // end - code = tsdbSnapWriteTableDataEnd(pWriter); - if (code) goto _err; - - // reset - pWriter->pBlockIdxW = NULL; - } else if (c > 0) { - ASSERT(0); - } - } - - // start new table data write if need - if (pWriter->pBlockIdxW == NULL) { - // write table data ahead - while (true) { - if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break; - - SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); - int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); - - if (c >= 0) break; - - code = tsdbSnapMoveWriteTableData(pWriter, pBlockIdx); - if (code) goto _err; - - pWriter->iBlockIdx++; - } - - // reader - pWriter->pBlockIdx = NULL; - if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { - ASSERT(pWriter->pDataFReader); - - SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); - int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); - - ASSERT(c >= 0); - - if (c == 0) { - pWriter->pBlockIdx = pBlockIdx; - pWriter->iBlockIdx++; - } - } - - if (pWriter->pBlockIdx) { - code = tsdbReadDataBlk(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock); - if (code) goto _err; - } else { - tMapDataReset(&pWriter->mBlock); - } - pWriter->iBlock = 0; - pWriter->pBlockData = NULL; - pWriter->iRow = 0; - - // writer - pWriter->pBlockIdxW = &pWriter->blockIdxW; - pWriter->pBlockIdxW->suid = id.suid; - pWriter->pBlockIdxW->uid = id.uid; - - tDataBlkReset(&pWriter->blockW); - tBlockDataReset(&pWriter->bDataW); - tMapDataReset(&pWriter->mBlockW); - } - - ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid); - ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid)); - - code = tsdbSnapWriteTableDataImpl(pWriter); - if (code) goto _err; - -_exit: - tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), - pWriter->pTsdb->path); - return code; - -_err: - tsdbError("vgId:%d, vnode snapshot tsdb write data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), - pWriter->pTsdb->path, tstrerror(code)); - return code; -} - -static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { - int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - - if (pWriter->pDataFWriter == NULL) goto _exit; - - // finish current table - code = tsdbSnapWriteTableDataEnd(pWriter); - if (code) goto _err; - - // move remain table - while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { - code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx)); - if (code) goto _err; - - pWriter->iBlockIdx++; - } - - // write remain stuff - if (taosArrayGetSize(pWriter->aBlockLW) > 0) { - code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aBlockIdxW); - if (code) goto _err; - } - - if (taosArrayGetSize(pWriter->aBlockIdx) > 0) { - code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW); - if (code) goto _err; - } - - code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet); - if (code) goto _err; - - code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1); - if (code) goto _err; - - if (pWriter->pDataFReader) { - code = tsdbDataFReaderClose(&pWriter->pDataFReader); - if (code) goto _err; - } - -_exit: - tsdbInfo("vgId:%d, vnode snapshot tsdb writer data end for %s", TD_VID(pTsdb->pVnode), pTsdb->path); - return code; - -_err: - tsdbError("vgId:%d, vnode snapshot tsdb writer data end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, - tstrerror(code)); - return code; -} -#endif - extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg); extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg); static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) { int32_t code = 0; - pWriter->dReader.iBlockIdx++; + ASSERT(pWriter->dReader.iRow >= pWriter->dReader.bData.nRow); + if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) { pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx); code = tsdbReadDataBlk(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk); if (code) goto _exit; - pWriter->dReader.iDataBlk = -1; - tBlockDataReset(&pWriter->dReader.bData); - pWriter->dReader.iRow = 0; + pWriter->dReader.iBlockIdx++; } else { pWriter->dReader.pBlockIdx = NULL; + tMapDataReset(&pWriter->dReader.mDataBlk); } + pWriter->dReader.iDataBlk = 0; // point to the next one + tBlockDataReset(&pWriter->dReader.bData); + pWriter->dReader.iRow = 0; _exit: return code; @@ -1134,10 +714,6 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable); if (code) goto _err; - // Reader (todo) - ASSERT(pWriter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0); - - // Writer tMapDataReset(&pWriter->dWriter.mDataBlk); code = tBlockDataInit(&pWriter->dWriter.bData, pId->suid, pId->uid, pWriter->skmTable.pTSchema); if (code) goto _err; @@ -1145,6 +721,7 @@ static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pI return code; _err: + tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); return code; } @@ -1153,22 +730,28 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code; - if (pWriter->dReader.pBlockIdx && pWriter->dReader.pBlockIdx->suid == pWriter->id.suid && - pWriter->dReader.pBlockIdx->uid == pWriter->id.uid) { + int32_t c = 1; + if (pWriter->dReader.pBlockIdx) { + c = tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &pWriter->id); + ASSERT(c >= 0); + } + + if (c == 0) { + SBlockData* pBData = &pWriter->dWriter.bData; + for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) { TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow); - code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, pWriter->id.uid); + + code = tBlockDataAppendRow(pBData, &row, NULL, pWriter->id.uid); if (code) goto _err; - if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { - code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, - pWriter->cmprAlg); + if (pBData->nRow >= pWriter->maxRow) { + code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg); if (code) goto _err; } } - code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, - pWriter->cmprAlg); + code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg); if (code) goto _err; for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) { @@ -1183,14 +766,9 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { if (code) goto _err; } - // code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, - // pWriter->cmprAlg); - // if (code) goto _err; - if (pWriter->dWriter.mDataBlk.nItem) { SBlockIdx blockIdx = {.suid = pWriter->id.suid, .uid = pWriter->id.uid}; code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx); - if (code) goto _err; if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1213,11 +791,11 @@ static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) { ASSERT(pWriter->dWriter.pWriter == NULL); - // open new pWriter->fid = fid; + pWriter->id = (TABLEID){0}; SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); - // open reader + // Reader if (pSet) { code = tsdbDataFReaderOpen(&pWriter->dReader.pReader, pWriter->pTsdb, pSet); if (code) goto _err; @@ -1225,10 +803,14 @@ static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) { code = tsdbReadBlockIdx(pWriter->dReader.pReader, pWriter->dReader.aBlockIdx); if (code) goto _err; } else { - // TODO + ASSERT(pWriter->dReader.pReader == NULL); + taosArrayClear(pWriter->dReader.aBlockIdx); } + pWriter->dReader.iBlockIdx = 0; // point to the next one + code = tsdbSnapNextTableData(pWriter); + if (code) goto _err; - // open writer + // Writer SHeadFile fHead = {.commitID = pWriter->commitID}; SDataFile fData = {.commitID = pWriter->commitID}; SSmaFile fSma = {.commitID = pWriter->commitID}; @@ -1250,11 +832,14 @@ static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) { wSet.nSttF = 1; } wSet.aSttF[wSet.nSttF - 1] = &fStt; + code = tsdbDataFWriterOpen(&pWriter->dWriter.pWriter, pWriter->pTsdb, &wSet); if (code) goto _err; taosArrayClear(pWriter->dWriter.aBlockIdx); tMapDataReset(&pWriter->dWriter.mDataBlk); taosArrayClear(pWriter->dWriter.aSttBlk); + tBlockDataReset(&pWriter->dWriter.bData); + tBlockDataReset(&pWriter->dWriter.sData); return code; @@ -1307,21 +892,24 @@ _err: return code; } -static int32_t tsdbSnapWriteWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) { +static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) { int32_t code = 0; - SBlockData* pBlockData = &pWriter->bData; - TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]}; - TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow); + SBlockData* pBData = &pWriter->bData; + TABLEID id = {.suid = pBData->suid, .uid = pBData->uid ? pBData->uid : pBData->aUid[iRow]}; + TSDBROW row = tsdbRowFromBlockData(pBData, iRow); TSDBKEY key = TSDBROW_KEY(&row); - if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) { - _merge_block: - // merge with data block in row + *done = 0; + while (pWriter->dReader.iRow < pWriter->dReader.bData.nRow || + pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem) { + // Merge row by row for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) { TSDBROW trow = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow); TSDBKEY tKey = TSDBROW_KEY(&trow); + ASSERT(pWriter->dReader.bData.suid == id.suid && pWriter->dReader.bData.uid == id.uid); + int32_t c = tsdbKeyCmprFn(&key, &tKey); if (c < 0) { code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid); @@ -1345,19 +933,21 @@ static int32_t tsdbSnapWriteWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iR } } - // merge with dataBlk in whole + // Merge row by block SDataBlk tDataBlk = {.minKey = key, .maxKey = key}; - for (pWriter->dReader.iBlockIdx++; pWriter->dReader.iBlockIdx < pWriter->dReader.mDataBlk.nItem; - pWriter->dReader.iBlockIdx++) { + for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) { SDataBlk dataBlk; - tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iBlockIdx, &dataBlk, tGetDataBlk); + tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk); int32_t c = tDataBlkCmprFn(&dataBlk, &tDataBlk); - if (c < 0) { + code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, + pWriter->cmprAlg); + if (code) goto _err; + code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk); if (code) goto _err; - } else if (c < 0) { + } else if (c > 0) { code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid); if (code) goto _err; @@ -1372,35 +962,53 @@ static int32_t tsdbSnapWriteWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iR } else { code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData); if (code) goto _err; + pWriter->dReader.iRow = 0; - goto _merge_block; + pWriter->dReader.iDataBlk++; + break; } } - - code = tsdbSnapNextTableData(pWriter); - if (code) goto _err; } _exit: return code; _err: + tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); return code; } -static int32_t tsdbSnapWriteWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) { +static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) { int32_t code = 0; - SBlockData* pBlockData = &pWriter->bData; - TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]}; - TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow); + TABLEID id = {.suid = pWriter->bData.suid, + .uid = pWriter->bData.uid ? pWriter->bData.uid : pWriter->bData.aUid[iRow]}; + TSDBROW row = tsdbRowFromBlockData(&pWriter->bData, iRow); + SBlockData* pBData = &pWriter->dWriter.sData; - code = tBlockDataAppendRow(&pWriter->dWriter.sData, &row, NULL, id.uid); + if (pBData->suid || pBData->uid) { + if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) { + code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg); + if (code) goto _err; + + pBData->suid = 0; + pBData->uid = 0; + } + } + + if (pBData->suid == 0 && pBData->uid == 0) { + code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable); + if (code) goto _err; + + code = tBlockDataInit(pBData, pWriter->id.suid, pWriter->id.suid ? 0 : pWriter->id.uid, pWriter->skmTable.pTSchema); + if (code) goto _err; + } + + code = tBlockDataAppendRow(pBData, &row, NULL, id.uid); if (code) goto _err; - if (pWriter->dWriter.sData.nRow >= pWriter->maxRow) { - code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.sData, pWriter->dWriter.aSttBlk, - pWriter->cmprAlg); + if (pBData->nRow >= pWriter->maxRow) { + code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg); if (code) goto _err; } @@ -1418,7 +1026,7 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]}; // End last table data write if need - if (id.suid != pWriter->id.suid || id.uid != pWriter->id.uid) { + if (tTABLEIDCmprFn(&pWriter->id, &id) != 0) { code = tsdbSnapWriteTableDataEnd(pWriter); if (code) goto _err; } @@ -1431,18 +1039,22 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { // Merge with .data file data int8_t done = 0; - code = tsdbSnapWriteWriteToDataFile(pWriter, iRow, &done); - if (code) goto _err; + if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) { + code = tsdbSnapWriteToDataFile(pWriter, iRow, &done); + if (code) goto _err; + } // Append to the .stt data block (todo: check if need to set/reload sst block) if (!done) { - code = tsdbSnapWriteWriteToSttFile(pWriter, iRow); + code = tsdbSnapWriteToSttFile(pWriter, iRow); if (code) goto _err; } + _exit: return code; _err: + tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); return code; } @@ -1456,6 +1068,8 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 code = tDecmprBlockData(pHdr->data, pHdr->size, pBlockData, pWriter->aBuf); if (code) goto _err; + ASSERT(pBlockData->nRow > 0); + // Loop to handle each row for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { TSKEY ts = pBlockData->aTSKEY[iRow]; @@ -1655,39 +1269,36 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pWriter->commitID = pTsdb->pVnode->state.commitID; - // for data file + // SNAP_DATA_TSDB code = tBlockDataCreate(&pWriter->bData); if (code) goto _err; -#if 0 - pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pWriter->aBlockIdx == NULL) { + pWriter->fid = INT32_MIN; + pWriter->id = (TABLEID){0}; + // Reader + pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pWriter->dReader.aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - code = tBlockDataCreate(&pWriter->bDataR); + code = tBlockDataCreate(&pWriter->dReader.bData); if (code) goto _err; - pWriter->aSstBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pWriter->aSstBlk == NULL) { + // Writer + pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pWriter->dWriter.aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - - pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx)); - if (pWriter->aBlockIdxW == NULL) { + pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (pWriter->dWriter.aSttBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - code = tBlockDataCreate(&pWriter->bDataW); + code = tBlockDataCreate(&pWriter->dWriter.bData); + if (code) goto _err; + code = tBlockDataCreate(&pWriter->dWriter.sData); if (code) goto _err; - - pWriter->aBlockLW = taosArrayInit(0, sizeof(SSttBlk)); - if (pWriter->aBlockLW == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } -#endif // SNAP_DATA_DEL pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); @@ -1721,14 +1332,17 @@ _err: int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { int32_t code = 0; STsdbSnapWriter* pWriter = *ppWriter; + STsdb* pTsdb = pWriter->pTsdb; if (rollback) { ASSERT(0); // code = tsdbFSRollback(pWriter->pTsdb->pFS); // if (code) goto _err; } else { - // code = tsdbSnapWriteDataEnd(pWriter); - if (code) goto _err; + if (pWriter->dWriter.pWriter) { + code = tsdbSnapWriteCloseFile(pWriter); + if (code) goto _err; + } code = tsdbSnapWriteDelEnd(pWriter); if (code) goto _err; @@ -1736,14 +1350,44 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs); if (code) goto _err; + // lock + taosThreadRwlockWrlock(&pTsdb->rwLock); + code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs); - if (code) goto _err; + if (code) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _err; + } + + // unlock + taosThreadRwlockUnlock(&pTsdb->rwLock); } + // SNAP_DATA_DEL + taosArrayDestroy(pWriter->aDelIdxW); + taosArrayDestroy(pWriter->aDelData); + taosArrayDestroy(pWriter->aDelIdxR); + + // SNAP_DATA_TSDB + + // Writer + tBlockDataDestroy(&pWriter->dWriter.sData, 1); + tBlockDataDestroy(&pWriter->dWriter.bData, 1); + taosArrayDestroy(pWriter->dWriter.aSttBlk); + tMapDataClear(&pWriter->dWriter.mDataBlk); + taosArrayDestroy(pWriter->dWriter.aBlockIdx); + + // Reader + tBlockDataDestroy(&pWriter->dReader.bData, 1); + tMapDataClear(&pWriter->dReader.mDataBlk); + taosArrayDestroy(pWriter->dReader.aBlockIdx); + + tBlockDataDestroy(&pWriter->bData, 1); + tTSchemaDestroy(pWriter->skmTable.pTSchema); + for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { tFree(pWriter->aBuf[iBuf]); } - tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); taosMemoryFree(pWriter); *ppWriter = NULL; @@ -1769,7 +1413,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) goto _exit; } else { if (pWriter->dWriter.pWriter) { - // code = tsdbSnapWriteDataEnd(pWriter); + code = tsdbSnapWriteCloseFile(pWriter); if (code) goto _err; } }