From aec4297eb46182d4fb59e8a1cda4d1db0000559c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 2 Sep 2022 11:16:23 +0800 Subject: [PATCH] refact code --- source/dnode/vnode/src/inc/tsdb.h | 18 ++-- source/dnode/vnode/src/tsdb/tsdbCache.c | 6 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 54 +++++----- source/dnode/vnode/src/tsdb/tsdbRead.c | 72 +++++++------- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 32 +++--- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 44 ++++----- source/dnode/vnode/src/tsdb/tsdbUtil.c | 98 +++++++++---------- 7 files changed, 162 insertions(+), 162 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 5a9d6c43af..d6bc3385ed 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -42,7 +42,7 @@ typedef struct SMemTable SMemTable; typedef struct STbDataIter STbDataIter; typedef struct SMapData SMapData; typedef struct SBlockIdx SBlockIdx; -typedef struct SBlock SBlock; +typedef struct SDataBlk SDataBlk; typedef struct SSstBlk SSstBlk; typedef struct SColData SColData; typedef struct SDiskDataHdr SDiskDataHdr; @@ -114,12 +114,12 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2); int32_t tPutBlockCol(uint8_t *p, void *ph); int32_t tGetBlockCol(uint8_t *p, void *ph); int32_t tBlockColCmprFn(const void *p1, const void *p2); -// SBlock -void tBlockReset(SBlock *pBlock); -int32_t tPutBlock(uint8_t *p, void *ph); -int32_t tGetBlock(uint8_t *p, void *ph); +// SDataBlk +void tBlockReset(SDataBlk *pBlock); +int32_t tPutDataBlk(uint8_t *p, void *ph); +int32_t tGetDataBlk(uint8_t *p, void *ph); int32_t tBlockCmprFn(const void *p1, const void *p2); -bool tBlockHasSma(SBlock *pBlock); +bool tBlockHasSma(SDataBlk *pBlock); // SSstBlk int32_t tPutSstBlk(uint8_t *p, void *ph); int32_t tGetSstBlk(uint8_t *p, void *ph); @@ -265,8 +265,8 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk); -int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg); -int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData); +int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg); +int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData); int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData); // SDelFWriter int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); @@ -427,7 +427,7 @@ struct SSmaInfo { int32_t size; }; -struct SBlock { +struct SDataBlk { TSDBKEY minKey; TSDBKEY maxKey; int64_t minVer; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index a0d52b8b3f..b12fb15798 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -527,7 +527,7 @@ typedef struct SFSNextRowIter { SMapData blockMap; int32_t nBlock; int32_t iBlock; - SBlock block; + SDataBlk block; SBlockData blockData; SBlockData *pBlockData; int32_t nRow; @@ -602,13 +602,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { } case SFSNEXTROW_BLOCKDATA: if (state->iBlock >= 0) { - SBlock block = {0}; + SDataBlk block = {0}; tBlockReset(&block); // tBlockDataReset(&state->blockData); tBlockDataReset(state->pBlockData); - tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock); + tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); /* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */ tBlockDataReset(state->pBlockData); code = tBlockDataInit(state->pBlockData, state->suid, state->uid, state->pTSchema); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index caab533f6e..388e9ce4f1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -64,7 +64,7 @@ typedef struct { SArray *aBlockIdx; // SArray int32_t iBlockIdx; SBlockIdx *pBlockIdx; - SMapData mBlock; // SMapData + SMapData mBlock; // SMapData SBlockData bData; } dReader; struct { @@ -78,7 +78,7 @@ typedef struct { SDataFWriter *pWriter; SArray *aBlockIdx; // SArray SArray *aSstBlk; // SArray - SMapData mBlock; // SMapData + SMapData mBlock; // SMapData SBlockData bData; SBlockData bDatal; } dWriter; @@ -562,7 +562,7 @@ _err: static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { int32_t code = 0; SBlockData *pBlockData = &pCommitter->dWriter.bData; - SBlock block; + SDataBlk block; ASSERT(pBlockData->nRow > 0); @@ -597,8 +597,8 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { ((block.nSubBlock == 1) && !block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0); if (code) goto _err; - // put SBlock - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock); + // put SDataBlk + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutDataBlk); if (code) goto _err; // clear @@ -1098,7 +1098,7 @@ _exit: return code; } -static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SBlock *pBlock) { +static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { int32_t code = 0; SBlockData *pBlockData = &pCommitter->dWriter.bData; SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); @@ -1122,7 +1122,7 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SBlock *pBlock) { pRowInfo = NULL; } else { TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row); - if (tsdbKeyCmprFn(&tKey, &pBlock->minKey) >= 0) pRowInfo = NULL; + if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) pRowInfo = NULL; } } @@ -1144,14 +1144,14 @@ _err: return code; } -static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) { +static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) { int32_t code = 0; SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; SBlockData *pBDataR = &pCommitter->dReader.bData; SBlockData *pBDataW = &pCommitter->dWriter.bData; - code = tsdbReadDataBlock(pCommitter->dReader.pReader, pBlock, pBDataR); + code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR); if (code) goto _err; tBlockDataClear(pBDataW); @@ -1188,7 +1188,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) { pRowInfo = NULL; } else { TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row); - if (tsdbKeyCmprFn(&tKey, &pBlock->maxKey) > 0) pRowInfo = NULL; + if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) pRowInfo = NULL; } } } else { @@ -1237,57 +1237,57 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) { ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0); if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) { int32_t iBlock = 0; - SBlock block; - SBlock *pBlock = █ + SDataBlk block; + SDataBlk *pDataBlk = █ SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid); - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); - while (pBlock && pRowInfo) { - SBlock tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)}; - int32_t c = tBlockCmprFn(pBlock, &tBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); + while (pDataBlk && pRowInfo) { + SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)}; + int32_t c = tBlockCmprFn(pDataBlk, &tBlock); if (c < 0) { - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); if (code) goto _err; iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); } else { - pBlock = NULL; + pDataBlk = NULL; } } else if (c > 0) { - code = tsdbCommitAheadBlock(pCommitter, pBlock); + code = tsdbCommitAheadBlock(pCommitter, pDataBlk); if (code) goto _err; pRowInfo = tsdbGetCommitRow(pCommitter); if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL; } else { - code = tsdbCommitMergeBlock(pCommitter, pBlock); + code = tsdbCommitMergeBlock(pCommitter, pDataBlk); if (code) goto _err; iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); } else { - pBlock = NULL; + pDataBlk = NULL; } pRowInfo = tsdbGetCommitRow(pCommitter); if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL; } } - while (pBlock) { - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock); + while (pDataBlk) { + code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk); if (code) goto _err; iBlock++; if (iBlock < pCommitter->dReader.mBlock.nItem) { - tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk); } else { - pBlock = NULL; + pDataBlk = NULL; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ef0c23fd58..828bc8469a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -109,7 +109,7 @@ typedef struct SDataBlockIter { int32_t index; SArray* blockList; // SArray int32_t order; - SBlock block; // current SBlock data + SDataBlk block; // current SDataBlk data SHashObj* pTableMap; } SDataBlockIter; @@ -590,8 +590,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN sizeInDisk += pScanInfo->mapData.nData; for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { - SBlock block = {0}; - tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock); + SDataBlk block = {0}; + tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk); // 1. time range check if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) { @@ -665,7 +665,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { return pBlockInfo; } -static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; } +static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; } static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { SReaderStatus* pStatus = &pReader->status; @@ -673,7 +673,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn SBlockData* pBlockData = &pStatus->fileBlockData; SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); - SBlock* pBlock = getCurrentBlock(pBlockIter); + SDataBlk* pBlock = getCurrentBlock(pBlockIter); SSDataBlock* pResBlock = pReader->pResBlock; int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); @@ -758,8 +758,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; ASSERT(pBlockInfo != NULL); - SBlock* pBlock = getCurrentBlock(pBlockIter); - int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); + SDataBlk* pBlock = getCurrentBlock(pBlockIter); + int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, code:%s %s", @@ -836,7 +836,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) { if (pBlockInfo != NULL) { STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); - tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetBlock); + tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk); } #if 0 @@ -887,12 +887,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte } sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; - SBlock block = {0}; + SDataBlk block = {0}; for (int32_t k = 0; k < num; ++k) { SBlockOrderWrapper wrapper = {0}; int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k); - tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetBlock); + tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetDataBlk); wrapper.uid = pTableScanInfo->uid; wrapper.offset = block.aSubBlock[0].offset; @@ -981,15 +981,15 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) { /** * This is an two rectangles overlap cases. */ -static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) { +static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) { return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) || (pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) || (pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) || (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer); } -static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, - int32_t* nextIndex, int32_t order) { +static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, + int32_t* nextIndex, int32_t order) { bool asc = ASCENDING_TRAVERSE(order); if (asc && pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { return NULL; @@ -1002,10 +1002,10 @@ static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STab int32_t step = asc ? 1 : -1; *nextIndex = pFBlockInfo->tbBlockIdx + step; - SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock)); - int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); + SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk)); + int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); - tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock); + tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetDataBlk); return pBlock; } @@ -1048,7 +1048,7 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t return TSDB_CODE_SUCCESS; } -static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) { +static bool overlapWithNeighborBlock(SDataBlk* pBlock, SDataBlk* pNeighbor, int32_t order) { // it is the last block in current file, no chance to overlap with neighbor blocks. if (ASCENDING_TRAVERSE(order)) { return pBlock->maxKey.ts == pNeighbor->minKey.ts; @@ -1057,19 +1057,19 @@ static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t } } -static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) { +static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) { bool ascScan = ASCENDING_TRAVERSE(order); return (ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts <= pBlock->minKey.ts)) || (!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->maxKey.ts)); } -static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) { +static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) { return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVer >= pVerRange->minVer) && (pBlock->minVer <= pVerRange->maxVer); } -static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) { +static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock) { size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline); for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) { @@ -1103,7 +1103,7 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons return false; } -static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) { +static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) { if (pBlockScanInfo->delSkyline == NULL) { return false; } @@ -1138,10 +1138,10 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl // 3. current timestamp should not be overlap with each other // 4. output buffer should be large enough to hold all rows in current block // 5. delete info should not overlap with current block data -static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, +static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SDataBlk* pBlock, STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) { - int32_t neighborIndex = 0; - SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order); + int32_t neighborIndex = 0; + SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order); // overlap with neighbor bool overlapWithNeighbor = false; @@ -1948,7 +1948,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { pDumpInfo->rowIndex += step; - SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); + SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); break; @@ -1967,7 +1967,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { // currently loaded file data block is consumed if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { - SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); + SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order); break; } @@ -2320,8 +2320,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { } static int32_t doBuildDataBlock(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; - SBlock* pBlock = NULL; + int32_t code = TSDB_CODE_SUCCESS; + SDataBlk* pBlock = NULL; SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; @@ -2418,7 +2418,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { // set the correct start position in case of the first/last file block, according to the query time window static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { - SBlock* pBlock = getCurrentBlock(pBlockIter); + SDataBlk* pBlock = getCurrentBlock(pBlockIter); SReaderStatus* pStatus = &pReader->status; @@ -2789,7 +2789,7 @@ typedef enum { CHECK_FILEBLOCK_QUIT = 0x2, } CHECK_FILEBLOCK_STATE; -static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock, +static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock, SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key, CHECK_FILEBLOCK_STATE* state) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -2798,8 +2798,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn *state = CHECK_FILEBLOCK_QUIT; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; - int32_t nextIndex = -1; - SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order); + int32_t nextIndex = -1; + SDataBlk* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order); if (pNeighborBlock == NULL) { // do nothing return 0; } @@ -2863,7 +2863,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc CHECK_FILEBLOCK_STATE st; SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); - SBlock* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter); + SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter); checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st); if (st == CHECK_FILEBLOCK_QUIT) { break; @@ -3447,8 +3447,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); - SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); - int64_t stime = taosGetTimestampUs(); + SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); + int64_t stime = taosGetTimestampUs(); SBlockLoadSuppInfo* pSup = &pReader->suppInfo; @@ -3629,7 +3629,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa while (true) { if (hasNext) { - SBlock* pBlock = getCurrentBlock(pBlockIter); + SDataBlk* pBlock = getCurrentBlock(pBlockIter); int32_t numOfRows = pBlock->nRow; pTableBlockInfo->totalRows += numOfRows; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index d9c85f55e0..dbaf9b234c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -677,9 +677,9 @@ _err: return code; } -int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg) { +int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aColumnDataAgg) { int32_t code = 0; - SSmaInfo *pSmaInfo = &pBlock->smaInfo; + SSmaInfo *pSmaInfo = &pDataBlk->smaInfo; ASSERT(pSmaInfo->size > 0); @@ -843,13 +843,13 @@ _err: return code; } -int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData) { +int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { int32_t code = 0; - code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[0], 0, pBlockData); + code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], 0, pBlockData); if (code) goto _err; - if (pBlock->nSubBlock > 1) { + if (pDataBlk->nSubBlock > 1) { SBlockData bData1; SBlockData bData2; @@ -863,8 +863,8 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBl tBlockDataInitEx(&bData1, pBlockData); tBlockDataInitEx(&bData2, pBlockData); - for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[iSubBlock], 0, &bData1); + for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { + code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], 0, &bData1); if (code) { tBlockDataDestroy(&bData1, 1); tBlockDataDestroy(&bData2, 1); @@ -1355,28 +1355,28 @@ _err: return code; } -static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) { +static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SDataBlk *pDataBlk) { for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]}; if (iRow == 0) { - if (tsdbKeyCmprFn(&pBlock->minKey, &key) > 0) { - pBlock->minKey = key; + if (tsdbKeyCmprFn(&pDataBlk->minKey, &key) > 0) { + pDataBlk->minKey = key; } } else { if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) { - pBlock->hasDup = 1; + pDataBlk->hasDup = 1; } } - if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&pBlock->maxKey, &key) < 0) { - pBlock->maxKey = key; + if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&pDataBlk->maxKey, &key) < 0) { + pDataBlk->maxKey = key; } - pBlock->minVer = TMIN(pBlock->minVer, key.version); - pBlock->maxVer = TMAX(pBlock->maxVer, key.version); + pDataBlk->minVer = TMIN(pDataBlk->minVer, key.version); + pDataBlk->maxVer = TMAX(pDataBlk->maxVer, key.version); } - pBlock->nRow += pBlockData->nRow; + pDataBlk->nRow += pBlockData->nRow; } static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, SSmaInfo *pSmaInfo) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 62c279ef06..02f2205875 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -33,7 +33,7 @@ struct STsdbSnapReader { int32_t iBlockIdx; int32_t iBlockL; - SMapData mBlock; // SMapData + SMapData mBlock; // SMapData int32_t iBlock; SBlockData oBlockData; SBlockData nBlockData; @@ -115,8 +115,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { // } } else if (pReader->pBlockIdx) { while (pReader->iBlock < pReader->mBlock.nItem) { - SBlock block; - tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetBlock); + SDataBlk block; + tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetDataBlk); if (block.minVer <= pReader->ever && block.maxVer >= pReader->sver) { // load data (todo) @@ -426,7 +426,7 @@ struct STsdbSnapWriter { SArray* aBlockIdx; // SArray int32_t iBlockIdx; SBlockIdx* pBlockIdx; - SMapData mBlock; // SMapData + SMapData mBlock; // SMapData int32_t iBlock; SBlockData* pBlockData; int32_t iRow; @@ -437,11 +437,11 @@ struct STsdbSnapWriter { SDataFWriter* pDataFWriter; SBlockIdx* pBlockIdxW; // NULL when no committing table - SBlock blockW; + SDataBlk blockW; SBlockData bDataW; SBlockIdx blockIdxW; - SMapData mBlockW; // SMapData + SMapData mBlockW; // SMapData SArray* aBlockIdxW; // SArray SArray* aBlockLW; // SArray @@ -475,7 +475,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { // &pWriter->blockW, pWriter->cmprAlg); if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); if (code) goto _err; tBlockReset(&pWriter->blockW); @@ -499,15 +499,15 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { // &pWriter->blockW, pWriter->cmprAlg); // if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); if (code) goto _err; } while (true) { if (pWriter->iBlock >= pWriter->mBlock.nItem) break; - SBlock block; - tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); + SDataBlk block; + tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk); // if (block.last) { // code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); @@ -520,13 +520,13 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { // if (code) goto _err; // } - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); if (code) goto _err; pWriter->iBlock++; } - // SBlock + // SDataBlk // code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); // if (code) goto _err; @@ -553,10 +553,10 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p if (code) goto _err; // SBlockData - SBlock block; + SDataBlk block; tMapDataReset(&pWriter->mBlockW); for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { - tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock); + tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetDataBlk); // if (block.last) { // code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); @@ -570,11 +570,11 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p // if (code) goto _err; // } - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); if (code) goto _err; } - // SBlock + // SDataBlk SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx); if (code) goto _err; @@ -642,10 +642,10 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { while (true) { if (pWriter->iBlock >= pWriter->mBlock.nItem) break; - SBlock block; - int32_t c; + SDataBlk block; + int32_t c; - tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock); + tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk); // if (block.last) { // pWriter->pBlockData = &pWriter->bDataR; @@ -668,14 +668,14 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { // &pWriter->blockW, pWriter->cmprAlg); // if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); if (code) goto _err; tBlockReset(&pWriter->blockW); tBlockDataClear(&pWriter->bDataW); } - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); if (code) goto _err; pWriter->iBlock++; @@ -719,7 +719,7 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { // &pWriter->blockW, pWriter->cmprAlg); // if (code) goto _err; - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock); + code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); if (code) goto _err; tBlockReset(&pWriter->blockW); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 2a3ffd6089..62dfb5157f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -231,69 +231,69 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs) { return 0; } -// SBlock ====================================================== -void tBlockReset(SBlock *pBlock) { - *pBlock = (SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN}; +// SDataBlk ====================================================== +void tBlockReset(SDataBlk *pDataBlk) { + *pDataBlk = (SDataBlk){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN}; } -int32_t tPutBlock(uint8_t *p, void *ph) { - int32_t n = 0; - SBlock *pBlock = (SBlock *)ph; +int32_t tPutDataBlk(uint8_t *p, void *ph) { + int32_t n = 0; + SDataBlk *pDataBlk = (SDataBlk *)ph; - n += tPutI64v(p ? p + n : p, pBlock->minKey.version); - n += tPutI64v(p ? p + n : p, pBlock->minKey.ts); - n += tPutI64v(p ? p + n : p, pBlock->maxKey.version); - n += tPutI64v(p ? p + n : p, pBlock->maxKey.ts); - n += tPutI64v(p ? p + n : p, pBlock->minVer); - n += tPutI64v(p ? p + n : p, pBlock->maxVer); - n += tPutI32v(p ? p + n : p, pBlock->nRow); - n += tPutI8(p ? p + n : p, pBlock->hasDup); - n += tPutI8(p ? p + n : p, pBlock->nSubBlock); - for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset); - n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock); - n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szKey); + n += tPutI64v(p ? p + n : p, pDataBlk->minKey.version); + n += tPutI64v(p ? p + n : p, pDataBlk->minKey.ts); + n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.version); + n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.ts); + n += tPutI64v(p ? p + n : p, pDataBlk->minVer); + n += tPutI64v(p ? p + n : p, pDataBlk->maxVer); + n += tPutI32v(p ? p + n : p, pDataBlk->nRow); + n += tPutI8(p ? p + n : p, pDataBlk->hasDup); + n += tPutI8(p ? p + n : p, pDataBlk->nSubBlock); + for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { + n += tPutI64v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].offset); + n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szBlock); + n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szKey); } - if (pBlock->nSubBlock == 1 && !pBlock->hasDup) { - n += tPutI64v(p ? p + n : p, pBlock->smaInfo.offset); - n += tPutI32v(p ? p + n : p, pBlock->smaInfo.size); + if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) { + n += tPutI64v(p ? p + n : p, pDataBlk->smaInfo.offset); + n += tPutI32v(p ? p + n : p, pDataBlk->smaInfo.size); } return n; } -int32_t tGetBlock(uint8_t *p, void *ph) { - int32_t n = 0; - SBlock *pBlock = (SBlock *)ph; +int32_t tGetDataBlk(uint8_t *p, void *ph) { + int32_t n = 0; + SDataBlk *pDataBlk = (SDataBlk *)ph; - n += tGetI64v(p + n, &pBlock->minKey.version); - n += tGetI64v(p + n, &pBlock->minKey.ts); - n += tGetI64v(p + n, &pBlock->maxKey.version); - n += tGetI64v(p + n, &pBlock->maxKey.ts); - n += tGetI64v(p + n, &pBlock->minVer); - n += tGetI64v(p + n, &pBlock->maxVer); - n += tGetI32v(p + n, &pBlock->nRow); - n += tGetI8(p + n, &pBlock->hasDup); - n += tGetI8(p + n, &pBlock->nSubBlock); - for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { - n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset); - n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock); - n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szKey); + n += tGetI64v(p + n, &pDataBlk->minKey.version); + n += tGetI64v(p + n, &pDataBlk->minKey.ts); + n += tGetI64v(p + n, &pDataBlk->maxKey.version); + n += tGetI64v(p + n, &pDataBlk->maxKey.ts); + n += tGetI64v(p + n, &pDataBlk->minVer); + n += tGetI64v(p + n, &pDataBlk->maxVer); + n += tGetI32v(p + n, &pDataBlk->nRow); + n += tGetI8(p + n, &pDataBlk->hasDup); + n += tGetI8(p + n, &pDataBlk->nSubBlock); + for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { + n += tGetI64v(p + n, &pDataBlk->aSubBlock[iSubBlock].offset); + n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szBlock); + n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szKey); } - if (pBlock->nSubBlock == 1 && !pBlock->hasDup) { - n += tGetI64v(p + n, &pBlock->smaInfo.offset); - n += tGetI32v(p + n, &pBlock->smaInfo.size); + if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) { + n += tGetI64v(p + n, &pDataBlk->smaInfo.offset); + n += tGetI32v(p + n, &pDataBlk->smaInfo.size); } else { - pBlock->smaInfo.offset = 0; - pBlock->smaInfo.size = 0; + pDataBlk->smaInfo.offset = 0; + pDataBlk->smaInfo.size = 0; } return n; } int32_t tBlockCmprFn(const void *p1, const void *p2) { - SBlock *pBlock1 = (SBlock *)p1; - SBlock *pBlock2 = (SBlock *)p2; + SDataBlk *pBlock1 = (SDataBlk *)p1; + SDataBlk *pBlock2 = (SDataBlk *)p2; if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) { return -1; @@ -304,11 +304,11 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) { return 0; } -bool tBlockHasSma(SBlock *pBlock) { - if (pBlock->nSubBlock > 1) return false; - if (pBlock->hasDup) return false; +bool tBlockHasSma(SDataBlk *pDataBlk) { + if (pDataBlk->nSubBlock > 1) return false; + if (pDataBlk->hasDup) return false; - return pBlock->smaInfo.size > 0; + return pDataBlk->smaInfo.size > 0; } // SSstBlk ======================================================