From 79be7eea8c1e8b3c02f1a51b71f734b94d94e674 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Apr 2024 10:06:07 +0800 Subject: [PATCH] fix(tsdb): fix invalid read, and do some internal refactor. --- source/common/src/tdatablock.c | 5 ++ source/dnode/vnode/src/tsdb/tsdbRead2.c | 85 +++++++++------------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 67 +++++++++++------ source/dnode/vnode/src/tsdb/tsdbReadUtil.h | 7 +- 4 files changed, 90 insertions(+), 74 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 69b2a2e6a3..8d9ef6831d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1331,6 +1331,11 @@ void* blockDataDestroy(SSDataBlock* pBlock) { return NULL; } + if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) { + taosMemoryFreeClear(pBlock->info.pks[0].pData); + taosMemoryFreeClear(pBlock->info.pks[1].pData); + } + blockDataFreeRes(pBlock); taosMemoryFreeClear(pBlock); return NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index c08face243..be01afb960 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -67,8 +67,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRo STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey, STsdbReader* pReader); -static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, - STsdbReader* pReader); +static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost); static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, @@ -392,7 +391,7 @@ _err: return code; } -static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; @@ -403,8 +402,6 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) { } } -static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); } - static void initReaderStatus(SReaderStatus* pStatus) { pStatus->pTableIter = NULL; pStatus->loadFromFile = true; @@ -657,21 +654,19 @@ _end: return code; } -static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, - SArray* pTableScanInfoList) { - size_t sizeInDisk = 0; - int64_t st = taosGetTimestampUs(); +static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, + SArray* pTableScanInfoList) { + int32_t k = 0; + size_t sizeInDisk = 0; + int64_t st = taosGetTimestampUs(); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + STimeWindow w = pReader->info.window; + SBrinRecord* pRecord = NULL; + int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); + SBrinRecordIter iter = {0}; // clear info for the new file cleanupInfoForNextFileset(pReader->status.pTableMap); - - int32_t k = 0; - int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - STimeWindow w = pReader->info.window; - SBrinRecord* pRecord = NULL; - - SBrinRecordIter iter = {0}; initBrinRecordIter(&iter, pReader->pFileReader, pIndexList); while (((pRecord = getNextBrinRecord(&iter)) != NULL)) { @@ -743,14 +738,27 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN } if (pScanInfo->pBlockList == NULL) { - pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBrinRecord)); + pScanInfo->pBlockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); + if (pScanInfo->pBlockList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } - void* p1 = taosArrayPush(pScanInfo->pBlockList, pRecord); + if (pScanInfo->pBlockIdxList == NULL) { + pScanInfo->pBlockIdxList = taosArrayInit(4, sizeof(STableDataBlockIdx)); + if (pScanInfo->pBlockIdxList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + SFileDataBlockInfo blockInfo = {.tbBlockIdx = TARRAY_SIZE(pScanInfo->pBlockList)}; + recordToBlockInfo(&blockInfo, pRecord); + void* p1 = taosArrayPush(pScanInfo->pBlockList, &blockInfo); if (p1 == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + // todo: refactor to record the fileset skey/ekey if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) { pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts; } @@ -1323,10 +1331,12 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p } static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, - STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order, + STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order, SBrinRecord* pRecord) { - bool asc = ASCENDING_TRAVERSE(order); - if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) { + bool asc = ASCENDING_TRAVERSE(order); + int32_t step = asc ? 1 : -1; + + if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pScanInfo->pBlockIdxList) - 1) { return false; } @@ -1334,9 +1344,7 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo return false; } - int32_t step = asc ? 1 : -1; - STableDataBlockIdx* pTableDataBlockIdx = - taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); + STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); blockInfoToRecord(pRecord, p); @@ -1344,22 +1352,6 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo return true; } -static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) { - int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1; - int32_t index = pBlockIter->index; - - while (index < pBlockIter->numOfBlocks && index >= 0) { - SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index); - if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) { - return index; - } - - index += step; - } - - return -1; -} - static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, int32_t step) { if (index < 0 || index >= pBlockIter->numOfBlocks) { @@ -2706,7 +2698,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr } if (taosArrayGetSize(pIndexList) > 0 || pReader->status.pCurrentFileset->lvlArr->size > 0) { - code = doLoadFileBlock(pReader, pIndexList, pBlockNum, pTableList); + code = loadFileBlockBrinInfo(pReader, pIndexList, pBlockNum, pTableList); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pIndexList); return code; @@ -3154,23 +3146,14 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; if (pBlockInfo) { - // todo handle -// STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); -// if (pScanInfo) { -// tsdbRowKeyAssign(&pDumpInfo->lastKey, &pScanInfo->lastProcKey); -// lastKey = pScanInfo->lastProcKey; -// } - pDumpInfo->totalRows = pBlockInfo->numRow; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1; } else { pDumpInfo->totalRows = 0; pDumpInfo->rowIndex = 0; -// pDumpInfo->lastKey.key.ts = lastKey; } pDumpInfo->allDumped = false; -// pDumpInfo->lastKey = lastKey; } static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 2a7b0140df..7e81f1df36 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -167,6 +167,13 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in return TSDB_CODE_SUCCESS; } +void clearRowKey(SRowKey* pKey) { + if (pKey == NULL || pKey->numOfPKs == 0 || (!IS_VAR_DATA_TYPE(pKey->pks[0].type))) { + return; + } + taosMemoryFree(pKey->pks[0].pData); +} + static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) { int32_t numOfPks = pReader->suppInfo.numOfPks; bool asc = ASCENDING_TRAVERSE(pReader->info.order); @@ -293,6 +300,11 @@ void clearBlockScanInfo(STableBlockScanInfo* p) { p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList); p->pMemDelData = taosArrayDestroy(p->pMemDelData); p->pFileDelData = taosArrayDestroy(p->pFileDelData); + + clearRowKey(&p->lastProcKey); + clearRowKey(&p->sttRange.skey); + clearRowKey(&p->sttRange.ekey); + clearRowKey(&p->sttKeyInfo.nextProcKey); } void destroyAllBlockScanInfo(SSHashObj* pTableMap) { @@ -415,7 +427,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; } -static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record, STsdbReader* pReader) { +void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) { pBlockInfo->uid = record->uid; pBlockInfo->firstKey = record->firstKey.key.ts; pBlockInfo->lastKey = record->lastKey.key.ts; @@ -449,12 +461,36 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor } } +static void freeItem(void* pItem) { + SFileDataBlockInfo* p = pItem; + if (p->firstPKLen > 0) { + taosMemoryFreeClear(p->firstPk.pData); + } + + if (p->lastPKLen > 0) { + taosMemoryFreeClear(p->lastPk.pData); + } +} + +void clearDataBlockIterator(SDataBlockIter* pIter) { + pIter->index = -1; + pIter->numOfBlocks = 0; + taosArrayClearEx(pIter->blockList, freeItem); +} + +void cleanupDataBlockIterator(SDataBlockIter* pIter) { + pIter->index = -1; + pIter->numOfBlocks = 0; + taosArrayDestroyEx(pIter->blockList, freeItem); +} + int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) { bool asc = ASCENDING_TRAVERSE(pReader->info.order); SBlockOrderSupporter sup = {0}; + clearDataBlockIterator(pBlockIter); + pBlockIter->numOfBlocks = numOfBlocks; - taosArrayClear(pBlockIter->blockList); // access data blocks according to the offset of each block in asc/desc order. int32_t numOfTables = taosArrayGetSize(pTableList); @@ -482,9 +518,9 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; for (int32_t k = 0; k < num; ++k) { - SBrinRecord* pRecord = taosArrayGet(pTableScanInfo->pBlockList, k); + SFileDataBlockInfo* pBlockInfo = taosArrayGet(pTableScanInfo->pBlockList, k); sup.pDataBlockInfo[sup.numOfTables][k] = - (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pRecord->blockOffset, .pInfo = pTableScanInfo}; + (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pBlockInfo->blockOffset, .pInfo = pTableScanInfo}; cnt++; } @@ -499,20 +535,12 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 // since there is only one table qualified, blocks are not sorted if (sup.numOfTables == 1) { STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0); - if (pTableScanInfo->pBlockIdxList == NULL) { - pTableScanInfo->pBlockIdxList = taosArrayInit(numOfBlocks, sizeof(STableDataBlockIdx)); - } - for (int32_t i = 0; i < numOfBlocks; ++i) { - SFileDataBlockInfo blockInfo = {.tbBlockIdx = i}; - SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i); - recordToBlockInfo(&blockInfo, record, pReader); - - taosArrayPush(pBlockIter->blockList, &blockInfo); STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i}; taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); } + taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList); pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList); int64_t et = taosGetTimestampUs(); @@ -540,18 +568,13 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3 int32_t pos = tMergeTreeGetChosenIndex(pTree); int32_t index = sup.indexPerTable[pos]++; - SFileDataBlockInfo blockInfo = {.tbBlockIdx = index}; - SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); - recordToBlockInfo(&blockInfo, record, pReader); + SFileDataBlockInfo* pBlockInfo = taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); + taosArrayPush(pBlockIter->blockList, pBlockInfo); - taosArrayPush(pBlockIter->blockList, &blockInfo); STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo; - if (pTableScanInfo->pBlockIdxList == NULL) { - size_t szTableDataBlocks = taosArrayGetSize(pTableScanInfo->pBlockList); - pTableScanInfo->pBlockIdxList = taosArrayInit(szTableDataBlocks, sizeof(STableDataBlockIdx)); - } - STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal}; + STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal}; taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx); + // set data block index overflow, in order to disable the offset comparator if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) { sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index bece22adad..0e7895c272 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -237,7 +237,6 @@ typedef struct SDataBlockIter { typedef struct SFileBlockDumpInfo { int32_t totalRows; int32_t rowIndex; -// int64_t lastKey; // STsdbRowKey lastKey; // this key should be removed bool allDumped; } SFileBlockDumpInfo; @@ -338,6 +337,7 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList, int32_t numOfTables); +void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record); void destroyLDataIter(SLDataIter* pIter); int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet); @@ -347,6 +347,11 @@ bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STab bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order); int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); +void clearRowKey(SRowKey* pKey); + +void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order); +void clearDataBlockIterator(SDataBlockIter* pIter); +void cleanupDataBlockIterator(SDataBlockIter* pIter); typedef struct { SArray* pTombData;