Merge pull request #22415 from taosdata/szhou/tsdb-neigh-block
tsdb neighbouring block search and set it to global block iterator
This commit is contained in:
commit
0c599fa869
|
@ -1102,10 +1102,10 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
|
||||||
(pVerRange->maxVer < pBlock->record.maxVer && pVerRange->maxVer >= pBlock->record.minVer);
|
(pVerRange->maxVer < pBlock->record.maxVer && pVerRange->maxVer >= pBlock->record.minVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
|
static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
|
||||||
int32_t* nextIndex, int32_t order, SBrinRecord* pRecord) {
|
int32_t* nextIndex, int32_t order, SBrinRecord* pRecord) {
|
||||||
bool asc = ASCENDING_TRAVERSE(order);
|
bool asc = ASCENDING_TRAVERSE(order);
|
||||||
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
|
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1116,7 +1116,8 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl
|
||||||
int32_t step = asc ? 1 : -1;
|
int32_t step = asc ? 1 : -1;
|
||||||
// *nextIndex = pBlockInfo->tbBlockIdx + step;
|
// *nextIndex = pBlockInfo->tbBlockIdx + step;
|
||||||
// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
|
||||||
SBrinRecord* p = taosArrayGet(pTableBlockScanInfo->pBlockList, pBlockInfo->tbBlockIdx + step);
|
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
|
||||||
|
SBrinRecord* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
|
||||||
memcpy(pRecord, p, sizeof(SBrinRecord));
|
memcpy(pRecord, p, sizeof(SBrinRecord));
|
||||||
|
|
||||||
*nextIndex = pBlockInfo->tbBlockIdx + step;
|
*nextIndex = pBlockInfo->tbBlockIdx + step;
|
||||||
|
@ -1141,7 +1142,7 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
|
static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
|
||||||
if (index < 0 || index >= pBlockIter->numOfBlocks) {
|
if (index < 0 || index >= pBlockIter->numOfBlocks) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1149,12 +1150,34 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t
|
||||||
SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
|
SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index);
|
||||||
pBlockIter->index += step;
|
pBlockIter->index += step;
|
||||||
|
|
||||||
if (index != pBlockIter->index) {
|
if (index != pBlockIter->index) {
|
||||||
taosArrayRemove(pBlockIter->blockList, index);
|
if (index > pBlockIter->index) {
|
||||||
taosArrayInsert(pBlockIter->blockList, pBlockIter->index, &fblock);
|
for (int32_t i = index - 1; i >= pBlockIter->index; --i) {
|
||||||
|
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);
|
||||||
|
|
||||||
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
|
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
|
||||||
ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
|
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
|
||||||
|
pTableDataBlockIdx->globalIndex = i + 1;
|
||||||
|
|
||||||
|
taosArraySet(pBlockIter->blockList, i + 1, pBlockInfo);
|
||||||
|
}
|
||||||
|
} else if (index < pBlockIter->index) {
|
||||||
|
for (int32_t i = index + 1; i <= pBlockIter->index; ++i) {
|
||||||
|
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);
|
||||||
|
|
||||||
|
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
|
||||||
|
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
|
||||||
|
pTableDataBlockIdx->globalIndex = i - 1;
|
||||||
|
|
||||||
|
taosArraySet(pBlockIter->blockList, i - 1, pBlockInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArraySet(pBlockIter->blockList, pBlockIter->index, &fblock);
|
||||||
|
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, fblock.uid, pReader->idStr);
|
||||||
|
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, fblock.tbBlockIdx);
|
||||||
|
pTableDataBlockIdx->globalIndex = pBlockIter->index;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1260,7 +1283,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
||||||
int32_t neighborIndex = 0;
|
int32_t neighborIndex = 0;
|
||||||
SBrinRecord rec = {0};
|
SBrinRecord rec = {0};
|
||||||
|
|
||||||
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->info.order, &rec);
|
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, pReader->info.order, &rec);
|
||||||
|
|
||||||
// overlap with neighbor
|
// overlap with neighbor
|
||||||
if (hasNeighbor) {
|
if (hasNeighbor) {
|
||||||
|
@ -2232,7 +2255,7 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
|
||||||
*loadNeighbor = false;
|
*loadNeighbor = false;
|
||||||
|
|
||||||
SBrinRecord rec = {0};
|
SBrinRecord rec = {0};
|
||||||
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->info.order, &rec);
|
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pBlockScanInfo, &nextIndex, pReader->info.order, &rec);
|
||||||
if (!hasNeighbor) { // do nothing
|
if (!hasNeighbor) { // do nothing
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2242,11 +2265,11 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
|
||||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||||
|
|
||||||
// 1. find the next neighbor block in the scan block list
|
// 1. find the next neighbor block in the scan block list
|
||||||
SFileDataBlockInfo fb = {.uid = pBlockInfo->uid, .tbBlockIdx = nextIndex};
|
STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex);
|
||||||
int32_t neighborIndex = findFileBlockInfoIndex(pBlockIter, &fb);
|
int32_t neighborIndex = tableDataBlockIdx->globalIndex;
|
||||||
|
|
||||||
// 2. remove it from the scan block list
|
// 2. remove it from the scan block list
|
||||||
setFileBlockActiveInBlockIter(pBlockIter, neighborIndex, step);
|
setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step);
|
||||||
|
|
||||||
// 3. load the neighbor block, and set it to be the currently accessed file data block
|
// 3. load the neighbor block, and set it to be the currently accessed file data block
|
||||||
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
|
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid);
|
||||||
|
@ -4178,6 +4201,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
|
pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
|
||||||
|
pBlockScanInfo->pBlockIdxList = taosArrayDestroy(pBlockScanInfo->pBlockIdxList);
|
||||||
// TODO: keep skyline for reuse
|
// TODO: keep skyline for reuse
|
||||||
pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
|
pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,6 +220,7 @@ void clearBlockScanInfo(STableBlockScanInfo* p) {
|
||||||
|
|
||||||
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
||||||
p->pBlockList = taosArrayDestroy(p->pBlockList);
|
p->pBlockList = taosArrayDestroy(p->pBlockList);
|
||||||
|
p->pBlockIdxList = taosArrayDestroy(p->pBlockIdxList);
|
||||||
p->pMemDelData = taosArrayDestroy(p->pMemDelData);
|
p->pMemDelData = taosArrayDestroy(p->pMemDelData);
|
||||||
p->pFileDelData = taosArrayDestroy(p->pFileDelData);
|
p->pFileDelData = taosArrayDestroy(p->pFileDelData);
|
||||||
}
|
}
|
||||||
|
@ -238,6 +239,7 @@ void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
|
||||||
static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
|
static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
|
||||||
// reset the index in last block when handing a new file
|
// reset the index in last block when handing a new file
|
||||||
taosArrayClear(pScanInfo->pBlockList);
|
taosArrayClear(pScanInfo->pBlockList);
|
||||||
|
taosArrayClear(pScanInfo->pBlockIdxList);
|
||||||
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
|
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -384,12 +386,21 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
||||||
|
|
||||||
// since there is only one table qualified, blocks are not sorted
|
// since there is only one table qualified, blocks are not sorted
|
||||||
if (sup.numOfTables == 1) {
|
if (sup.numOfTables == 1) {
|
||||||
|
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, 0);
|
||||||
|
if (pTableScanInfo->pBlockIdxList == NULL) {
|
||||||
|
pTableScanInfo->pBlockIdxList = taosArrayInit(numOfBlocks, sizeof(STableDataBlockIdx));
|
||||||
|
}
|
||||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||||
SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
|
SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
|
||||||
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);
|
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);
|
||||||
|
|
||||||
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
||||||
|
|
||||||
|
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = i};
|
||||||
|
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
|
||||||
}
|
}
|
||||||
|
taosArrayDestroy(pTableScanInfo->pBlockList);
|
||||||
|
pTableScanInfo->pBlockList = NULL;
|
||||||
|
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
|
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
|
||||||
|
@ -420,7 +431,13 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
||||||
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
|
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
|
||||||
|
|
||||||
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
||||||
|
STableBlockScanInfo *pTableScanInfo = sup.pDataBlockInfo[pos][index].pInfo;
|
||||||
|
if (pTableScanInfo->pBlockIdxList == NULL) {
|
||||||
|
size_t szTableDataBlocks = taosArrayGetSize(pTableScanInfo->pBlockList);
|
||||||
|
pTableScanInfo->pBlockIdxList = taosArrayInit(szTableDataBlocks, sizeof(STableDataBlockIdx));
|
||||||
|
}
|
||||||
|
STableDataBlockIdx tableDataBlockIdx = {.globalIndex = numOfTotal};
|
||||||
|
taosArrayPush(pTableScanInfo->pBlockIdxList, &tableDataBlockIdx);
|
||||||
// set data block index overflow, in order to disable the offset comparator
|
// set data block index overflow, in order to disable the offset comparator
|
||||||
if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
|
if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
|
||||||
sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
|
sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
|
||||||
|
@ -430,6 +447,12 @@ int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int3
|
||||||
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
|
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
|
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
|
||||||
|
taosArrayDestroy(pTableScanInfo->pBlockList);
|
||||||
|
pTableScanInfo->pBlockList = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
|
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
|
||||||
(et - st) / 1000.0, pReader->idStr);
|
(et - st) / 1000.0, pReader->idStr);
|
||||||
|
|
|
@ -59,11 +59,16 @@ typedef struct {
|
||||||
bool hasVal;
|
bool hasVal;
|
||||||
} SIterInfo;
|
} SIterInfo;
|
||||||
|
|
||||||
|
typedef struct STableDataBlockIdx {
|
||||||
|
int32_t globalIndex;
|
||||||
|
} STableDataBlockIdx;
|
||||||
|
|
||||||
typedef struct STableBlockScanInfo {
|
typedef struct STableBlockScanInfo {
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
TSKEY lastKey;
|
TSKEY lastKey;
|
||||||
TSKEY lastKeyInStt; // last accessed key in stt
|
TSKEY lastKeyInStt; // last accessed key in stt
|
||||||
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
|
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
|
||||||
|
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
|
||||||
SArray* pMemDelData; // SArray<SDelData>
|
SArray* pMemDelData; // SArray<SDelData>
|
||||||
SArray* pFileDelData; // SArray<SDelData> from each file set
|
SArray* pFileDelData; // SArray<SDelData> from each file set
|
||||||
SIterInfo iter; // mem buffer skip list iterator
|
SIterInfo iter; // mem buffer skip list iterator
|
||||||
|
|
Loading…
Reference in New Issue