diff --git a/include/util/tscalablebf.h b/include/util/tscalablebf.h index 2cf170cf04..d3ce2eb23b 100644 --- a/include/util/tscalablebf.h +++ b/include/util/tscalablebf.h @@ -26,6 +26,8 @@ typedef struct SScalableBf { SArray *bfArray; // array of bloom filters uint32_t growth; uint64_t numBits; + uint32_t maxBloomFilters; + int8_t status; _hash_fn_t hashFn1; _hash_fn_t hashFn2; } SScalableBf; diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 454ed4297c..764bf6e026 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -22,7 +22,7 @@ #define DEFAULT_MAP_CAPACITY 131072 #define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100) #define ROWS_PER_MILLISECOND 1 -#define MAX_NUM_SCALABLE_BF 100000 +#define MAX_NUM_SCALABLE_BF 64 #define MIN_NUM_SCALABLE_BF 10 #define DEFAULT_PREADD_BUCKET 1 #define MAX_INTERVAL MILLISECOND_PER_MINUTE @@ -81,7 +81,9 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) { static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) { if (watermark <= adjInterval) { watermark = TMAX(originInt / adjInterval, 1) * adjInterval; - } else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { + } + + if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { watermark = MAX_NUM_SCALABLE_BF * adjInterval; } return watermark; diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c index 3b4975b701..7af794546b 100644 --- a/source/util/src/tscalablebf.c +++ b/source/util/src/tscalablebf.c @@ -20,6 +20,9 @@ #define DEFAULT_GROWTH 2 #define DEFAULT_TIGHTENING_RATIO 0.5 +#define DEFAULT_MAX_BLOOMFILTERS 4 +#define SBF_INVALID -1 +#define SBF_VALID 0 static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate); @@ -32,6 +35,8 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) { if (pSBf == NULL) { return NULL; } + pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS; + pSBf->status = SBF_VALID; pSBf->numBits = 0; pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *)); if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) { @@ -45,6 +50,9 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) { } int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + if (pSBf->status == SBF_INVALID) { + return TSDB_CODE_FAILED; + } int32_t size = taosArrayGetSize(pSBf->bfArray); SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); ASSERT(pNormalBf); @@ -52,6 +60,7 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); if (pNormalBf == NULL) { + pSBf->status = SBF_INVALID; return TSDB_CODE_OUT_OF_MEMORY; } } @@ -59,6 +68,9 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le } int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + if (pSBf->status == SBF_INVALID) { + return TSDB_CODE_FAILED; + } uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); int32_t size = taosArrayGetSize(pSBf->bfArray); @@ -74,6 +86,7 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); if (pNormalBf == NULL) { + pSBf->status = SBF_INVALID; return TSDB_CODE_OUT_OF_MEMORY; } } @@ -81,6 +94,9 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { } int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + if (pSBf->status == SBF_INVALID) { + return TSDB_CODE_FAILED; + } uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); int32_t size = taosArrayGetSize(pSBf->bfArray); @@ -93,6 +109,10 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32 } static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) { + if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) { + return NULL; + } + SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate); if (pNormalBf == NULL) { return NULL; @@ -128,6 +148,8 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) { } if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1; if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1; + if (tEncodeU32(pEncoder, pSBf->maxBloomFilters) < 0) return -1; + if (tEncodeI8(pEncoder, pSBf->status) < 0) return -1; return 0; } @@ -150,6 +172,8 @@ SScalableBf *tScalableBfDecode(SDecoder *pDecoder) { } if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error; if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error; + if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) goto _error; + if (tDecodeI8(pDecoder, &pSBf->status) < 0) goto _error; return pSBf; _error: