enh(tsdb): support delete data read.
This commit is contained in:
parent
a33ce92e5d
commit
976dc322db
|
@ -629,9 +629,9 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint6
|
|||
pMTree->idStr = idStr;
|
||||
|
||||
if (!pMTree->backward) { // asc
|
||||
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
||||
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
|
||||
} else { // desc
|
||||
tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
|
||||
tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
|
||||
}
|
||||
|
||||
pMTree->pLoadInfo = pBlockLoadInfo;
|
||||
|
@ -639,6 +639,11 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb* pTsdb, uint6
|
|||
pMTree->ignoreEarlierTs = false;
|
||||
|
||||
// todo handle other level of stt files, here only deal with the first level stt
|
||||
int32_t size = ((STFileSet*)pCurrentFileSet)->lvlArr[0].size;
|
||||
if (size == 0) {
|
||||
goto _end;
|
||||
}
|
||||
|
||||
SSttLvl* pSttLevel = ((STFileSet*)pCurrentFileSet)->lvlArr[0].data[0];
|
||||
ASSERT(pSttLevel->level == 0);
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "tsdbDataFileRW.h"
|
||||
#include "tsdbFS2.h"
|
||||
#include "tsdbMerge.h"
|
||||
#include "tsdbUtil2.h"
|
||||
#include "tsimplehash.h"
|
||||
|
||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||
|
@ -61,7 +62,8 @@ typedef struct STableBlockScanInfo {
|
|||
TSKEY lastKey;
|
||||
TSKEY lastKeyInStt; // last accessed key in stt
|
||||
SMapData mapData; // block info (compressed)
|
||||
SArray* pBlockList; // block data index list, SArray<SBlockIndex>
|
||||
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
|
||||
SArray* pDelData; // SArray<SDelData>
|
||||
SIterInfo iter; // mem buffer skip list iterator
|
||||
SIterInfo iiter; // imem buffer skip list iterator
|
||||
SArray* delSkyline; // delete info for this table
|
||||
|
@ -73,6 +75,7 @@ typedef struct STableBlockScanInfo {
|
|||
typedef struct SBlockOrderWrapper {
|
||||
int64_t uid;
|
||||
int64_t offset;
|
||||
STableBlockScanInfo *pInfo;
|
||||
} SBlockOrderWrapper;
|
||||
|
||||
typedef struct SBlockOrderSupporter {
|
||||
|
@ -123,8 +126,7 @@ typedef struct SLastBlockReader {
|
|||
typedef struct SFilesetIter {
|
||||
int32_t numOfFiles; // number of total files
|
||||
int32_t index; // current accessed index in the list
|
||||
TFileSetArray* pFilesetArray;// data file set list
|
||||
// SArray* pFileList; // data file list
|
||||
TFileSetArray* pFilesetList;// data file set list
|
||||
int32_t order;
|
||||
SLastBlockReader* pLastBlockReader; // last file block reader
|
||||
} SFilesetIter;
|
||||
|
@ -133,6 +135,7 @@ typedef struct SFileDataBlockInfo {
|
|||
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
|
||||
uint64_t uid;
|
||||
int32_t tbBlockIdx;
|
||||
SBrinRecord record;
|
||||
} SFileDataBlockInfo;
|
||||
|
||||
typedef struct SDataBlockIter {
|
||||
|
@ -217,8 +220,8 @@ struct STsdbReader {
|
|||
SHashObj** pIgnoreTables;
|
||||
STSchema* pSchema; // the newest version schema
|
||||
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
|
||||
SDataFileReader* pFileReader; // the file reader
|
||||
SDelFReader* pDelFReader; // the del file reader
|
||||
SDataFileReader* pFileReader; // the file reader
|
||||
SDelFReader* pDelFReader; // the del file reader, todo remove it
|
||||
SArray* pDelIdx; // del file block index;
|
||||
SBlockInfoBuf blockInfoBuf;
|
||||
EContentData step;
|
||||
|
@ -536,11 +539,11 @@ static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
|
|||
|
||||
// init file iterator
|
||||
static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetArray, STsdbReader* pReader) {
|
||||
size_t numOfFileset = pFileSetArray->size;
|
||||
size_t numOfFileset = TARRAY2_SIZE(pFileSetArray);
|
||||
|
||||
pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
|
||||
pIter->order = pReader->order;
|
||||
pIter->pFilesetArray = pFileSetArray;
|
||||
pIter->pFilesetList = pFileSetArray;
|
||||
pIter->numOfFiles = numOfFileset;
|
||||
|
||||
if (pIter->pLastBlockReader == NULL) {
|
||||
|
@ -602,8 +605,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
|
|||
tsdbDataFileReaderClose(&pReader->pFileReader);
|
||||
}
|
||||
|
||||
// pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFilesetArray, pIter->index);
|
||||
pReader->status.pCurrentFileset = pIter->pFilesetArray->data[pIter->index];
|
||||
pReader->status.pCurrentFileset = pIter->pFilesetList->data[pIter->index];
|
||||
|
||||
STFileObj** pFileObj = pReader->status.pCurrentFileset->farr;
|
||||
if (pFileObj[0] != NULL) {
|
||||
|
@ -894,11 +896,17 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
|
|||
int32_t i = 0, j = 0;
|
||||
while (i < pBlkArray->size && j < numOfTables) {
|
||||
pBrinBlk = &pBlkArray->data[i];
|
||||
if (pBrinBlk->minTbid.suid != pReader->suid) {
|
||||
if (pBrinBlk->minTbid.suid > pReader->suid) { // not include the queried table/super table, quit the loop
|
||||
break;
|
||||
}
|
||||
|
||||
if (pBrinBlk->maxTbid.suid < pReader->suid) {
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
ASSERT(pBrinBlk->minTbid.suid >= pReader->suid && pBrinBlk->maxTbid.suid <= pReader->suid);
|
||||
|
||||
if (pBrinBlk->minTbid.uid < pList->tableUidList[j]) {
|
||||
i += 1;
|
||||
continue;
|
||||
|
@ -909,6 +917,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
|
|||
continue;
|
||||
}
|
||||
|
||||
// todo maxTbid.uid == xxx?
|
||||
if (pBrinBlk->minTbid.uid == pList->tableUidList[j]) {
|
||||
// this block belongs to a table that is not queried.
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBrinBlk->minTbid.uid, pReader->idStr);
|
||||
|
@ -929,7 +938,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
|
|||
}
|
||||
|
||||
int64_t et2 = taosGetTimestampUs();
|
||||
tsdbDebug("load block index for %d/%d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s",
|
||||
tsdbDebug("load block index for %d/%d tables completed, elapsed time:%.2f ms, set BrinBlk:%.2f ms, size:%.2f Kb %s",
|
||||
numOfTables, (int32_t)pBlkArray->size, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, pBlkArray->size * sizeof(SBrinBlk) / 1024.0,
|
||||
pReader->idStr);
|
||||
|
||||
|
@ -969,20 +978,18 @@ static void cleanupTableScanInfo(SReaderStatus* pStatus) {
|
|||
|
||||
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, SArray* pTableScanInfoList) {
|
||||
size_t sizeInDisk = 0;
|
||||
size_t numOfTables = taosArrayGetSize(pIndexList);
|
||||
size_t numOfBlocks = taosArrayGetSize(pIndexList);
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
cleanupTableScanInfo(&pReader->status);
|
||||
|
||||
// set the flag for the new file
|
||||
// pReader->status.mapDataCleaned = false;
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
SBrinBlk* pBlk = taosArrayGet(pIndexList, i);
|
||||
int32_t i = 0, k = 0;
|
||||
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlk->minTbid.uid, pReader->idStr);
|
||||
if (pScanInfo == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
while(i < numOfBlocks && k < numOfTables) {
|
||||
SBrinBlk* pBlk = taosArrayGet(pIndexList, i);
|
||||
uint64_t uid = pReader->status.uidList.tableUidList[k];
|
||||
|
||||
SBrinBlock block = {0};
|
||||
tsdbDataFileReadBrinBlock(pReader->pFileReader, pBlk, &block);
|
||||
|
@ -991,27 +998,48 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
// tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
|
||||
// taosArrayEnsureCap(pScanInfo->pBlockList, pScanInfo->mapData.nItem);
|
||||
|
||||
// todo set the correct size
|
||||
sizeInDisk += 0;//pScanInfo->mapData.nData;
|
||||
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
|
||||
STimeWindow w = pReader->window;
|
||||
if (isEmptyQueryTimeWindow(&w)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
if (ASCENDING_TRAVERSE(pReader->order)) {
|
||||
w.skey = pScanInfo->lastKey + step;
|
||||
} else {
|
||||
w.ekey = pScanInfo->lastKey + step;
|
||||
}
|
||||
|
||||
if (isEmptyQueryTimeWindow(&w)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < block.numRow->size; ++j) {
|
||||
SBrinRecord record = {0};
|
||||
SBrinRecord record = {0};
|
||||
for (int32_t j = 0; j < TARRAY2_SIZE(block.numRow); ++j) {
|
||||
tBrinBlockGet(&block, j, &record);
|
||||
{
|
||||
while (record.uid > uid) {
|
||||
k += 1;
|
||||
|
||||
uid = pReader->status.uidList.tableUidList[k];
|
||||
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
|
||||
if (ASCENDING_TRAVERSE(pReader->order)) {
|
||||
w.skey = pScanInfo->lastKey + step;
|
||||
} else {
|
||||
w.ekey = pScanInfo->lastKey + step;
|
||||
}
|
||||
}
|
||||
|
||||
if (record.uid < uid) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(record.suid == pReader->suid);
|
||||
|
||||
// 1. time range check
|
||||
if (record.firstKey > w.ekey || record.lastKey < w.skey) {
|
||||
// if (block.minKey.ts > w.ekey || block.maxKey.ts < w.skey) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1022,7 +1050,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
|
||||
// SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = record.blockOffset};
|
||||
// bIndex.window = (STimeWindow){.skey = record.firstKey, .ekey = record.lastKey};
|
||||
|
||||
void* p1 = taosArrayPush(pScanInfo->pBlockList, &record);
|
||||
if (p1 == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1031,7 +1058,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
pBlockNum->numOfBlocks += 1;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
|
||||
i += 1;
|
||||
|
||||
STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList);
|
||||
if ((p == NULL || (*p)->uid != uid) && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
|
||||
taosArrayPush(pTableScanInfoList, &pScanInfo);
|
||||
}
|
||||
}
|
||||
|
@ -1144,22 +1174,22 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SDataBlk* pBlock, int32_t pos) {
|
||||
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SBrinRecord* pRecord, int32_t pos) {
|
||||
// NOTE: reverse the order to find the end position in data block
|
||||
int32_t endPos = -1;
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||
|
||||
if (asc && pReader->window.ekey >= pBlock->maxKey.ts) {
|
||||
endPos = pBlock->nRow - 1;
|
||||
} else if (!asc && pReader->window.skey <= pBlock->minKey.ts) {
|
||||
if (asc && pReader->window.ekey >= pRecord->lastKey) {
|
||||
endPos = pRecord->numRow - 1;
|
||||
} else if (!asc && pReader->window.skey <= pRecord->firstKey) {
|
||||
endPos = 0;
|
||||
} else {
|
||||
int64_t key = asc ? pReader->window.ekey : pReader->window.skey;
|
||||
endPos = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, pReader->order);
|
||||
endPos = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, pReader->order);
|
||||
}
|
||||
|
||||
if ((pReader->verRange.maxVer >= pBlock->minVer && pReader->verRange.maxVer < pBlock->maxVer)||
|
||||
(pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.minVer > pBlock->minVer)) {
|
||||
if ((pReader->verRange.maxVer >= pRecord->firstKeyVer && pReader->verRange.maxVer < pRecord->lastKeyVer)||
|
||||
(pReader->verRange.minVer <= pRecord->lastKeyVer && pReader->verRange.minVer > pRecord->firstKeyVer)) {
|
||||
int32_t i = endPos;
|
||||
|
||||
if (asc) {
|
||||
|
@ -1169,7 +1199,7 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData
|
|||
}
|
||||
}
|
||||
} else {
|
||||
for(; i < pBlock->nRow; ++i) {
|
||||
for(; i < pRecord->numRow; ++i) {
|
||||
if (pBlockData->aVersion[i] >= pReader->verRange.minVer) {
|
||||
break;
|
||||
}
|
||||
|
@ -1300,7 +1330,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
|||
|
||||
SBlockData* pBlockData = &pStatus->fileBlockData;
|
||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
||||
// SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||
int32_t numOfOutputCols = pSupInfo->numOfCols;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1310,6 +1340,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
|||
bool asc = ASCENDING_TRAVERSE(pReader->order);
|
||||
int32_t step = asc ? 1 : -1;
|
||||
|
||||
SBrinRecord* pRecord = &pBlockInfo->record;
|
||||
|
||||
// no data exists, return directly.
|
||||
if (pBlockData->nRow == 0 || pBlockData->aTSKEY == 0) {
|
||||
tsdbWarn("%p no need to copy since no data in blockData, table uid:%" PRIu64 " has been dropped, %s", pReader,
|
||||
|
@ -1319,34 +1351,34 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
// row index of dump info remain the initial position, let's find the appropriate start position.
|
||||
if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
|
||||
if (asc && pReader->window.skey <= pBlock->minKey.ts && pReader->verRange.minVer <= pBlock->minVer) {
|
||||
if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pRecord->numRow - 1 && (!asc))) {
|
||||
if (asc && pReader->window.skey <= pRecord->firstKey && pReader->verRange.minVer <= pRecord->firstKeyVer) {
|
||||
// pDumpInfo->rowIndex = 0;
|
||||
} else if (!asc && pReader->window.ekey >= pBlock->maxKey.ts && pReader->verRange.maxVer >= pBlock->maxVer) {
|
||||
// pDumpInfo->rowIndex = pBlock->nRow - 1;
|
||||
} else if (!asc && pReader->window.ekey >= pRecord->lastKey && pReader->verRange.maxVer >= pRecord->lastKeyVer) {
|
||||
// pDumpInfo->rowIndex = pRecord->numRow - 1;
|
||||
} else { // find the appropriate the start position in current block, and set it to be the current rowIndex
|
||||
int32_t pos = asc ? pBlock->nRow - 1 : 0;
|
||||
int32_t pos = asc ? pRecord->numRow - 1 : 0;
|
||||
int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
|
||||
int64_t key = asc ? pReader->window.skey : pReader->window.ekey;
|
||||
pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order);
|
||||
pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, order);
|
||||
|
||||
if (pDumpInfo->rowIndex < 0) {
|
||||
tsdbError(
|
||||
"%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
|
||||
"-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
|
||||
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->minVer,
|
||||
pBlock->maxVer, pReader->idStr);
|
||||
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->firstKeyVer,
|
||||
pRecord->lastKeyVer, pReader->idStr);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
ASSERT(pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.maxVer >= pBlock->minVer);
|
||||
ASSERT(pReader->verRange.minVer <= pRecord->lastKeyVer && pReader->verRange.maxVer >= pRecord->firstKeyVer);
|
||||
|
||||
// find the appropriate start position that satisfies the version requirement.
|
||||
if ((pReader->verRange.maxVer >= pBlock->minVer && pReader->verRange.maxVer < pBlock->maxVer)||
|
||||
(pReader->verRange.minVer <= pBlock->maxVer && pReader->verRange.minVer > pBlock->minVer)) {
|
||||
if ((pReader->verRange.maxVer >= pRecord->firstKeyVer && pReader->verRange.maxVer < pRecord->lastKeyVer)||
|
||||
(pReader->verRange.minVer <= pRecord->lastKeyVer && pReader->verRange.minVer > pRecord->firstKeyVer)) {
|
||||
int32_t i = pDumpInfo->rowIndex;
|
||||
if (asc) {
|
||||
for(; i < pBlock->nRow; ++i) {
|
||||
for(; i < pRecord->numRow; ++i) {
|
||||
if (pBlockData->aVersion[i] >= pReader->verRange.minVer) {
|
||||
break;
|
||||
}
|
||||
|
@ -1365,7 +1397,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
// time window check
|
||||
int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pBlock, pDumpInfo->rowIndex);
|
||||
int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pRecord, pDumpInfo->rowIndex);
|
||||
if (endIndex == -1) {
|
||||
setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1437,24 +1469,24 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
|||
pDumpInfo->rowIndex += step * dumpedRows;
|
||||
|
||||
// check if current block are all handled
|
||||
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pBlock->nRow) {
|
||||
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) {
|
||||
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
|
||||
if (outOfTimeWindow(ts, &pReader->window)) { // the remain data has out of query time window, ignore current block
|
||||
setBlockAllDumped(pDumpInfo, ts, pReader->order);
|
||||
}
|
||||
} else {
|
||||
int64_t ts = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
|
||||
int64_t ts = asc ? pRecord->lastKey : pRecord->firstKey;
|
||||
setBlockAllDumped(pDumpInfo, ts, pReader->order);
|
||||
}
|
||||
|
||||
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||
pReader->cost.blockLoadTime += elapsedTime;
|
||||
|
||||
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
|
||||
int32_t unDumpedRows = asc ? pRecord->numRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
|
||||
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
|
||||
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
|
||||
unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);
|
||||
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, dumpedRows,
|
||||
unDumpedRows, pRecord->firstKeyVer, pRecord->lastKeyVer, pBlockInfo->uid, elapsedTime, pReader->idStr);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1505,22 +1537,23 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
|
|||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||
|
||||
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
||||
// code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
|
||||
// SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
||||
code = tsdbDataFileReadBlockData(pReader->pFileReader, &pBlockInfo->record, pBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
|
||||
", rows:%d, code:%s %s",
|
||||
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
|
||||
tstrerror(code), pReader->idStr);
|
||||
", rows:%d, code:%s %s",
|
||||
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.firstKey,
|
||||
pBlockInfo->record.lastKey, pBlockInfo->record.numRow, tstrerror(code), pReader->idStr);
|
||||
return code;
|
||||
}
|
||||
|
||||
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
SBrinRecord* pRecord = &pBlockInfo->record;
|
||||
tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
|
||||
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
||||
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
|
||||
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
|
||||
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
|
||||
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->numRow,
|
||||
pRecord->firstKeyVer, pRecord->lastKeyVer, elapsedTime, pReader->idStr);
|
||||
|
||||
pReader->cost.blockLoadTime += elapsedTime;
|
||||
pDumpInfo->allDumped = false;
|
||||
|
@ -1603,6 +1636,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
|||
SBlockOrderSupporter sup = {0};
|
||||
pBlockIter->numOfBlocks = numOfBlocks;
|
||||
taosArrayClear(pBlockIter->blockList);
|
||||
|
||||
pBlockIter->pTableMap = pReader->status.pTableMap;
|
||||
|
||||
// access data blocks according to the offset of each block in asc/desc order.
|
||||
|
@ -1632,9 +1666,9 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
|||
sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
|
||||
|
||||
for (int32_t k = 0; k < num; ++k) {
|
||||
SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
|
||||
SBrinRecord* pRecord = taosArrayGet(pTableScanInfo->pBlockList, k);
|
||||
sup.pDataBlockInfo[sup.numOfTables][k] =
|
||||
(SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset};
|
||||
(SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pRecord->blockOffset, .pInfo = pTableScanInfo};
|
||||
cnt++;
|
||||
}
|
||||
|
||||
|
@ -1650,6 +1684,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
|||
if (sup.numOfTables == 1) {
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
|
||||
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);
|
||||
|
||||
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
||||
}
|
||||
|
||||
|
@ -1720,15 +1756,15 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
|
|||
/**
|
||||
* This is an two rectangles overlap cases.
|
||||
*/
|
||||
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
|
||||
return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
|
||||
(pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
|
||||
(pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
|
||||
(pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
|
||||
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SFileDataBlockInfo* pBlock) {
|
||||
return (pWindow->ekey < pBlock->record.lastKey && pWindow->ekey >= pBlock->record.firstKey) ||
|
||||
(pWindow->skey > pBlock->record.firstKey && pWindow->skey <= pBlock->record.lastKey) ||
|
||||
(pVerRange->minVer > pBlock->record.firstKeyVer && pVerRange->minVer <= pBlock->record.lastKeyVer) ||
|
||||
(pVerRange->maxVer < pBlock->record.lastKeyVer && pVerRange->maxVer >= pBlock->record.firstKeyVer);
|
||||
}
|
||||
|
||||
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
|
||||
int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) {
|
||||
int32_t* nextIndex, int32_t order, SBrinRecord* pRecord) {
|
||||
bool asc = ASCENDING_TRAVERSE(order);
|
||||
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
|
||||
return false;
|
||||
|
@ -1739,8 +1775,11 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl
|
|||
}
|
||||
|
||||
int32_t step = asc ? 1 : -1;
|
||||
*nextIndex = pBlockInfo->tbBlockIdx + step;
|
||||
*pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
||||
// *nextIndex = pBlockInfo->tbBlockIdx + step;
|
||||
// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
||||
SBrinRecord* p = taosArrayGet(pTableBlockScanInfo->pBlockList, pBlockInfo->tbBlockIdx + step);
|
||||
memcpy(pRecord, p, sizeof(SBrinRecord));
|
||||
|
||||
// tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk);
|
||||
return true;
|
||||
}
|
||||
|
@ -1791,16 +1830,25 @@ static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlo
|
|||
}
|
||||
}
|
||||
|
||||
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
|
||||
bool ascScan = ASCENDING_TRAVERSE(order);
|
||||
|
||||
return (ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts <= pBlock->minKey.ts)) ||
|
||||
(!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->maxKey.ts));
|
||||
static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order) {
|
||||
// it is the last block in current file, no chance to overlap with neighbor blocks.
|
||||
if (ASCENDING_TRAVERSE(order)) {
|
||||
return pBlock->record.lastKey == pRec->firstKey;
|
||||
} else {
|
||||
return pBlock->record.firstKey == pRec->lastKey;
|
||||
}
|
||||
}
|
||||
|
||||
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
|
||||
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVer >= pVerRange->minVer) &&
|
||||
(pBlock->minVer <= pVerRange->maxVer);
|
||||
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SFileDataBlockInfo* pBlock) {
|
||||
bool ascScan = ASCENDING_TRAVERSE(order);
|
||||
|
||||
return (ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts <= pBlock->record.firstKey)) ||
|
||||
(!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->record.lastKey));
|
||||
}
|
||||
|
||||
static bool keyOverlapFileBlock(TSDBKEY key, SFileDataBlockInfo* pBlock, SVersionRange* pVerRange) {
|
||||
return (key.ts >= pBlock->record.firstKey && key.ts <= pBlock->record.lastKey) &&
|
||||
(pBlock->record.lastKeyVer >= pVerRange->minVer) && (pBlock->record.firstKeyVer <= pVerRange->maxVer);
|
||||
}
|
||||
|
||||
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock,
|
||||
|
@ -1875,31 +1923,33 @@ typedef struct {
|
|||
bool moreThanCapcity;
|
||||
} SDataBlockToLoadInfo;
|
||||
|
||||
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
|
||||
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo,
|
||||
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
|
||||
STsdbReader* pReader) {
|
||||
int32_t neighborIndex = 0;
|
||||
SBlockIndex bIndex = {0};
|
||||
SBrinRecord rec = {0};
|
||||
|
||||
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order, &bIndex);
|
||||
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order, &rec);
|
||||
|
||||
// overlap with neighbor
|
||||
if (hasNeighbor) {
|
||||
pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
|
||||
// pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order);
|
||||
pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->order);
|
||||
}
|
||||
|
||||
// todo:
|
||||
// has duplicated ts of different version in this block
|
||||
pInfo->hasDupTs = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
|
||||
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
|
||||
pInfo->hasDupTs = 0;//(pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
|
||||
pInfo->overlapWithDelInfo = false;//overlapWithDelSkyline(pScanInfo, pBlockInfo, pReader->order);
|
||||
|
||||
if (hasDataInLastBlock(pLastBlockReader)) {
|
||||
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
pInfo->overlapWithLastBlock = !(pBlock->maxKey.ts < tsLast || pBlock->minKey.ts > tsLast);
|
||||
pInfo->overlapWithLastBlock = !(pBlockInfo->record.lastKey < tsLast || pBlockInfo->record.firstKey > tsLast);
|
||||
}
|
||||
|
||||
pInfo->moreThanCapcity = pBlock->nRow > pReader->resBlockInfo.capacity;
|
||||
pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
|
||||
pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
|
||||
pInfo->moreThanCapcity = pBlockInfo->record.numRow > pReader->resBlockInfo.capacity;
|
||||
pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlockInfo);
|
||||
pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlockInfo, &pReader->verRange);
|
||||
}
|
||||
|
||||
// 1. the version of all rows should be less than the endVersion
|
||||
|
@ -1908,10 +1958,10 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
|||
// 4. output buffer should be large enough to hold all rows in current block
|
||||
// 5. delete info should not overlap with current block data
|
||||
// 6. current block should not contain the duplicated ts
|
||||
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
|
||||
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo,
|
||||
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
|
||||
SDataBlockToLoadInfo info = {0};
|
||||
getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);
|
||||
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader);
|
||||
|
||||
bool loadDataBlock =
|
||||
(info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf ||
|
||||
|
@ -1933,7 +1983,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock
|
|||
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock,
|
||||
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
|
||||
SDataBlockToLoadInfo info = {0};
|
||||
getBlockToLoadInfo(&info, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader, pReader);
|
||||
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader);
|
||||
bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
|
||||
info.overlapWithDelInfo || info.overlapWithLastBlock);
|
||||
return isCleanFileBlock;
|
||||
|
@ -2594,13 +2644,41 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScanInfo* pBlockScanInfo, TSDBKEY* pKey,
|
||||
SMemTable* pMem, STbDataIter** pIter, const char* type) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
|
||||
|
||||
if (pMem != NULL) {
|
||||
*pData = tsdbGetTbDataFromMemTable(pMem, pReader->suid, pBlockScanInfo->uid);
|
||||
|
||||
if ((*pData) != NULL) {
|
||||
code = tsdbTbDataIterCreate((*pData), pKey, backward, pIter);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(*pIter) != NULL);
|
||||
|
||||
tsdbDebug("%p uid:%" PRIu64 ", check data in %s from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
|
||||
"-%" PRId64 " %s",
|
||||
pReader, pBlockScanInfo->uid, type, pKey->ts, pReader->order, (*pData)->minKey, (*pData)->maxKey,
|
||||
pReader->idStr);
|
||||
} else {
|
||||
tsdbError("%p uid:%" PRIu64 ", failed to create iterator for %s, code:%s, %s", pReader, pBlockScanInfo->uid,
|
||||
type, tstrerror(code), pReader->idStr);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tsdbDebug("%p uid:%" PRIu64 ", no data in %s, %s", pReader, pBlockScanInfo->uid, type, pReader->idStr);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
||||
if (pBlockScanInfo->iterInit) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
TSDBKEY startKey = {0};
|
||||
if (ASCENDING_TRAVERSE(pReader->order)) {
|
||||
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer};
|
||||
|
@ -2608,52 +2686,22 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
|||
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer};
|
||||
}
|
||||
|
||||
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
|
||||
int64_t st = 0;
|
||||
|
||||
STbData* d = NULL;
|
||||
if (pReader->pReadSnap->pMem != NULL) {
|
||||
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
|
||||
if (d != NULL) {
|
||||
code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);
|
||||
|
||||
tsdbDebug("%p uid:%" PRIu64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
|
||||
"-%" PRId64 " %s",
|
||||
pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr);
|
||||
} else {
|
||||
tsdbError("%p uid:%" PRIu64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid,
|
||||
tstrerror(code), pReader->idStr);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tsdbDebug("%p uid:%" PRIu64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
|
||||
int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem,
|
||||
&pBlockScanInfo->iter.iter, "mem");
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
STbData* di = NULL;
|
||||
if (pReader->pReadSnap->pIMem != NULL) {
|
||||
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
|
||||
if (di != NULL) {
|
||||
code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);
|
||||
|
||||
tsdbDebug("%p uid:%" PRIu64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
|
||||
"-%" PRId64 " %s",
|
||||
pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr);
|
||||
} else {
|
||||
tsdbError("%p uid:%" PRIu64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid,
|
||||
tstrerror(code), pReader->idStr);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tsdbDebug("%p uid:%" PRIu64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
|
||||
code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pReader->pReadSnap->pIMem,
|
||||
&pBlockScanInfo->iiter.iter, "imem");
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
st = taosGetTimestampUs();
|
||||
int64_t st = taosGetTimestampUs();
|
||||
initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
|
||||
pReader->cost.initDelSkylineIterTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||
|
||||
|
@ -2815,17 +2863,18 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
|
||||
int32_t nextIndex = -1;
|
||||
SBlockIndex nxtBIndex = {0};
|
||||
// SBlockIndex nxtBIndex = {0};
|
||||
|
||||
*loadNeighbor = false;
|
||||
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
|
||||
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &nxtBIndex);
|
||||
SBrinRecord rec = {0};
|
||||
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &rec);
|
||||
if (!hasNeighbor) { // do nothing
|
||||
return code;
|
||||
}
|
||||
|
||||
if (overlapWithNeighborBlock(pBlock, &nxtBIndex, pReader->order)) { // load next block
|
||||
if (overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->order)) { // load next block
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
|
||||
|
@ -2997,19 +3046,9 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
|||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
|
||||
SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
|
||||
if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) {
|
||||
SDelIdx idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
|
||||
SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ);
|
||||
|
||||
if (pIdx != NULL) {
|
||||
code = tsdbReadDelDatav1(pReader->pDelFReader, pIdx, pDelData, pReader->verRange.maxVer);
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
if (pBlockScanInfo->pDelData == NULL) {
|
||||
pBlockScanInfo->pDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
|
||||
SDelData* p = NULL;
|
||||
|
@ -3017,7 +3056,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
|||
p = pMemTbData->pHead;
|
||||
while (p) {
|
||||
if (p->version <= pReader->verRange.maxVer) {
|
||||
taosArrayPush(pDelData, p);
|
||||
taosArrayPush(pBlockScanInfo->pDelData, p);
|
||||
}
|
||||
|
||||
p = p->pNext;
|
||||
|
@ -3028,18 +3067,19 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
|||
p = piMemTbData->pHead;
|
||||
while (p) {
|
||||
if (p->version <= pReader->verRange.maxVer) {
|
||||
taosArrayPush(pDelData, p);
|
||||
taosArrayPush(pBlockScanInfo->pDelData, p);
|
||||
}
|
||||
p = p->pNext;
|
||||
}
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pDelData) > 0) {
|
||||
int32_t numOfElems = taosArrayGetSize(pBlockScanInfo->pDelData);
|
||||
if (numOfElems > 0) {
|
||||
pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
|
||||
code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
|
||||
code = tsdbBuildDeleteSkyline(pBlockScanInfo->pDelData, 0, numOfElems - 1, pBlockScanInfo->delSkyline);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pDelData);
|
||||
pBlockScanInfo->pDelData = taosArrayDestroy(pBlockScanInfo->pDelData);
|
||||
int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);
|
||||
|
||||
pBlockScanInfo->iter.index = index;
|
||||
|
@ -3049,9 +3089,9 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
|||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
taosArrayDestroy(pDelData);
|
||||
return code;
|
||||
// _err:
|
||||
// taosArrayDestroy(pBlockScanInfo->pDelData);
|
||||
// return code;
|
||||
}
|
||||
|
||||
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||
|
@ -3140,25 +3180,51 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
|||
|
||||
taosArrayDestroy(pIndexList);
|
||||
|
||||
if (pReader->pReadSnap != NULL) {
|
||||
SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
|
||||
if (pReader->pDelFReader == NULL && pDelFile != NULL) {
|
||||
int32_t code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
STFileObj* pTombFileObj = pReader->status.pCurrentFileset->farr[3];
|
||||
if (pTombFileObj!= NULL) {
|
||||
const TTombBlkArray* pBlkArray = NULL;
|
||||
|
||||
pReader->pDelIdx = taosArrayInit(4, sizeof(SDelIdx));
|
||||
if (pReader->pDelIdx == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray);
|
||||
int32_t i = 0, j = 0;
|
||||
|
||||
// todo find the correct start position.
|
||||
|
||||
while (i < pBlkArray->size && j < numOfTables) {
|
||||
STombBlock block = {0};
|
||||
code = tsdbDataFileReadTombBlock(pReader->pFileReader, &pBlkArray->data[i], &block);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
uint64_t uid = pReader->status.uidList.tableUidList[j];
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
|
||||
STombRecord record = {0};
|
||||
for(int32_t k = 0; k < block.suid->size; ++k) {
|
||||
code = tTombBlockGet(&block, k, &record);
|
||||
|
||||
{
|
||||
while(record.uid > uid) {
|
||||
j += 1;
|
||||
uid = pReader->status.uidList.tableUidList[j];
|
||||
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
}
|
||||
|
||||
if (record.uid < uid) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(record.suid == pReader->suid);
|
||||
|
||||
if (record.version <= pReader->verRange.maxVer) {
|
||||
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
|
||||
taosArrayPush(pScanInfo->pDelData, &delData);
|
||||
}
|
||||
}
|
||||
|
||||
i += 1;
|
||||
|
||||
code = tsdbReadDelIdx(pReader->pDelFReader, pReader->pDelIdx);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroy(pReader->pDelIdx);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3278,10 +3344,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
STableBlockScanInfo* pScanInfo = NULL;
|
||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
||||
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
||||
|
||||
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
|
||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
|
||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->order);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -3294,11 +3359,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
|
||||
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
||||
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
|
||||
|
||||
if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) {
|
||||
if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader)) {
|
||||
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -3306,16 +3370,16 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
|
||||
// build composed data block
|
||||
code = buildComposedDataBlock(pReader);
|
||||
} else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlock)) {
|
||||
} else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlockInfo)) {
|
||||
// data in memory that are earlier than current file block
|
||||
// rows in buffer should be less than the file block in asc, greater than file block in desc
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlock->minKey.ts : pBlock->maxKey.ts;
|
||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
|
||||
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
||||
} else {
|
||||
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
|
||||
// only return the rows in last block
|
||||
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
ASSERT(tsLast >= pBlock->maxKey.ts);
|
||||
ASSERT(tsLast >= pBlockInfo->record.lastKey);
|
||||
|
||||
SBlockData* pBData = &pReader->status.fileBlockData;
|
||||
tBlockDataReset(pBData);
|
||||
|
@ -3354,20 +3418,20 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
}
|
||||
} else { // whole block is required, return it directly
|
||||
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
|
||||
pInfo->rows = pBlock->nRow;
|
||||
pInfo->rows = pBlockInfo->record.numRow;
|
||||
pInfo->id.uid = pScanInfo->uid;
|
||||
pInfo->dataLoad = 0;
|
||||
pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
|
||||
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
|
||||
setComposedBlockFlag(pReader, false);
|
||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
|
||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->order);
|
||||
|
||||
// update the last key for the corresponding table
|
||||
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
|
||||
tsdbDebug("%p uid:%" PRIu64
|
||||
" clean file block retrieved from file, global index:%d, "
|
||||
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
||||
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
|
||||
pBlock->maxKey.ts, pReader->idStr);
|
||||
" clean file block retrieved from file, global index:%d, "
|
||||
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
||||
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
|
||||
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3405,7 +3469,6 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
|
|||
|
||||
STableBlockScanInfo* pScanInfo = *p;
|
||||
tMapDataReset(&pScanInfo->mapData);
|
||||
// tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
|
||||
|
||||
SDataBlk block = {0};
|
||||
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
|
||||
|
|
Loading…
Reference in New Issue