From 2d789f5ae041cbe48d664ac1f577bdd7295e2a97 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 7 Sep 2022 15:37:51 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 28 +- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 402 ++++++++++++++------- 2 files changed, 290 insertions(+), 140 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4cd1971be0..52b030c72b 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -644,19 +644,19 @@ typedef struct { } SRowInfo; typedef struct SSttBlockLoadInfo { - SBlockData blockData[2]; - SArray *aSttBlk; - int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. - int32_t currentLoadBlockIndex; + SBlockData blockData[2]; + SArray *aSttBlk; + int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. + int32_t currentLoadBlockIndex; } SSttBlockLoadInfo; typedef struct SMergeTree { - int8_t backward; - SRBTree rbt; - SArray *pIterList; - SLDataIter *pIter; - bool destroyLoadInfo; - SSttBlockLoadInfo* pLoadInfo; + int8_t backward; + SRBTree rbt; + SArray *pIterList; + SLDataIter *pIter; + bool destroyLoadInfo; + SSttBlockLoadInfo *pLoadInfo; } SMergeTree; typedef struct { @@ -666,15 +666,15 @@ typedef struct { } SSkmInfo; int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pLoadInfo); + STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); TSDBROW tMergeTreeGetRow(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); -SSttBlockLoadInfo* tCreateLastBlockLoadInfo(); -void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo); -void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo); +SSttBlockLoadInfo *tCreateLastBlockLoadInfo(); +void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); +void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); // ========== inline functions ========== static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 336d435b30..1949b62432 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -609,40 +609,37 @@ struct STsdbSnapWriter { STsdbFS fs; // config - int32_t minutes; - int8_t precision; - int32_t minRow; - int32_t maxRow; - int8_t cmprAlg; - int64_t commitID; - + int32_t minutes; + int8_t precision; + int32_t minRow; + int32_t maxRow; + int8_t cmprAlg; + int64_t commitID; uint8_t* aBuf[5]; // for data file SBlockData bData; - int32_t fid; - SDataFReader* pDataFReader; - SArray* aBlockIdx; // SArray - int32_t iBlockIdx; - SBlockIdx* pBlockIdx; - SMapData mBlock; // SMapData - int32_t iBlock; - SBlockData* pBlockData; - int32_t iRow; - SBlockData bDataR; - SArray* aSstBlk; // SArray - int32_t iBlockL; - SBlockData lDataR; - - SDataFWriter* pDataFWriter; - SBlockIdx* pBlockIdxW; // NULL when no committing table - SDataBlk blockW; - SBlockData bDataW; - SBlockIdx blockIdxW; - - SMapData mBlockW; // SMapData - SArray* aBlockIdxW; // SArray - SArray* aBlockLW; // SArray + int32_t fid; + TABLEID id; + struct { + SDataFReader* pReader; + SArray* aBlockIdx; + int32_t iBlockIdx; + SBlockIdx* pBlockIdx; + SMapData mDataBlk; + int32_t iDataBlk; + SBlockData bData; + int32_t iRow; + } dReader; + struct { + SDataFWriter* pWriter; + SArray* aBlockIdx; + SMapData mDataBlk; + SArray* aSttBlk; + SBlockData bData; + SBlockData sData; + } dWriter; + SSkmInfo skmTable; // for del file SDelFReader* pDelFReader; @@ -653,6 +650,8 @@ struct STsdbSnapWriter { SArray* aDelIdxW; }; +// SNAP_DATA_TSDB +#if 0 static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; @@ -1073,100 +1072,247 @@ _err: tstrerror(code)); return code; } +#endif -static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); - int64_t n; +static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter); +static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) { + int32_t code = 0; + STsdb* pTsdb = pWriter->pTsdb; - // decode - SBlockData* pBlockData = &pWriter->bData; - code = tDecmprBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pHdr->size - sizeof(TABLEID), pBlockData, - pWriter->aBuf); - if (code) goto _err; - - // open file - TSDBKEY keyFirst = {.version = pBlockData->aVersion[0], .ts = pBlockData->aTSKEY[0]}; - TSDBKEY keyLast = {.version = pBlockData->aVersion[pBlockData->nRow - 1], - .ts = pBlockData->aTSKEY[pBlockData->nRow - 1]}; - - int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision); - ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision)); - if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) { - // end last file data write if need - code = tsdbSnapWriteDataEnd(pWriter); + // close last file if need + if (pWriter->dWriter.pWriter) { + ASSERT(fid > pWriter->fid); + code = tsdbSnapWriteCloseFile(pWriter); if (code) goto _err; - - pWriter->fid = fid; - - // read - SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); - if (pSet) { - code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); - if (code) goto _err; - - code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); - if (code) goto _err; - - code = tsdbReadSttBlk(pWriter->pDataFReader, 0, pWriter->aSstBlk); - if (code) goto _err; - } else { - ASSERT(pWriter->pDataFReader == NULL); - taosArrayClear(pWriter->aBlockIdx); - taosArrayClear(pWriter->aSstBlk); - } - pWriter->iBlockIdx = 0; - pWriter->pBlockIdx = NULL; - tMapDataReset(&pWriter->mBlock); - pWriter->iBlock = 0; - pWriter->pBlockData = NULL; - pWriter->iRow = 0; - pWriter->iBlockL = 0; - tBlockDataReset(&pWriter->bDataR); - tBlockDataReset(&pWriter->lDataR); - - // write - SHeadFile fHead; - SDataFile fData; - SSttFile fLast; - SSmaFile fSma; - SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aSttF[0] = &fLast, .pSmaF = &fSma}; - - if (pSet) { - wSet.diskId = pSet->diskId; - wSet.fid = fid; - wSet.nSttF = 1; - fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; - fData = *pSet->pDataF; - fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0}; - fSma = *pSet->pSmaF; - } else { - wSet.diskId = (SDiskID){.level = 0, .id = 0}; - wSet.fid = fid; - wSet.nSttF = 1; - fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; - fData = (SDataFile){.commitID = pWriter->commitID, .size = 0}; - fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0, .offset = 0}; - fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0}; - } - - code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet); - if (code) goto _err; - - taosArrayClear(pWriter->aBlockIdxW); - taosArrayClear(pWriter->aBlockLW); - tMapDataReset(&pWriter->mBlockW); - pWriter->pBlockIdxW = NULL; - tBlockDataReset(&pWriter->bDataW); } - code = tsdbSnapWriteTableData(pWriter, id); + ASSERT(pWriter->dWriter.pWriter == NULL); + + // open new + pWriter->fid = fid; + SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); + + // open reader + if (pSet) { + code = tsdbDataFReaderOpen(&pWriter->dReader.pReader, pWriter->pTsdb, pSet); + if (code) goto _err; + + code = tsdbReadBlockIdx(pWriter->dReader.pReader, pWriter->dReader.aBlockIdx); + if (code) goto _err; + } else { + // TODO + } + + // open writer + SHeadFile fHead = {.commitID = pWriter->commitID}; + SDataFile fData = {.commitID = pWriter->commitID}; + SSmaFile fSma = {.commitID = pWriter->commitID}; + SSttFile fStt = {.commitID = pWriter->commitID}; + SDFileSet wSet = {.fid = pWriter->fid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma}; + if (pSet) { + wSet.diskId = pSet->diskId; + fData = *pSet->pDataF; + fSma = *pSet->pSmaF; + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + wSet.aSttF[iStt] = pSet->aSttF[iStt]; + } + wSet.nSttF = pSet->nSttF + 1; // TODO: fix pSet->nSttF == pTsdb->maxFile + } else { + SDiskID did = {0}; + tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); + tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); + wSet.diskId = did; + wSet.nSttF = 1; + } + wSet.aSttF[wSet.nSttF - 1] = &fStt; + code = tsdbDataFWriterOpen(&pWriter->dWriter.pWriter, pWriter->pTsdb, &wSet); + if (code) goto _err; + taosArrayClear(pWriter->dWriter.aBlockIdx); + tMapDataReset(&pWriter->dWriter.mDataBlk); + taosArrayClear(pWriter->dWriter.aSttBlk); + + return code; + +_err: + return code; +} + +static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) { + int32_t code = 0; + + pWriter->dReader.iBlockIdx++; + if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) { + pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx); + + code = tsdbReadBlock(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk); + if (code) goto _exit; + + pWriter->dReader.iDataBlk = -1; + tBlockDataReset(&pWriter->dReader.bData); + pWriter->dReader.iRow = 0; + } else { + pWriter->dReader.pBlockIdx = NULL; + } + +_exit: + return code; +} + +static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) { + int32_t code = 0; + + while (true) { + if (pWriter->dReader.pBlockIdx == NULL) break; + if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break; + + SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx; + code = tsdbWriteBlock(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx); + if (code) goto _exit; + + if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + code = tsdbSnapNextTableData(pWriter); + if (code) goto _exit; + } + +_exit: + return code; +} + +static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { + int32_t code = 0; + + SBlockData* pBlockData = &pWriter->bData; + TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]}; + TSDBROW row = tsdbRowFromBlockData(pBlockData, iRow); + TSDBKEY key = TSDBROW_KEY(&row); + + // End last table data write if need + if (id.suid != pWriter->id.suid || id.uid != pWriter->id.uid) { + // TODO + pWriter->id.suid = 0; + pWriter->id.uid = 0; + } + + // Start new table data write if need + if (pWriter->id.suid == 0 && pWriter->id.uid == 0) { + // Copy table data ahead + code = tsdbSnapWriteCopyData(pWriter, &id); + if (code) goto _err; + + // Start new table data + pWriter->id.suid = id.suid; + pWriter->id.uid = id.uid; + tMapDataReset(&pWriter->dWriter.mDataBlk); + } + + // Merge with .data file data + if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) { + _merge_block: + // merge with data block in row + for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) { + TSDBROW trow = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow); + TSDBKEY tKey = TSDBROW_KEY(&trow); + + int32_t c = tsdbKeyCmprFn(&key, &tKey); + if (c < 0) { + code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid); + if (code) goto _err; + } else if (c > 0) { + code = tBlockDataAppendRow(&pWriter->dWriter.bData, &trow, NULL, id.uid); + if (code) goto _err; + } else { + ASSERT(0); + } + + if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { + // TODO: commit to the block + } + + if (c < 0) goto _exit; + } + + // merge with dataBlk in whole + SDataBlk tDataBlk = {.minKey = key, .maxKey = key}; + for (pWriter->dReader.iBlockIdx++; pWriter->dReader.iBlockIdx < pWriter->dReader.mDataBlk.nItem; + pWriter->dReader.iBlockIdx++) { + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iBlockIdx, &dataBlk, tGetDataBlk); + + int32_t c = tDataBlkCmprFn(&dataBlk, &tDataBlk); + + if (c < 0) { + code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk); + if (code) goto _err; + } else if (c < 0) { + code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid); + if (code) goto _err; + + if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { + // TODO: write data block + } + } else { + code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData); + if (code) goto _err; + + goto _merge_block; + } + } + + code = tsdbSnapNextTableData(pWriter); + if (code) goto _err; + } + + // Append to the .stt data block (todo: check if need to set/reload sst block) + code = tBlockDataAppendRow(&pWriter->dWriter.sData, &row, NULL, id.uid); if (code) goto _err; - tsdbInfo("vgId:%d, vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", - TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow); + if (pWriter->dWriter.sData.nRow >= pWriter->maxRow) { + // TODO: write sst block + } + +_exit: + return code; + +_err: + return code; +} + +static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int32_t code = 0; + STsdb* pTsdb = pWriter->pTsdb; + SBlockData* pBlockData = &pWriter->bData; + + // Decode data + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; + code = tDecmprBlockData(pHdr->data, pHdr->size, pBlockData, pWriter->aBuf); + if (code) goto _err; + + // Loop to handle each row + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + // open file if need + TSKEY ts = pBlockData->aTSKEY[iRow]; + int32_t fid = tsdbKeyFid(ts, pWriter->minutes, pWriter->precision); + if (pWriter->dWriter.pWriter == NULL || pWriter->fid != fid) { + code = tsdbSnapWriteOpenFile(pWriter, fid); + if (code) goto _err; + } + + code = tsdbSnapWriteRowData(pWriter, iRow); + if (code) goto _err; + } + + // tsdbInfo("vgId:%d, vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", + // TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow); return code; _err: @@ -1175,6 +1321,7 @@ _err: return code; } +// SNAP_DATA_DEL static int32_t tsdbSnapMoveWriteDelData(STsdbSnapWriter* pWriter, TABLEID* pId) { int32_t code = 0; @@ -1274,7 +1421,6 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 goto _err; } -_exit: return code; _err: @@ -1287,12 +1433,15 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; STsdb* pTsdb = pWriter->pTsdb; - if (pWriter->pDelFWriter == NULL) goto _exit; + if (pWriter->pDelFWriter == NULL) return code; TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX}; code = tsdbSnapMoveWriteDelData(pWriter, &id); if (code) goto _err; + code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdxW); + if (code) goto _err; + code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter); if (code) goto _err; @@ -1307,7 +1456,6 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { if (code) goto _err; } -_exit: tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path); return code; @@ -1344,8 +1492,9 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr // for data file code = tBlockDataCreate(&pWriter->bData); - if (code) goto _err; + +#if 0 pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); if (pWriter->aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1373,8 +1522,9 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } +#endif - // for del file + // SNAP_DATA_DEL pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); if (pWriter->aDelIdxR == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1395,6 +1545,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr tsdbInfo("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path); return code; + _err: tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, tstrerror(code)); @@ -1411,7 +1562,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { // code = tsdbFSRollback(pWriter->pTsdb->pFS); // if (code) goto _err; } else { - code = tsdbSnapWriteDataEnd(pWriter); + // code = tsdbSnapWriteDataEnd(pWriter); if (code) goto _err; code = tsdbSnapWriteDelEnd(pWriter); @@ -1452,8 +1603,8 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) goto _exit; } else { - if (pWriter->pDataFWriter) { - code = tsdbSnapWriteDataEnd(pWriter); + if (pWriter->dWriter.pWriter) { + // code = tsdbSnapWriteDataEnd(pWriter); if (code) goto _err; } } @@ -1466,7 +1617,6 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) _exit: tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); - return code; _err: