From 989159f0ba4510c2a07eb1dfad0796b9f2854269 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 15 Jul 2022 13:02:24 +0000 Subject: [PATCH] vnod snapshot done --- source/dnode/vnode/src/tsdb/tsdbRead.c | 23 ++- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 184 +++++++++++++++------ 2 files changed, 145 insertions(+), 62 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index f0aea0cefb..7b4127e591 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -63,10 +63,10 @@ typedef struct SBlockLoadSuppInfo { } SBlockLoadSuppInfo; typedef struct SFilesetIter { - int32_t numOfFiles; // number of total files - int32_t index; // current accessed index in the list - SArray* pFileList; // data file list - int32_t order; + int32_t numOfFiles; // number of total files + int32_t index; // current accessed index in the list + SArray* pFileList; // data file list + int32_t order; } SFilesetIter; typedef struct SFileDataBlockInfo { @@ -830,8 +830,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, - pBlockData, NULL, NULL); + int32_t code = tsdbReadColData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pSupInfo->colIds, numOfCols, + pBlockData, NULL, NULL); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1991,7 +1991,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { SReaderStatus* pStatus = &pReader->status; - SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx)); + SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx)); while (1) { bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader); @@ -3006,14 +3006,14 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { - tBlockDataClear(&pStatus->fileBlockData); + tBlockDataClear(&pStatus->fileBlockData, 1); terrno = code; return NULL; } copyBlockDataToSDataBlock(pReader, pBlockScanInfo); - tBlockDataClear(&pStatus->fileBlockData); + tBlockDataClear(&pStatus->fileBlockData, 1); return pReader->pResBlock->pDataBlock; } @@ -3132,8 +3132,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa hasNext = (pBlockIter->numOfBlocks > 0); } -// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, -// pReader->pFileGroup->fid, pReader->idStr); + // tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, + // pReader->pFileGroup->fid, pReader->idStr); } return code; @@ -3204,4 +3204,3 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return TSDB_CODE_SUCCESS; } - diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 932cf9f9ae..e561bb9d22 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -411,30 +411,6 @@ static int32_t tsdbSnapCommit(STsdbSnapWriter* pWriter) { return code; } -static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { - int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - - if (pWriter->pDataFWriter == NULL) goto _exit; - - // TODO - - code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 0); - if (code) goto _err; - - if (pWriter->pDataFReader) { - code = tsdbDataFReaderClose(&pWriter->pDataFReader); - if (code) goto _err; - } - -_exit: - return code; - -_err: - tsdbError("vgId:%d tsdb snapshot writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - return code; -} - static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; int32_t iRow = 0; // todo @@ -456,14 +432,41 @@ _err: static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; - ASSERT(pWriter->pBlockIdxW != NULL); + ASSERT(pWriter->pDataFWriter); + + if (pWriter->pBlockIdxW == NULL) goto _exit; + + // consume remain rows + if (pWriter->pBlockData) { + ASSERT(pWriter->iRow < pWriter->pBlockData->nRow); + while (pWriter->iRow < pWriter->pBlockData->nRow) { + code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL); + if (code) goto _err; + + if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) { + pWriter->blockW.last = 0; + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + &pWriter->blockW, pWriter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + if (code) goto _err; + + tBlockReset(&pWriter->blockW); + tBlockDataClearData(&pWriter->bDataW); + } + + pWriter->iRow++; + } + } // write remain data if has if (pWriter->bDataW.nRow > 0) { - if (pWriter->bDataW.nRow >= pWriter->minRow) { - pWriter->blockW.last = 0; - } else { - pWriter->blockW.last = 1; + pWriter->blockW.last = 0; + if (pWriter->bDataW.nRow < pWriter->minRow) { + if (pWriter->iBlock > pWriter->mBlock.nItem) { + pWriter->blockW.last = 1; + } } code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, @@ -474,14 +477,40 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { if (code) goto _err; } + while (true) { + if (pWriter->iBlock >= pWriter->mBlock.nItem) break; + + SBlock block; + tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); + + if (block.last) { + code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); + if (code) goto _err; + + tBlockReset(&block); + block.last = 1; + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block, + pWriter->cmprAlg); + if (code) goto _err; + } + + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + if (code) goto _err; + + pWriter->iBlock++; + } + + // SBlock code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); if (code) goto _err; + // SBlockIdx if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } +_exit: return code; _err: @@ -601,25 +630,39 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { ASSERT(c); - if (c > 0) break; + if (c < 0) { + if (pWriter->bDataW.nRow) { + pWriter->blockW.last = 0; + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + &pWriter->blockW, pWriter->cmprAlg); + if (code) goto _err; - if (pWriter->bDataW.nRow) { - pWriter->blockW.last = 0; - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - &pWriter->blockW, pWriter->cmprAlg); + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + if (code) goto _err; + + tBlockReset(&pWriter->blockW); + tBlockDataClearData(&pWriter->bDataW); + } + + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); - if (code) goto _err; + pWriter->iBlock++; + } else { + c = tsdbKeyCmprFn(&tBlockDataLastKey(pBlockData), &block.minKey); - tBlockReset(&pWriter->blockW); - tBlockDataClearData(&pWriter->bDataW); + ASSERT(c); + + if (c < 0) break; + + pWriter->pBlockData = &pWriter->bDataR; + code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, NULL); + if (code) goto _err; + pWriter->iRow = 0; + + pWriter->iBlock++; + break; } - - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); - if (code) goto _err; - - pWriter->iBlock++; } if (pWriter->pBlockData) continue; @@ -744,6 +787,45 @@ _err: return code; } +static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { + int32_t code = 0; + STsdb* pTsdb = pWriter->pTsdb; + + if (pWriter->pDataFWriter == NULL) goto _exit; + + code = tsdbSnapWriteTableDataEnd(pWriter); + if (code) goto _err; + + while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { + code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx)); + if (code) goto _err; + + pWriter->iBlockIdx++; + } + + code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW, NULL); + if (code) goto _err; + + code = tsdbFSStateUpsertDFileSet(pTsdb->fs->nState, tsdbDataFWriterGetWSet(pWriter->pDataFWriter)); + if (code) goto _err; + + code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1); + if (code) goto _err; + + if (pWriter->pDataFReader) { + code = tsdbDataFReaderClose(&pWriter->pDataFReader); + if (code) goto _err; + } + +_exit: + tsdbError("vgId:%d vnode snapshot tsdb writer data end", TD_VID(pTsdb->pVnode)); + return code; + +_err: + tsdbError("vgId:%d vnode snapshot tsdb writer data end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; STsdb* pTsdb = pWriter->pTsdb; @@ -845,23 +927,23 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 } // writer - SDelFile delFile = {.commitID = pTsdb->pVnode->state.commitID, .offset = 0, .size = 0}; + SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0}; code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb); if (code) goto _err; } // process the del data - TABLEID id = {0}; // todo + TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); while (true) { SDelIdx* pDelIdx = NULL; - int64_t n = 0; + int64_t n = sizeof(SSnapDataHdr) + sizeof(TABLEID); SDelData delData; SDelIdx delIdx; int8_t toBreak = 0; if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR)) { - pDelIdx = taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); + pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); } if (pDelIdx) { @@ -914,7 +996,7 @@ _exit: return code; _err: - tsdbError("vgId:%d tsdb snapshot write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write del failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -923,6 +1005,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { STsdb* pTsdb = pWriter->pTsdb; if (pWriter->pDelFWriter == NULL) goto _exit; + for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) { SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); @@ -954,10 +1037,11 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { } _exit: + tsdbInfo("vgId:%d vnode snapshot tsdb write del end", TD_VID(pTsdb->pVnode)); return code; _err: - tsdbError("vgId:%d tsdb snapshow write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d vnode snapshot tsdb write del end failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -1076,7 +1160,7 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) // del data if (pHdr->type == 2) { - code = tsdbSnapWriteDel(pWriter, pData + 1, nData - 1); + code = tsdbSnapWriteDel(pWriter, pData, nData); if (code) goto _err; }