fix(tsdb): fix invalid read, and do some internal refactor.
This commit is contained in:
parent
e6f0697816
commit
79be7eea8c
|
@ -1331,6 +1331,11 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) {
|
||||
taosMemoryFreeClear(pBlock->info.pks[0].pData);
|
||||
taosMemoryFreeClear(pBlock->info.pks[1].pData);
|
||||
}
|
||||
|
||||
blockDataFreeRes(pBlock);
|
||||
taosMemoryFreeClear(pBlock);
|
||||
return NULL;
|
||||
|
|
|
@ -67,8 +67,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, SRowKey* pRowKey, TSDBROW* piRo
|
|||
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow);
|
||||
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SRowKey* pKey,
|
||||
STsdbReader* pReader);
|
||||
static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
||||
STsdbReader* pReader);
|
||||
static int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||
|
||||
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost);
|
||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
|
||||
|
@ -392,7 +391,7 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
|
||||
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
|
||||
pIter->order = order;
|
||||
pIter->index = -1;
|
||||
pIter->numOfBlocks = 0;
|
||||
|
@ -403,8 +402,6 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
|
|||
}
|
||||
}
|
||||
|
||||
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); }
|
||||
|
||||
static void initReaderStatus(SReaderStatus* pStatus) {
|
||||
pStatus->pTableIter = NULL;
|
||||
pStatus->loadFromFile = true;
|
||||
|
@ -657,21 +654,19 @@ _end:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum,
|
||||
SArray* pTableScanInfoList) {
|
||||
size_t sizeInDisk = 0;
|
||||
int64_t st = taosGetTimestampUs();
|
||||
static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum,
|
||||
SArray* pTableScanInfoList) {
|
||||
int32_t k = 0;
|
||||
size_t sizeInDisk = 0;
|
||||
int64_t st = taosGetTimestampUs();
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
STimeWindow w = pReader->info.window;
|
||||
SBrinRecord* pRecord = NULL;
|
||||
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||
SBrinRecordIter iter = {0};
|
||||
|
||||
// clear info for the new file
|
||||
cleanupInfoForNextFileset(pReader->status.pTableMap);
|
||||
|
||||
int32_t k = 0;
|
||||
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
STimeWindow w = pReader->info.window;
|
||||
SBrinRecord* pRecord = NULL;
|
||||
|
||||
SBrinRecordIter iter = {0};
|
||||
initBrinRecordIter(&iter, pReader->pFileReader, pIndexList);
|
||||
|
||||
while (((pRecord = getNextBrinRecord(&iter)) != NULL)) {
|
||||
|
@ -743,14 +738,27 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
}
|
||||
|
||||
if (pScanInfo->pBlockList == NULL) {
|
||||
pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBrinRecord));
|
||||
pScanInfo->pBlockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
|
||||
if (pScanInfo->pBlockList == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
void* p1 = taosArrayPush(pScanInfo->pBlockList, pRecord);
|
||||
if (pScanInfo->pBlockIdxList == NULL) {
|
||||
pScanInfo->pBlockIdxList = taosArrayInit(4, sizeof(STableDataBlockIdx));
|
||||
if (pScanInfo->pBlockIdxList == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
SFileDataBlockInfo blockInfo = {.tbBlockIdx = TARRAY_SIZE(pScanInfo->pBlockList)};
|
||||
recordToBlockInfo(&blockInfo, pRecord);
|
||||
void* p1 = taosArrayPush(pScanInfo->pBlockList, &blockInfo);
|
||||
if (p1 == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
// todo: refactor to record the fileset skey/ekey
|
||||
if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) {
|
||||
pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts;
|
||||
}
|
||||
|
@ -1323,10 +1331,12 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
|
|||
}
|
||||
|
||||
static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
|
||||
STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order,
|
||||
STableBlockScanInfo* pScanInfo, int32_t* nextIndex, int32_t order,
|
||||
SBrinRecord* pRecord) {
|
||||
bool asc = ASCENDING_TRAVERSE(order);
|
||||
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) {
|
||||
bool asc = ASCENDING_TRAVERSE(order);
|
||||
int32_t step = asc ? 1 : -1;
|
||||
|
||||
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pScanInfo->pBlockIdxList) - 1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1334,9 +1344,7 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
|
|||
return false;
|
||||
}
|
||||
|
||||
int32_t step = asc ? 1 : -1;
|
||||
STableDataBlockIdx* pTableDataBlockIdx =
|
||||
taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
|
||||
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
|
||||
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
|
||||
blockInfoToRecord(pRecord, p);
|
||||
|
||||
|
@ -1344,22 +1352,6 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
|
|||
return true;
|
||||
}
|
||||
|
||||
static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) {
|
||||
int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
|
||||
int32_t index = pBlockIter->index;
|
||||
|
||||
while (index < pBlockIter->numOfBlocks && index >= 0) {
|
||||
SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index);
|
||||
if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
|
||||
return index;
|
||||
}
|
||||
|
||||
index += step;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index,
|
||||
int32_t step) {
|
||||
if (index < 0 || index >= pBlockIter->numOfBlocks) {
|
||||
|
@ -2706,7 +2698,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
|||
}
|
||||
|
||||
if (taosArrayGetSize(pIndexList) > 0 || pReader->status.pCurrentFileset->lvlArr->size > 0) {
|
||||
code = doLoadFileBlock(pReader, pIndexList, pBlockNum, pTableList);
|
||||
code = loadFileBlockBrinInfo(pReader, pIndexList, pBlockNum, pTableList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(pIndexList);
|
||||
return code;
|
||||
|
@ -3154,23 +3146,14 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
|
|||
SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
|
||||
|
||||
if (pBlockInfo) {
|
||||
// todo handle
|
||||
// STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||
// if (pScanInfo) {
|
||||
// tsdbRowKeyAssign(&pDumpInfo->lastKey, &pScanInfo->lastProcKey);
|
||||
// lastKey = pScanInfo->lastProcKey;
|
||||
// }
|
||||
|
||||
pDumpInfo->totalRows = pBlockInfo->numRow;
|
||||
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1;
|
||||
} else {
|
||||
pDumpInfo->totalRows = 0;
|
||||
pDumpInfo->rowIndex = 0;
|
||||
// pDumpInfo->lastKey.key.ts = lastKey;
|
||||
}
|
||||
|
||||
pDumpInfo->allDumped = false;
|
||||
// pDumpInfo->lastKey = lastKey;
|
||||
}
|
||||
|
||||
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
||||
|
|
|
@ -167,6 +167,13 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void clearRowKey(SRowKey* pKey) {
|
||||
if (pKey == NULL || pKey->numOfPKs == 0 || (!IS_VAR_DATA_TYPE(pKey->pks[0].type))) {
|
||||
return;
|
||||
}
|
||||
taosMemoryFree(pKey->pks[0].pData);
|
||||
}
|
||||
|
||||
static void initLastProcKey(STableBlockScanInfo *pScanInfo, STsdbReader* pReader) {
|
||||
int32_t numOfPks = pReader->suppInfo.numOfPks;
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
|
@ -293,6 +300,11 @@ void clearBlockScanInfo(STableBlockScanInfo* p) {
|
|||
p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList);
|
||||
p->pMemDelData = taosArrayDestroy(p->pMemDelData);
|
||||
p->pFileDelData = taosArrayDestroy(p->pFileDelData);
|
||||
|
||||
clearRowKey(&p->lastProcKey);
|
||||
clearRowKey(&p->sttRange.skey);
|
||||
clearRowKey(&p->sttRange.ekey);
|
||||
clearRowKey(&p->sttKeyInfo.nextProcKey);
|
||||
}
|
||||
|
||||
void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
|
||||
|
@ -415,7 +427,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
|
|||
return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
|
||||
}
|
||||
|
||||
static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record, STsdbReader* pReader) {
|
||||
void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record) {
|
||||
pBlockInfo->uid = record->uid;
|
||||
pBlockInfo->firstKey = record->firstKey.key.ts;
|
||||
pBlockInfo->lastKey = record->lastKey.key.ts;
|
||||
|
@ -449,12 +461,36 @@ static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* recor
|
|||
}
|
||||
}
|
||||
|
||||
static void freeItem(void* pItem) {
|
||||
SFileDataBlockInfo* p = pItem;
|
||||
if (p->firstPKLen > 0) {
|
||||
taosMemoryFreeClear(p->firstPk.pData);
|
||||
}
|
||||
|
||||
if (p->lastPKLen > 0) {
|
||||
taosMemoryFreeClear(p->lastPk.pData);
|
||||
}
|
||||
}
|
||||
|
||||
void clearDataBlockIterator(SDataBlockIter* pIter) {
|
||||
pIter->index = -1;
|
||||
pIter->numOfBlocks = 0;
|
||||
taosArrayClearEx(pIter->blockList, freeItem);
|
||||
}
|
||||
|
||||
void cleanupDataBlockIterator(SDataBlockIter* pIter) {
|
||||
pIter->index = -1;
|
||||
pIter->numOfBlocks = 0;
|
||||
taosArrayDestroyEx(pIter->blockList, freeItem);
|
||||
}
|
||||
|
||||
int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) {
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
|
||||
SBlockOrderSupporter sup = {0};
|
||||
clearDataBlockIterator(pBlockIter);
|
||||
|
||||
pBlockIter->numOfBlocks = numOfBlocks;
|
||||
taosArrayClear(pBlockIter->blockList);
|
||||
|
||||
// access data blocks according to the offset of each block in asc/desc order.
|
||||
int32_t numOfTables = taosArrayGetSize(pTableList);
|
||||
|
@ -482,9 +518,9 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
|||
sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
|
||||
|
||||
for (int32_t k = 0; k < num; ++k) {
|
||||
SBrinRecord* pRecord = taosArrayGet(pTableScanInfo->pBlockList, k);
|
||||
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pTableScanInfo->pBlockList, k);
|
||||
sup.pDataBlockInfo[sup.numOfTables][k] =
|
||||
(SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pRecord->blockOffset, .pInfo = pTableScanInfo};
|
||||
(SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pBlockInfo->blockOffset, .pInfo = pTableScanInfo};
|
||||
cnt++;
|
||||
}
|
||||
|
||||
|
@ -499,20 +535,12 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
|||
// since there is only one table qualified, blocks are not sorted
|
||||
if (sup.numOfTables == 1) {
|
||||
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0);
|
||||
if (pTableScanInfo->pBlockIdxList == NULL) {
|
||||
pTableScanInfo->pBlockIdxList = taosArrayInit(numOfBlocks, sizeof(STableDataBlockIdx));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
SFileDataBlockInfo blockInfo = {.tbBlockIdx = i};
|
||||
SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);
|
||||
recordToBlockInfo(&blockInfo, record, pReader);
|
||||
|
||||
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
||||
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i};
|
||||
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
|
||||
}
|
||||
|
||||
taosArrayAddAll(pBlockIter->blockList, pTableScanInfo->pBlockList);
|
||||
pTableScanInfo->pBlockList = taosArrayDestroy(pTableScanInfo->pBlockList);
|
||||
|
||||
int64_t et = taosGetTimestampUs();
|
||||
|
@ -540,18 +568,13 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
|||
int32_t pos = tMergeTreeGetChosenIndex(pTree);
|
||||
int32_t index = sup.indexPerTable[pos]++;
|
||||
|
||||
SFileDataBlockInfo blockInfo = {.tbBlockIdx = index};
|
||||
SBrinRecord* record = (SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
|
||||
recordToBlockInfo(&blockInfo, record, pReader);
|
||||
SFileDataBlockInfo* pBlockInfo = taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
|
||||
taosArrayPush(pBlockIter->blockList, pBlockInfo);
|
||||
|
||||
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
||||
STableBlockScanInfo* pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo;
|
||||
if (pTableScanInfo->pBlockIdxList == NULL) {
|
||||
size_t szTableDataBlocks = taosArrayGetSize(pTableScanInfo->pBlockList);
|
||||
pTableScanInfo->pBlockIdxList = taosArrayInit(szTableDataBlocks, sizeof(STableDataBlockIdx));
|
||||
}
|
||||
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal};
|
||||
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal};
|
||||
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
|
||||
|
||||
// set data block index overflow, in order to disable the offset comparator
|
||||
if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
|
||||
sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
|
||||
|
|
|
@ -237,7 +237,6 @@ typedef struct SDataBlockIter {
|
|||
typedef struct SFileBlockDumpInfo {
|
||||
int32_t totalRows;
|
||||
int32_t rowIndex;
|
||||
// int64_t lastKey;
|
||||
// STsdbRowKey lastKey; // this key should be removed
|
||||
bool allDumped;
|
||||
} SFileBlockDumpInfo;
|
||||
|
@ -338,6 +337,7 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead
|
|||
int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo,
|
||||
TStatisBlkArray* pStatisBlkArray, uint64_t suid, const uint64_t* pUidList,
|
||||
int32_t numOfTables);
|
||||
void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record);
|
||||
|
||||
void destroyLDataIter(SLDataIter* pIter);
|
||||
int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet);
|
||||
|
@ -347,6 +347,11 @@ bool isCleanSttBlock(SArray* pTimewindowList, STimeWindow* pQueryWindow, STab
|
|||
bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecord* pRecord, int32_t order);
|
||||
int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2);
|
||||
int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc);
|
||||
void clearRowKey(SRowKey* pKey);
|
||||
|
||||
void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order);
|
||||
void clearDataBlockIterator(SDataBlockIter* pIter);
|
||||
void cleanupDataBlockIterator(SDataBlockIter* pIter);
|
||||
|
||||
typedef struct {
|
||||
SArray* pTombData;
|
||||
|
|
Loading…
Reference in New Issue