fix(stream):adjust bloom filter
This commit is contained in:
parent
75db94ceba
commit
42fcb433dc
|
@ -1181,6 +1181,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->win);
|
||||
pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols);
|
||||
pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API
|
||||
|
||||
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
|
||||
|
|
|
@ -19,18 +19,24 @@
|
|||
#define DEFAULT_FALSE_POSITIVE 0.01
|
||||
#define DEFAULT_BUCKET_SIZE 1024
|
||||
#define ROWS_PER_MILLISECOND 1
|
||||
#define MAX_NUM_SCALABLE_BF 120
|
||||
#define MAX_NUM_SCALABLE_BF 100000
|
||||
#define MIN_NUM_SCALABLE_BF 10
|
||||
#define DEFAULT_PREADD_BUCKET 1
|
||||
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
||||
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
|
||||
#define DEFAULT_EXPECTED_ENTRIES 10000
|
||||
|
||||
static int64_t adjustExpEntries(int64_t entries) {
|
||||
return TMIN(DEFAULT_EXPECTED_ENTRIES, entries);
|
||||
}
|
||||
|
||||
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
|
||||
if (pInfo->numSBFs < count) {
|
||||
count = pInfo->numSBFs;
|
||||
}
|
||||
for (uint64_t i = 0; i < count; ++i) {
|
||||
SScalableBf *tsSBF = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE);
|
||||
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
|
||||
SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
|
||||
taosArrayPush(pInfo->pTsSBFs, &tsSBF);
|
||||
}
|
||||
}
|
||||
|
@ -38,9 +44,9 @@ static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
|
|||
static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) {
|
||||
if (count < pInfo->numSBFs - 1) {
|
||||
for (uint64_t i = 0; i < count; ++i) {
|
||||
SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, i);
|
||||
SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0);
|
||||
tScalableBfDestroy(pTsSBFs);
|
||||
taosArrayRemove(pInfo->pTsSBFs, i);
|
||||
taosArrayRemove(pInfo->pTsSBFs, 0);
|
||||
}
|
||||
} else {
|
||||
taosArrayClearP(pInfo->pTsSBFs, (FDelete)tScalableBfDestroy);
|
||||
|
@ -66,7 +72,7 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
|
|||
return val;
|
||||
}
|
||||
|
||||
static int64_t adjustWatermark(int64_t interval, int32_t watermark) {
|
||||
static int64_t adjustWatermark(int64_t interval, int64_t watermark) {
|
||||
if (watermark <= 0 || watermark > MAX_NUM_SCALABLE_BF * interval) {
|
||||
watermark = MAX_NUM_SCALABLE_BF * interval;
|
||||
} else if (watermark < MIN_NUM_SCALABLE_BF * interval) {
|
||||
|
@ -130,7 +136,8 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
|
|||
}
|
||||
SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index);
|
||||
if (res == NULL) {
|
||||
res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE);
|
||||
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
|
||||
res = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
|
||||
taosArrayPush(pInfo->pTsSBFs, &res);
|
||||
}
|
||||
return res;
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
#include "ttime.h"
|
||||
|
||||
using namespace std;
|
||||
#define MAX_NUM_SCALABLE_BF 100000
|
||||
|
||||
TEST(TD_STREAM_UPDATE_TEST, update) {
|
||||
int64_t interval = 20 * 1000;
|
||||
|
@ -91,11 +92,11 @@ TEST(TD_STREAM_UPDATE_TEST, update) {
|
|||
}
|
||||
|
||||
SUpdateInfo *pSU4 = updateInfoInit(-1, TSDB_TIME_PRECISION_MILLI, -1);
|
||||
GTEST_ASSERT_EQ(pSU4->watermark, 120 * pSU4->interval);
|
||||
GTEST_ASSERT_EQ(pSU4->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval);
|
||||
GTEST_ASSERT_EQ(pSU4->interval, MILLISECOND_PER_MINUTE);
|
||||
|
||||
SUpdateInfo *pSU5 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0);
|
||||
GTEST_ASSERT_EQ(pSU5->watermark, 120 * pSU4->interval);
|
||||
GTEST_ASSERT_EQ(pSU5->watermark, MAX_NUM_SCALABLE_BF * pSU4->interval);
|
||||
GTEST_ASSERT_EQ(pSU5->interval, MILLISECOND_PER_MINUTE);
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue