From a32c147b3b2bda16abc16b3425fb4d44435e2733 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 13 Jul 2022 09:55:55 +0000 Subject: [PATCH] more vnode snapshot writer --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 11 +- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 169 ++++++++++++++------- 2 files changed, 115 insertions(+), 65 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index ce14a92734..470bff1eae 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -284,16 +284,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { .fLast = {.commitID = pCommitter->commitID, .size = 0}, .fSma = pRSet->fSma}; } else { - STfs *pTfs = pTsdb->pVnode->pTfs; - SDiskID did = {.level = 0, .id = 0}; - - // TODO: alloc a new disk - // tfsAllocDisk(pTfs, 0, &did); - - // create the directory - tfsMkdirRecurAt(pTfs, pTsdb->path, did); - - wSet = (SDFileSet){.diskId = did, + wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0}, .fid = pCommitter->commitFid, .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0}, .fData = {.commitID = pCommitter->commitID, .size = 0}, diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 563458ad85..193f4997b0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -365,36 +365,39 @@ struct STsdbSnapWriter { int32_t minRow; int32_t maxRow; int8_t cmprAlg; + int64_t commitID; // for data file + SBlockData bData; + int32_t fid; SDataFReader* pDataFReader; - SArray* aBlockIdx; + SArray* aBlockIdx; // SArray int32_t iBlockIdx; SBlockIdx* pBlockIdx; - SMapData mBlock; + SMapData mBlock; // SMapData int32_t iBlock; SBlock* pBlock; SBlock block; - SBlockData blockData; + SBlockData bDataR; int32_t iRow; SDataFWriter* pDataFWriter; - SArray* aBlockIdxN; - SBlockIdx* pBlockIdxN; + SArray* aBlockIdxW; // SArray + SBlockIdx* pBlockIdxW; SBlockIdx blockIdx; - SMapData mBlockN; - SBlock* pBlockN; - SBlock blockN; - SBlockData nBlockData; + SMapData mBlockW; // SMapData + SBlock* pBlockW; + SBlock blockW; + SBlockData bDataW; // for del file SDelFReader* pDelFReader; SDelFWriter* pDelFWriter; int32_t iDelIdx; - SArray* aDelIdx; + SArray* aDelIdxR; SArray* aDelData; - SArray* aDelIdxN; + SArray* aDelIdxW; }; static int32_t tsdbSnapRollback(STsdbSnapWriter* pWriter) { @@ -440,7 +443,7 @@ static int32_t tsdbSnapWriteAppendData(STsdbSnapWriter* pWriter, uint8_t* pData, SBlockData* pBlockData = NULL; // todo while (iRow < nRow) { - code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL); + code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL); if (code) goto _err; } @@ -478,14 +481,14 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, if (pWriter->pBlockIdx == NULL || tTABLEIDCmprFn(&id, pWriter->pBlockIdx) < 0) { int32_t c; - if (pWriter->pBlockIdxN && ((c = tTABLEIDCmprFn(&id, pWriter->pBlockIdxN)) != 0)) { + if (pWriter->pBlockIdxW && ((c = tTABLEIDCmprFn(&id, pWriter->pBlockIdxW)) != 0)) { ASSERT(c > 0); code = tsdbSnapWriteTableDataEnd(pWriter); if (code) goto _err; } - if (pWriter->pBlockIdxN == NULL) { + if (pWriter->pBlockIdxW == NULL) { pWriter->pBlockIdx = &pWriter->blockIdx; pWriter->pBlockIdx->suid = id.suid; pWriter->pBlockIdx->uid = id.uid; @@ -496,12 +499,12 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, int32_t nRow = 0; // todo SBlockData* pBlockData = NULL; // todo for (int32_t iRow = 0; iRow < nRow; iRow++) { - code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL); + code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL); if (code) goto _err; - if (pWriter->nBlockData.nRow > pWriter->maxRow * 4 / 5) { - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->nBlockData, NULL, NULL, pWriter->pBlockIdxN, - pWriter->pBlockN, pWriter->cmprAlg); + if (pWriter->bDataW.nRow > pWriter->maxRow * 4 / 5) { + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + pWriter->pBlockW, pWriter->cmprAlg); if (code) goto _err; } } @@ -512,7 +515,7 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, if (pWriter->pBlock->last) break; if (tBlockCmprFn(&(SBlock){.minKey = {0}, .maxKey = {0}}, pWriter->pBlock) >= 0) break; - code = tMapDataPutItem(&pWriter->mBlockN, pWriter->pBlock, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, pWriter->pBlock, tPutBlock); if (code) goto _err; } @@ -533,15 +536,15 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, uint8_t* pData, SBlockData* pBlockData = NULL; for (int32_t iRow = 0; iRow < nRow; iRow++) { - code = tBlockDataAppendRow(&pWriter->nBlockData, &tsdbRowFromBlockData(pBlockData, iRow), NULL); + code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pBlockData, iRow), NULL); if (code) goto _err; - if (pWriter->nBlockData.nRow >= pWriter->maxRow * 4 / 5) { - code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->nBlockData, NULL, NULL, pWriter->pBlockIdxN, - pWriter->pBlockN, pWriter->cmprAlg); + if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) { + code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, + pWriter->pBlockW, pWriter->cmprAlg); if (code) goto _err; - tBlockDataClearData(&pWriter->nBlockData); + tBlockDataClearData(&pWriter->bDataW); } } } @@ -560,60 +563,77 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; TABLEID id = *(TABLEID*)(&pHdr[1]); int64_t n; - SBlockData bData = {0}; - SBlockData* pBlockData = &bData; // decode - code = tBlockDataInit(pBlockData); - if (code) goto _err; + SBlockData* pBlockData = &pWriter->bData; n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData); ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData); -#if 0 - int32_t fid = tsdbKeyFid(skey, pWriter->minutes, pWriter->precision); - ASSERT(fid == tsdbKeyFid(ekey, pWriter->minutes, pWriter->precision)); + // open file + TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData); + TSDBKEY keyLast = tBlockDataLastKey(pBlockData); - // begin + int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision); + ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision)); if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) { - code = tsdbSnapWriteDataEnd(pWriter); + code = tsdbSnapWriteDataEnd(pWriter); // todo if (code) goto _err; pWriter->fid = fid; - SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ); - // reader + + // read + SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, fid, TD_EQ); // todo: check nState is valid if (pSet) { - // open code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); if (code) goto _err; - // SBlockIdx code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx, NULL); if (code) goto _err; } else { + ASSERT(pWriter->pDataFReader == NULL); taosArrayClear(pWriter->aBlockIdx); } pWriter->iBlockIdx = 0; + pWriter->pBlockIdx = NULL; + tMapDataReset(&pWriter->mBlock); + pWriter->iBlock = 0; + pWriter->pBlock = NULL; + tBlockDataReset(&pWriter->bDataR); + pWriter->iRow = 0; - // writer - SDFileSet wSet = {0}; - if (pSet == NULL) { - wSet = (SDFileSet){0}; // todo + // write + SDFileSet wSet; + + if (pSet) { + wSet = (SDFileSet){.diskId = pSet->diskId, + .fid = fid, + .fHead = {.commitID = pWriter->commitID, .offset = 0, .size = 0}, + .fData = pSet->fData, + .fLast = {.commitID = pWriter->commitID, .size = 0}, + .fSma = pSet->fSma}; } else { - wSet = (SDFileSet){0}; // todo + wSet = (SDFileSet){.diskId = (SDiskID){.level = 0, .id = 0}, + .fid = fid, + .fHead = {.commitID = pWriter->commitID, .offset = 0, .size = 0}, + .fData = {.commitID = pWriter->commitID, .size = 0}, + .fLast = {.commitID = pWriter->commitID, .size = 0}, + .fSma = {.commitID = pWriter->commitID, .size = 0}}; } code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet); if (code) goto _err; - taosArrayClear(pWriter->aBlockIdxN); + taosArrayClear(pWriter->aBlockIdxW); + pWriter->pBlockIdxW = NULL; + tMapDataReset(&pWriter->mBlockW); + pWriter->pBlockW = NULL; + tBlockDataReset(&pWriter->bDataW); } - code = tsdbSnapWriteTableData(pWriter, pData, nData); - if (code) goto _err; -#endif + // write data block (todo) - tsdbInfo("vgId:%d vnode snapshot tsdb write data, suid:%" PRId64 " uid:%" PRId64 " nRow:%d", TD_VID(pTsdb->pVnode), - id.suid, id.suid, pBlockData->nRow); + tsdbInfo("vgId:%d vnode snapshot tsdb write data, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", + TD_VID(pTsdb->pVnode), fid, id.suid, id.suid, pBlockData->nRow); return code; _err: @@ -633,7 +653,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL); if (code) goto _err; - code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdx, NULL); + code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL); if (code) goto _err; } @@ -653,8 +673,8 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 SDelIdx delIdx; int8_t toBreak = 0; - if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx)) { - pDelIdx = taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx); + if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR)) { + pDelIdx = taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); } if (pDelIdx) { @@ -695,7 +715,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx); if (code) goto _err; - if (taosArrayPush(pWriter->aDelIdxN, &delIdx) == NULL) { + if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -716,8 +736,8 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { STsdb* pTsdb = pWriter->pTsdb; if (pWriter->pDelFWriter == NULL) goto _exit; - for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdx); pWriter->iDelIdx++) { - SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdx, pWriter->iDelIdx); + for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) { + SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, NULL); if (code) goto _err; @@ -726,7 +746,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, NULL, &delIdx); if (code) goto _err; - if (taosArrayPush(pWriter->aDelIdx, &delIdx) == NULL) { + if (taosArrayPush(pWriter->aDelIdxR, &delIdx) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -768,11 +788,50 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr pWriter->sver = sver; pWriter->ever = ever; + // config pWriter->minutes = pTsdb->keepCfg.days; pWriter->precision = pTsdb->keepCfg.precision; pWriter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; pWriter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; + pWriter->commitID = pTsdb->pVnode->state.commitID; + + // for data file + code = tBlockDataInit(&pWriter->bData); + + if (code) goto _err; + pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pWriter->aBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tBlockDataInit(&pWriter->bDataR); + if (code) goto _err; + + pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx)); + if (pWriter->aBlockIdxW == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tBlockDataInit(&pWriter->bDataW); + if (code) goto _err; + + // for del file + pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); + if (pWriter->aDelIdxR == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->aDelData = taosArrayInit(0, sizeof(SDelData)); + if (pWriter->aDelData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->aDelIdxW = taosArrayInit(0, sizeof(SDelIdx)); + if (pWriter->aDelIdxW == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } *ppWriter = pWriter; return code;