fix(stream): enable filter table based on table statistics.
This commit is contained in:
parent
53aefa531f
commit
2c357f1958
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "tsdb.h"
|
||||
#include "tsdbFSet2.h"
|
||||
#include "tsdbMerge.h"
|
||||
#include "tsdbReadUtil.h"
|
||||
#include "tsdbSttFileRW.h"
|
||||
|
||||
|
@ -352,10 +353,14 @@ static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t uidComparFn(const void *p1, const void *p2) {
|
||||
const uint64_t *uid1 = p1;
|
||||
static int32_t suidComparFn(const void *target, const void *p2) {
|
||||
const uint64_t *targetUid = target;
|
||||
const uint64_t *uid2 = p2;
|
||||
return (*uid1) - (*uid2);
|
||||
if (*uid2 == (*targetUid)) {
|
||||
return 0;
|
||||
} else {
|
||||
return (*targetUid) < (*uid2) ? -1:1;
|
||||
}
|
||||
}
|
||||
|
||||
static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid,
|
||||
|
@ -372,17 +377,6 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
|
|||
}
|
||||
}
|
||||
|
||||
// for (; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
|
||||
// SStatisBlk *p = &pStatisBlkArray->data[i];
|
||||
// if (p->minTbid.uid <= uid && p->maxTbid.uid >= uid) {
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// if (p->maxTbid.uid < uid) {
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
|
||||
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -391,10 +385,39 @@ static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint6
|
|||
STbStatisBlock block = {0};
|
||||
tsdbSttFileReadStatisBlock(pReader, p, &block);
|
||||
|
||||
int32_t index = tarray2SearchIdx(block.uid, &uid, sizeof(int64_t), uidComparFn, TD_EQ);
|
||||
tStatisBlockDestroy(&block);
|
||||
int32_t index = tarray2SearchIdx(block.suid, &suid, sizeof(int64_t), suidComparFn, TD_EQ);
|
||||
if (index == -1) {
|
||||
tStatisBlockDestroy(&block);
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t j = index;
|
||||
if (block.uid->data[j] == uid) {
|
||||
tStatisBlockDestroy(&block);
|
||||
return true;
|
||||
} else if (block.uid->data[j] > uid) {
|
||||
while (j >= 0 && block.suid->data[j] == suid) {
|
||||
if (block.uid->data[j] == uid) {
|
||||
tStatisBlockDestroy(&block);
|
||||
return true;
|
||||
} else {
|
||||
j -= 1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
j = index + 1;
|
||||
while (j < block.suid->size && block.suid->data[j] == suid) {
|
||||
if (block.uid->data[j] == uid) {
|
||||
tStatisBlockDestroy(&block);
|
||||
return true;
|
||||
} else {
|
||||
j += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (index != -1);
|
||||
tStatisBlockDestroy(&block);
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
|
||||
|
@ -445,12 +468,12 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
|
|||
tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
|
||||
}
|
||||
|
||||
// bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
|
||||
// if (!exists) {
|
||||
// pIter->iSttBlk = -1;
|
||||
// pIter->pSttBlk = NULL;
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
|
||||
if (!exists) {
|
||||
pIter->iSttBlk = -1;
|
||||
pIter->pSttBlk = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// find the start block, actually we could load the position to avoid repeatly searching for the start position when
|
||||
// the skey is updated.
|
||||
|
|
Loading…
Reference in New Issue