From 5e2a498660dbf5060b5e03f651f85e1a90c7bf48 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 30 Jul 2022 12:08:04 +0000 Subject: [PATCH] refact: prepare last refact --- source/dnode/vnode/src/inc/tsdb.h | 24 ++++ source/dnode/vnode/src/tsdb/tsdbCommit.c | 133 ++++++++++++----------- 2 files changed, 93 insertions(+), 64 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 0bd5dbb602..f1e980c026 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -43,6 +43,7 @@ typedef struct STbDataIter STbDataIter; typedef struct SMapData SMapData; typedef struct SBlockIdx SBlockIdx; typedef struct SBlock SBlock; +typedef struct SBlockL SBlockL; typedef struct SColData SColData; typedef struct SBlockDataHdr SBlockDataHdr; typedef struct SBlockData SBlockData; @@ -414,6 +415,29 @@ struct SBlock { SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS]; }; +struct SBlockL { + struct { + int64_t uid; + int64_t version; + TSKEY ts; + } minKey; + struct { + int64_t uid; + int64_t version; + TSKEY ts; + } maxKey; + int64_t minVer; + int64_t maxVer; + int32_t nRow; + int8_t cmprAlg; + int64_t offset; + int32_t szBlock; + int32_t szBlockCol; + int32_t szUid; + int32_t szVer; + int32_t szTSKEY; +}; + struct SColData { int16_t cid; int8_t type; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 24b300cb1e..e6db812865 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -36,16 +36,20 @@ typedef struct { TSKEY minKey; TSKEY maxKey; // commit file data - SDataFReader *pReader; - SArray *aBlockIdx; // SArray - SMapData oBlockMap; // SMapData, read from reader - SBlockData oBlockData; - SDataFWriter *pWriter; - SArray *aBlockIdxN; // SArray - SMapData nBlockMap; // SMapData - SBlockData nBlockData; - SSkmInfo skmTable; - SSkmInfo skmRow; + struct { + SDataFReader *pReader; + SArray *aBlockIdx; // SArray + SMapData mBlock; // SMapData, read from reader + SBlockData bData; + } dReader; + struct { + SDataFWriter *pWriter; + SArray *aBlockIdx; // SArray + SMapData mBlock; // SMapData + SBlockData bData; + } dWriter; + SSkmInfo skmTable; + SSkmInfo skmRow; /* commit del */ SDelFReader *pDelFReader; SDelFWriter *pDelFWriter; @@ -276,16 +280,16 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { pCommitter->nextKey = TSKEY_MAX; // old - taosArrayClear(pCommitter->aBlockIdx); - tMapDataReset(&pCommitter->oBlockMap); - tBlockDataReset(&pCommitter->oBlockData); + taosArrayClear(pCommitter->dReader.aBlockIdx); + tMapDataReset(&pCommitter->dReader.mBlock); + tBlockDataReset(&pCommitter->dReader.bData); pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &(SDFileSet){.fid = pCommitter->commitFid}, tDFileSetCmprFn, TD_EQ); if (pRSet) { - code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); + code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet); if (code) goto _err; - code = tsdbReadBlockIdx(pCommitter->pReader, pCommitter->aBlockIdx, NULL); + code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx, NULL); if (code) goto _err; } @@ -296,9 +300,9 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { SSmaFile fSma; SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .pLastF = &fLast, .pSmaF = &fSma}; - taosArrayClear(pCommitter->aBlockIdxN); - tMapDataReset(&pCommitter->nBlockMap); - tBlockDataReset(&pCommitter->nBlockData); + taosArrayClear(pCommitter->dWriter.aBlockIdx); + tMapDataReset(&pCommitter->dWriter.mBlock); + tBlockDataReset(&pCommitter->dWriter.bData); if (pRSet) { wSet.diskId = pRSet->diskId; wSet.fid = pCommitter->commitFid; @@ -320,7 +324,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0}; fSma = (SSmaFile){.commitID = pCommitter->commitID, .size = 0}; } - code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &wSet); + code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); if (code) goto _err; _exit: @@ -391,10 +395,11 @@ static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockDat } } - code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg); + code = + tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg); if (code) goto _err; - code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); if (code) goto _err; return code; @@ -407,8 +412,8 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB int8_t toDataOnly) { int32_t code = 0; SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid}; - SBlockData *pBlockDataMerge = &pCommitter->oBlockData; - SBlockData *pBlockData = &pCommitter->nBlockData; + SBlockData *pBlockDataMerge = &pCommitter->dReader.bData; + SBlockData *pBlockData = &pCommitter->dWriter.bData; SBlock block; SBlock *pBlock = █ TSDBROW *pRow1; @@ -416,7 +421,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB TSDBROW *pRow2 = &row2; // read SBlockData - code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL); + code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL); if (code) goto _err; code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema); @@ -513,7 +518,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter TSDBROW *pRow; SBlock block; SBlock *pBlock = █ - SBlockData *pBlockData = &pCommitter->nBlockData; + SBlockData *pBlockData = &pCommitter->dWriter.bData; int64_t suid = pIter->pTbData->suid; int64_t uid = pIter->pTbData->uid; @@ -575,14 +580,14 @@ static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, S SBlock block; if (pBlock->last) { - code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL); + code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlock, &pCommitter->dReader.bData, NULL, NULL); if (code) goto _err; tBlockReset(&block); - code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &block, pBlockIdx, 0); + code = tsdbCommitBlockData(pCommitter, &pCommitter->dReader.bData, &block, pBlockIdx, 0); if (code) goto _err; } else { - code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); if (code) goto _err; } @@ -598,10 +603,10 @@ static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int6 SBlockIdx blockIdx = {.suid = suid, .uid = uid}; SBlockIdx *pBlockIdx = &blockIdx; - code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx); + code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, pBlockIdx); if (code) goto _err; - if (taosArrayPush(pCommitter->aBlockIdxN, pBlockIdx) == NULL) { + if (taosArrayPush(pCommitter->dWriter.aBlockIdx, pBlockIdx) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -643,7 +648,7 @@ static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) { static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { int32_t code = 0; - SBlockData *pBlockData = &pCommitter->nBlockData; + SBlockData *pBlockData = &pCommitter->dWriter.bData; SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid}; SBlock block; TSDBROW *pRow; @@ -711,10 +716,10 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl } if (pBlockIdx) { - code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL); + code = tsdbReadBlock(pCommitter->dReader.pReader, pBlockIdx, &pCommitter->dReader.mBlock, NULL); if (code) goto _err; - nBlock = pCommitter->oBlockMap.nItem; + nBlock = pCommitter->dReader.mBlock.nItem; ASSERT(nBlock > 0); suid = pBlockIdx->suid; @@ -726,13 +731,13 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl if (pRow == NULL && nBlock == 0) goto _exit; // start =========== - tMapDataReset(&pCommitter->nBlockMap); + tMapDataReset(&pCommitter->dWriter.mBlock); SBlock block; SBlock *pBlock = █ iBlock = 0; if (iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); } else { pBlock = NULL; } @@ -756,7 +761,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; iBlock++; if (iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); } else { pBlock = NULL; } @@ -771,7 +776,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl iBlock++; if (iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); } else { pBlock = NULL; } @@ -798,7 +803,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl SBlock nextBlock = {0}; tBlockReset(&nextBlock); - tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock + 1, &nextBlock, tGetBlock); toKey = nextBlock.minKey; } @@ -810,7 +815,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL; iBlock++; if (iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); } else { pBlock = NULL; } @@ -822,7 +827,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl iBlock++; if (iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); } else { pBlock = NULL; } @@ -857,23 +862,23 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { int32_t code = 0; // write blockIdx - code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL); + code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx, NULL); if (code) goto _err; // update file header - code = tsdbUpdateDFileSetHeader(pCommitter->pWriter); + code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter); if (code) goto _err; // upsert SDFileSet - code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->pWriter->wSet); + code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet); if (code) goto _err; // close and sync - code = tsdbDataFWriterClose(&pCommitter->pWriter, 1); + code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1); if (code) goto _err; - if (pCommitter->pReader) { - code = tsdbDataFReaderClose(&pCommitter->pReader); + if (pCommitter->dReader.pReader) { + code = tsdbDataFReaderClose(&pCommitter->dReader.pReader); if (code) goto _err; } @@ -898,14 +903,14 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { int32_t iTbData = 0; int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); int32_t iBlockIdx = 0; - int32_t nBlockIdx = taosArrayGetSize(pCommitter->aBlockIdx); + int32_t nBlockIdx = taosArrayGetSize(pCommitter->dReader.aBlockIdx); STbData *pTbData; SBlockIdx *pBlockIdx; ASSERT(nTbData > 0); pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); - pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL; + pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL; while (pTbData || pBlockIdx) { if (pTbData && pBlockIdx) { int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx); @@ -936,7 +941,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { if (code) goto _err; iBlockIdx++; - pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL; + pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL; continue; _commit_table_mem_and_disk: @@ -944,7 +949,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { if (code) goto _err; iBlockIdx++; - pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL; + pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL; iTbData++; pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL; continue; @@ -958,8 +963,8 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) { _err: tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - tsdbDataFReaderClose(&pCommitter->pReader); - tsdbDataFWriterClose(&pCommitter->pWriter, 0); + tsdbDataFReaderClose(&pCommitter->dReader.pReader); + tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0); return code; } @@ -996,22 +1001,22 @@ _err: static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { int32_t code = 0; - pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pCommitter->aBlockIdx == NULL) { + pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pCommitter->dReader.aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx)); - if (pCommitter->aBlockIdxN == NULL) { + pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pCommitter->dWriter.aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - code = tBlockDataInit(&pCommitter->oBlockData); + code = tBlockDataInit(&pCommitter->dReader.bData); if (code) goto _exit; - code = tBlockDataInit(&pCommitter->nBlockData); + code = tBlockDataInit(&pCommitter->dWriter.bData); if (code) goto _exit; _exit: @@ -1019,12 +1024,12 @@ _exit: } static void tsdbCommitDataEnd(SCommitter *pCommitter) { - taosArrayDestroy(pCommitter->aBlockIdx); - tMapDataClear(&pCommitter->oBlockMap); - tBlockDataClear(&pCommitter->oBlockData, 1); - taosArrayDestroy(pCommitter->aBlockIdxN); - tMapDataClear(&pCommitter->nBlockMap); - tBlockDataClear(&pCommitter->nBlockData, 1); + taosArrayDestroy(pCommitter->dReader.aBlockIdx); + tMapDataClear(&pCommitter->dReader.mBlock); + tBlockDataClear(&pCommitter->dReader.bData, 1); + taosArrayDestroy(pCommitter->dWriter.aBlockIdx); + tMapDataClear(&pCommitter->dWriter.mBlock); + tBlockDataClear(&pCommitter->dWriter.bData, 1); tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema); }