From 478d5ebb649b96ca035af1909f516e88f0495423 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 23 Aug 2022 13:49:10 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 43 +---- source/dnode/vnode/src/tsdb/tsdbFS.c | 39 +++-- source/dnode/vnode/src/tsdb/tsdbRead.c | 151 ++++++++++-------- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 43 ++--- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 4 +- 7 files changed, 139 insertions(+), 145 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 230e4ddaad..1300c2ee8f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -262,7 +262,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); -int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL); +int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL); int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg); int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData); int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index b9f3897674..4963754eed 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -481,7 +481,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { taosArrayClear(state->aBlockL); } - code = tsdbReadBlockL(state->pDataFReader, state->aBlockL); + code = tsdbReadBlockL(state->pDataFReader, 0, state->aBlockL); if (code) goto _err; // SBlockL *pBlockL = (SBlockL *)taosArrayGet(state->aBlockL, state->iBlockL); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index bd0a87803c..98ad8ea8fa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -35,6 +35,7 @@ typedef struct { int32_t minRow; int32_t maxRow; int8_t cmprAlg; + int8_t maxLast; SArray *aTbDataP; // memory STsdbFS fs; // disk // -------------- @@ -45,19 +46,11 @@ typedef struct { // commit file data struct { SDataFReader *pReader; - // data - SArray *aBlockIdx; // SArray - int32_t iBlockIdx; - SBlockIdx *pBlockIdx; - SMapData mBlock; // SMapData - SBlockData bData; - // last - SArray *aBlockL; // SArray - int32_t iBlockL; - SBlockData bDatal; - int32_t iRow; - SRowInfo *pRowInfo; - SRowInfo rowInfo; + SArray *aBlockIdx; // SArray + int32_t iBlockIdx; + SBlockIdx *pBlockIdx; + SMapData mBlock; // SMapData + SBlockData bData; } dReader; struct { SDataFWriter *pWriter; @@ -437,20 +430,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { pCommitter->dReader.pBlockIdx = NULL; } tBlockDataReset(&pCommitter->dReader.bData); - - // last - code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL); - if (code) goto _err; - - pCommitter->dReader.iBlockL = -1; - pCommitter->dReader.iRow = -1; - pCommitter->dReader.pRowInfo = &pCommitter->dReader.rowInfo; - tBlockDataReset(&pCommitter->dReader.bDatal); - code = tsdbCommitterNextLastRow(pCommitter); - if (code) goto _err; } else { pCommitter->dReader.pBlockIdx = NULL; - pCommitter->dReader.pRowInfo = NULL; } // Writer @@ -1273,6 +1254,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; + pCommitter->maxLast = TSDB_DEFAULT_LAST_FILE; // TODO: make it as a config pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem); if (pCommitter->aTbDataP == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1301,15 +1283,6 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { code = tBlockDataCreate(&pCommitter->dReader.bData); if (code) goto _exit; - pCommitter->dReader.aBlockL = taosArrayInit(0, sizeof(SBlockL)); - if (pCommitter->dReader.aBlockL == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - code = tBlockDataCreate(&pCommitter->dReader.bDatal); - if (code) goto _exit; - // Writer pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); if (pCommitter->dWriter.aBlockIdx == NULL) { @@ -1338,8 +1311,6 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { taosArrayDestroy(pCommitter->dReader.aBlockIdx); tMapDataClear(&pCommitter->dReader.mBlock); tBlockDataDestroy(&pCommitter->dReader.bData, 1); - taosArrayDestroy(pCommitter->dReader.aBlockL); - tBlockDataDestroy(&pCommitter->dReader.bDatal, 1); // Writer taosArrayDestroy(pCommitter->dWriter.aBlockIdx); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 4fc2398206..000e262b92 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -544,7 +544,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); - SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nLastF = 1}; + SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid}; // head fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); @@ -576,13 +576,13 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { } // last - for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { - fSet.aLastF[iLast] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); - if (fSet.aLastF[iLast] == NULL) { + for (fSet.nLastF = 0; fSet.nLastF < pSet->nLastF; fSet.nLastF++) { + fSet.aLastF[fSet.nLastF] = (SLastFile *)taosMemoryMalloc(sizeof(SLastFile)); + if (fSet.aLastF[fSet.nLastF] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - *fSet.aLastF[iLast] = *pSet->aLastF[iLast]; + *fSet.aLastF[fSet.nLastF] = *pSet->aLastF[fSet.nLastF]; } } @@ -981,12 +981,14 @@ int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) { nRef = atomic_fetch_add_32(&pSet->pDataF->nRef, 1); ASSERT(nRef > 0); - nRef = atomic_fetch_add_32(&pSet->aLastF[0]->nRef, 1); - ASSERT(nRef > 0); - nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1); ASSERT(nRef > 0); + for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { + nRef = atomic_fetch_add_32(&pSet->aLastF[iLast]->nRef, 1); + ASSERT(nRef > 0); + } + if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -1032,15 +1034,6 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { taosMemoryFree(pSet->pDataF); } - // last - nRef = atomic_sub_fetch_32(&pSet->aLastF[0]->nRef, 1); - ASSERT(nRef >= 0); - if (nRef == 0) { - tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[0], fname); - taosRemoveFile(fname); - taosMemoryFree(pSet->aLastF[0]); - } - // sma nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1); ASSERT(nRef >= 0); @@ -1049,6 +1042,18 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { taosRemoveFile(fname); taosMemoryFree(pSet->pSmaF); } + + // last + for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { + nRef = atomic_sub_fetch_32(&pSet->aLastF[iLast]->nRef, 1); + ASSERT(nRef >= 0); + if (nRef == 0) { + tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[iLast], fname); + taosRemoveFile(fname); + taosMemoryFree(pSet->aLastF[iLast]); + /* code */ + } + } } taosArrayDestroy(pFS->aDFileSet); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a4738781f5..abe83ccb4b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -16,7 +16,7 @@ #include "osDef.h" #include "tsdb.h" -#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ALL_ROWS_CHECKED_INDEX (INT16_MIN) #define DEFAULT_ROW_INDEX_VAL (-1) @@ -40,15 +40,15 @@ typedef struct { typedef struct STableBlockScanInfo { uint64_t uid; TSKEY lastKey; - SMapData mapData; // block info (compressed) - SArray* pBlockList; // block data index list - SIterInfo iter; // mem buffer skip list iterator - SIterInfo iiter; // imem buffer skip list iterator - SArray* delSkyline; // delete info for this table - int32_t fileDelIndex; // file block delete index - int32_t lastBlockDelIndex;// delete index for last block - bool iterInit; // whether to initialize the in-memory skip list iterator or not - int16_t indexInBlockL;// row position in last block + SMapData mapData; // block info (compressed) + SArray* pBlockList; // block data index list + SIterInfo iter; // mem buffer skip list iterator + SIterInfo iiter; // imem buffer skip list iterator + SArray* delSkyline; // delete info for this table + int32_t fileDelIndex; // file block delete index + int32_t lastBlockDelIndex; // delete index for last block + bool iterInit; // whether to initialize the in-memory skip list iterator or not + int16_t indexInBlockL; // row position in last block } STableBlockScanInfo; typedef struct SBlockOrderWrapper { @@ -96,15 +96,15 @@ typedef struct SLastBlockReader { SVersionRange verRange; int32_t order; uint64_t uid; - int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL + int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL } SLastBlockReader; typedef struct SFilesetIter { - int32_t numOfFiles; // number of total files - int32_t index; // current accessed index in the list - SArray* pFileList; // data file list + int32_t numOfFiles; // number of total files + int32_t index; // current accessed index in the list + SArray* pFileList; // data file list int32_t order; - SLastBlockReader* pLastBlockReader; // last file block reader + SLastBlockReader* pLastBlockReader; // last file block reader } SFilesetIter; typedef struct SFileDataBlockInfo { @@ -116,9 +116,9 @@ typedef struct SFileDataBlockInfo { typedef struct SDataBlockIter { int32_t numOfBlocks; int32_t index; - SArray* blockList; // SArray + SArray* blockList; // SArray int32_t order; - SBlock block; // current SBlock data + SBlock block; // current SBlock data SHashObj* pTableMap; } SDataBlockIter; @@ -169,12 +169,13 @@ static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); -static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger); +static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, + SRowMerger* pMerger); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid); static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, - int32_t rowIndex); + int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); @@ -187,9 +188,9 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdb static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, int8_t* pLevel); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); -static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader); -static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); -static int32_t doBuildDataBlock(STsdbReader* pReader); +static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader); +static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); +static int32_t doBuildDataBlock(STsdbReader* pReader); static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; @@ -320,7 +321,8 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap } // init file iterator -static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader/*int32_t order, const char* idstr*/) { +static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, + STsdbReader* pReader /*int32_t order, const char* idstr*/) { size_t numOfFileset = taosArrayGetSize(aDFileSet); pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset; @@ -338,8 +340,8 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb SLastBlockReader* pLReader = pIter->pLastBlockReader; pLReader->pBlockL = taosArrayInit(4, sizeof(SBlockL)); - pLReader->order = pReader->order; - pLReader->window = pReader->window; + pLReader->order = pReader->order; + pLReader->window = pReader->window; pLReader->verRange = pReader->verRange; pLReader->currentBlockIndex = -1; @@ -658,7 +660,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) { } static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* pLastBlockIndex, - SBlockNumber * pBlockNum, SArray* pQualifiedLastBlock) { + SBlockNumber* pBlockNum, SArray* pQualifiedLastBlock) { int32_t numOfQTable = 0; size_t sizeInDisk = 0; size_t numOfTables = taosArrayGetSize(pIndexList); @@ -704,7 +706,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* } size_t numOfLast = taosArrayGetSize(pLastBlockIndex); - for(int32_t i = 0; i < numOfLast; ++i) { + for (int32_t i = 0; i < numOfLast; ++i) { SBlockL* pLastBlock = taosArrayGet(pLastBlockIndex, i); if (pLastBlock->suid != pReader->suid) { continue; @@ -729,9 +731,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* int32_t total = pBlockNum->numOfLastBlocks + pBlockNum->numOfBlocks; double el = (taosGetTimestampUs() - st) / 1000.0; - tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s", - numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastBlocks, sizeInDisk - / 1000.0, el, pReader->idStr); + tsdbDebug( + "load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s", + numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastBlocks, sizeInDisk / 1000.0, el, + pReader->idStr); pReader->cost.numOfBlocks += total; pReader->cost.headFileLoadTime += el; @@ -879,7 +882,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI elapsedTime = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", + ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); } else { @@ -977,10 +980,10 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v } static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) { - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); if (pFBlock != NULL) { STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); + int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetBlock); } @@ -1097,8 +1100,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte } int64_t et = taosGetTimestampUs(); - tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks, (et - st) / 1000.0, - pReader->idStr); + tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks, + (et - st) / 1000.0, pReader->idStr); cleanupBlockOrderSupporter(&sup); taosMemoryFree(pTree); @@ -1301,7 +1304,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc // todo here we need to each key in the last files to identify if it is really overlapped with last block bool overlapWithlastBlock = false; if (taosArrayGetSize(pLastBlockReader->pBlockL) > 0 && (pLastBlockReader->currentBlockIndex != -1)) { - SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex); + SBlockL* pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex); overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey); } @@ -1395,7 +1398,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* } static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, - SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { + SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; SBlockData* pBlockData = &pReader->status.fileBlockData; @@ -1406,14 +1409,14 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf tsLast = getCurrentKeyInLastBlock(pLastBlockReader); } - TSDBKEY k = TSDBROW_KEY(pRow); - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + TSDBKEY k = TSDBROW_KEY(pRow); + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; int64_t minKey = 0; if (pReader->order == TSDB_ORDER_ASC) { - minKey = INT64_MAX; // chosen the minimum value + minKey = INT64_MAX; // chosen the minimum value if (minKey > tsLast && pLastBlockReader->lastBlockData.nRow > 0) { minKey = tsLast; } @@ -1443,7 +1446,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf bool init = false; // ASC: file block ---> last block -----> imem -----> mem - //DESC: mem -----> imem -----> last block -----> file block + // DESC: mem -----> imem -----> last block -----> file block if (pReader->order == TSDB_ORDER_ASC) { if (minKey == key) { init = true; @@ -1583,7 +1586,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return TSDB_CODE_SUCCESS; } -static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { +static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, + SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; @@ -1595,7 +1599,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo ASSERT(pRow != NULL && piRow != NULL); SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; - int64_t tsLast = INT64_MIN; + int64_t tsLast = INT64_MIN; if (hasDataInLastBlock(pLastBlockReader)) { tsLast = getCurrentKeyInLastBlock(pLastBlockReader); } @@ -1605,7 +1609,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); - int64_t minKey = 0;//INT64_MAX; + int64_t minKey = 0; // INT64_MAX; if (ASCENDING_TRAVERSE(pReader->order)) { minKey = INT64_MAX; // let's find the minimum if (minKey > k.ts) { @@ -1624,7 +1628,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo minKey = tsLast; } } else { - minKey = INT64_MIN; // let find the maximum ts value + minKey = INT64_MIN; // let find the maximum ts value if (minKey < k.ts) { minKey = k.ts; } @@ -1734,7 +1738,8 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo return TSDB_CODE_SUCCESS; } -static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { +static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, + SBlockData* pBlockData) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; @@ -1921,11 +1926,11 @@ static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid } } -static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) { +static void setAllRowsChecked(SLastBlockReader* pLastBlockReader) { *pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX; } -static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { +static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { int32_t step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1; if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { return false; @@ -1934,7 +1939,7 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader, STableBlockSc *(pLastBlockReader->rowIndex) += step; SBlockData* pBlockData = &pLastBlockReader->lastBlockData; - for(int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) { + for (int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) { if (pBlockData->aUid != NULL && pBlockData->aUid[i] != pLastBlockReader->uid) { continue; } @@ -1990,7 +1995,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int64_t key = (pBlockData->nRow > 0)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN; + int64_t key = (pBlockData->nRow > 0) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); @@ -2071,7 +2076,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } } else { // desc order SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); STSRow* pTSRow = NULL; SRowMerger merge = {0}; @@ -2114,7 +2119,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI static int32_t buildComposedDataBlock(STsdbReader* pReader) { SSDataBlock* pResBlock = pReader->pResBlock; - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); STableBlockScanInfo* pBlockScanInfo = NULL; if (pBlockInfo != NULL) { @@ -2178,7 +2183,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { setComposedBlockFlag(pReader, true); int64_t et = taosGetTimestampUs(); - tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s", + tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 + " rows:%d, elapsed time:%.2f ms %s", pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr); @@ -2376,7 +2382,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { return code; } - code = tsdbReadBlockL(pReader->pFileReader, pLastBlocks); + code = tsdbReadBlockL(pReader->pFileReader, 0, pLastBlocks); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pIndexList); return code; @@ -2411,12 +2417,13 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { return TSDB_CODE_SUCCESS; } -static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader) { +static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, + STsdbReader* pReader) { SArray* pBlocks = pLastBlockReader->pBlockL; SBlockL* pBlock = NULL; uint64_t uid = pBlockScanInfo->uid; - int32_t totalLastBlocks = (int32_t)taosArrayGetSize(pBlocks); + int32_t totalLastBlocks = (int32_t)taosArrayGetSize(pBlocks); initMemDataIterator(pBlockScanInfo, pReader); @@ -2443,7 +2450,8 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable } int64_t st = taosGetTimestampUs(); - int32_t code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema); + int32_t code = + tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p init block data failed, code:%s %s", pReader, tstrerror(code), pReader->idStr); return code; @@ -2471,10 +2479,10 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable } static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { - SReaderStatus* pStatus = &pReader->status; + SReaderStatus* pStatus = &pReader->status; SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader; - while(1) { + while (1) { if (pStatus->pTableIter == NULL) { pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); if (pStatus->pTableIter == NULL) { @@ -2485,7 +2493,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { // load the last data block of current table // todo opt perf by avoiding load last block repeatly STableBlockScanInfo* pScanInfo = pStatus->pTableIter; - int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader); + int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2560,7 +2568,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // note: the lastblock may be null here initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); - if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { + if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || + pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); } } @@ -2591,11 +2600,11 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) { // only return the rows in last block int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); - ASSERT (tsLast >= pBlock->maxKey.ts); + ASSERT(tsLast >= pBlock->maxKey.ts); tBlockDataReset(&pReader->status.fileBlockData); code = buildComposedDataBlock(pReader); - } else { // whole block is required, return it directly + } else { // whole block is required, return it directly SDataBlockInfo* pInfo = &pReader->pResBlock->info; pInfo->rows = pBlock->nRow; pInfo->uid = pScanInfo->uid; @@ -2670,7 +2679,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl // initialize the block iterator for a new fileset if (num.numOfBlocks > 0) { code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks); - } else { // no block data, only last block exists + } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); } @@ -2695,7 +2704,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { SDataBlockIter* pBlockIter = &pReader->status.blockIter; if (pBlockIter->numOfBlocks == 0) { - _begin: + _begin: code = doLoadLastBlockSequentially(pReader); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2742,7 +2751,8 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { bool hasNext = blockIteratorNext(&pReader->status.blockIter); if (hasNext) { // check for the next block in the block accessed order list initBlockDumpInfo(pReader, pBlockIter); - } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { // data blocks in current file are exhausted, let's try the next file now + } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > + 0) { // data blocks in current file are exhausted, let's try the next file now tBlockDataReset(&pReader->status.fileBlockData); resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); goto _begin; @@ -3101,8 +3111,9 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc } // todo check if the rows are dropped or not -int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger) { - while(nextRowInLastBlock(pLastBlockReader, pScanInfo)) { +int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, + SRowMerger* pMerger) { + while (nextRowInLastBlock(pLastBlockReader, pScanInfo)) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 == ts) { TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, *pLastBlockReader->rowIndex); @@ -3287,7 +3298,8 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* return TSDB_CODE_SUCCESS; } -int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) { +int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, + int32_t rowIndex) { int32_t i = 0, j = 0; int32_t outputRowIndex = pResBlock->info.rows; @@ -3385,7 +3397,6 @@ void* tsdbGetIvtIdx(SMeta* pMeta) { uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; } - // ====================================== EXPOSED APIs ====================================== int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, const char* idstr) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 48fa80a788..0aa32702dd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -399,8 +399,8 @@ struct SDataFReader { SDFileSet *pSet; TdFilePtr pHeadFD; TdFilePtr pDataFD; - TdFilePtr pLastFD; TdFilePtr pSmaFD; + TdFilePtr aLastFD[TSDB_MAX_LAST_FILE]; uint8_t *aBuf[3]; }; @@ -445,11 +445,13 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS } // last - tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[0], fname); - pReader->pLastFD = taosOpenFile(fname, TD_FILE_READ); - if (pReader->pLastFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; + for (int32_t iLast = 0; iLast < pSet->nLastF; iLast++) { + tsdbLastFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aLastF[iLast], fname); + pReader->aLastFD[iLast] = taosOpenFile(fname, TD_FILE_READ); + if (pReader->aLastFD[iLast] == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } } *ppReader = pReader; @@ -465,30 +467,35 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { int32_t code = 0; if (*ppReader == NULL) goto _exit; + // head if (taosCloseFile(&(*ppReader)->pHeadFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } + // data if (taosCloseFile(&(*ppReader)->pDataFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (taosCloseFile(&(*ppReader)->pLastFD) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - + // sma if (taosCloseFile(&(*ppReader)->pSmaFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } + // last + for (int32_t iLast = 0; iLast < (*ppReader)->pSet->nLastF; iLast++) { + if (taosCloseFile(&(*ppReader)->aLastFD[iLast]) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } + for (int32_t iBuf = 0; iBuf < sizeof((*ppReader)->aBuf) / sizeof(uint8_t *); iBuf++) { tFree((*ppReader)->aBuf[iBuf]); } - taosMemoryFree(*ppReader); _exit: @@ -563,10 +570,10 @@ _err: return code; } -int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL) { +int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL) { int32_t code = 0; - int64_t offset = pReader->pSet->aLastF[0]->offset; - int64_t size = pReader->pSet->aLastF[0]->size - offset; + int64_t offset = pReader->pSet->aLastF[iLast]->offset; + int64_t size = pReader->pSet->aLastF[iLast]->size - offset; int64_t n; uint32_t delimiter; @@ -580,13 +587,13 @@ int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL) { if (code) goto _err; // seek - if (taosLSeekFile(pReader->pLastFD, offset, SEEK_SET) < 0) { + if (taosLSeekFile(pReader->aLastFD[iLast], offset, SEEK_SET) < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } // read - n = taosReadFile(pReader->pLastFD, pReader->aBuf[0], size); + n = taosReadFile(pReader->aLastFD[iLast], pReader->aBuf[0], size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -745,7 +752,7 @@ static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo tBlockDataClear(pBlockData); - TdFilePtr pFD = fromLast ? pReader->pLastFD : pReader->pDataFD; + TdFilePtr pFD = fromLast ? pReader->aLastFD[0] : pReader->pDataFD; // (todo) // uid + version + tskey code = tsdbReadAndCheck(pFD, pBlkInfo->offset, &pReader->aBuf[0], pBlkInfo->szKey, 1); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index dc2c2e7b05..fa7d370583 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -64,7 +64,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx); if (code) goto _err; - code = tsdbReadBlockL(pReader->pDataFReader, pReader->aBlockL); + code = tsdbReadBlockL(pReader->pDataFReader, 0, pReader->aBlockL); if (code) goto _err; // init @@ -911,7 +911,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); if (code) goto _err; - code = tsdbReadBlockL(pWriter->pDataFReader, pWriter->aBlockL); + code = tsdbReadBlockL(pWriter->pDataFReader, 0, pWriter->aBlockL); if (code) goto _err; } else { ASSERT(pWriter->pDataFReader == NULL);