From 6619175fdec33438d1793288a53c3040db2e4bed Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 29 Feb 2024 09:16:31 +0800 Subject: [PATCH 1/4] opt bloom filter --- include/util/tscalablebf.h | 2 ++ source/libs/stream/src/streamUpdate.c | 6 ++++-- source/util/src/tscalablebf.c | 21 +++++++++++++++++++++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/include/util/tscalablebf.h b/include/util/tscalablebf.h index 2cf170cf04..d3ce2eb23b 100644 --- a/include/util/tscalablebf.h +++ b/include/util/tscalablebf.h @@ -26,6 +26,8 @@ typedef struct SScalableBf { SArray *bfArray; // array of bloom filters uint32_t growth; uint64_t numBits; + uint32_t maxBloomFilters; + int8_t status; _hash_fn_t hashFn1; _hash_fn_t hashFn2; } SScalableBf; diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 454ed4297c..34ae909f03 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -22,7 +22,7 @@ #define DEFAULT_MAP_CAPACITY 131072 #define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100) #define ROWS_PER_MILLISECOND 1 -#define MAX_NUM_SCALABLE_BF 100000 +#define MAX_NUM_SCALABLE_BF 32 #define MIN_NUM_SCALABLE_BF 10 #define DEFAULT_PREADD_BUCKET 1 #define MAX_INTERVAL MILLISECOND_PER_MINUTE @@ -81,7 +81,9 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) { static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) { if (watermark <= adjInterval) { watermark = TMAX(originInt / adjInterval, 1) * adjInterval; - } else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { + } + + if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) { watermark = MAX_NUM_SCALABLE_BF * adjInterval; } return watermark; diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c index 3b4975b701..a6e5b563d8 100644 --- a/source/util/src/tscalablebf.c +++ b/source/util/src/tscalablebf.c @@ -20,6 +20,9 @@ #define DEFAULT_GROWTH 2 #define DEFAULT_TIGHTENING_RATIO 0.5 +#define DEFAULT_MAX_BLOOMFILTERS 5 +#define SBF_INVALID -1 +#define SBF_VALID 0 static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate); @@ -41,10 +44,15 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) { pSBf->growth = DEFAULT_GROWTH; pSBf->hashFn1 = HASH_FUNCTION_1; pSBf->hashFn2 = HASH_FUNCTION_2; + pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS; + pSBf->status = SBF_VALID; return pSBf; } int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + if (pSBf->status == SBF_INVALID) { + return TSDB_CODE_FAILED; + } int32_t size = taosArrayGetSize(pSBf->bfArray); SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); ASSERT(pNormalBf); @@ -52,6 +60,7 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); if (pNormalBf == NULL) { + pSBf->status = SBF_INVALID; return TSDB_CODE_OUT_OF_MEMORY; } } @@ -59,6 +68,9 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le } int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + if (pSBf->status == SBF_INVALID) { + return TSDB_CODE_FAILED; + } uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); int32_t size = taosArrayGetSize(pSBf->bfArray); @@ -74,6 +86,7 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); if (pNormalBf == NULL) { + pSBf->status = SBF_INVALID; return TSDB_CODE_OUT_OF_MEMORY; } } @@ -93,6 +106,10 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32 } static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) { + if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) { + return NULL; + } + SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate); if (pNormalBf == NULL) { return NULL; @@ -128,6 +145,8 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) { } if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1; if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1; + if (tEncodeU32(pEncoder, pSBf->maxBloomFilters) < 0) return -1; + if (tEncodeI8(pEncoder, pSBf->status) < 0) return -1; return 0; } @@ -150,6 +169,8 @@ SScalableBf *tScalableBfDecode(SDecoder *pDecoder) { } if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error; if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error; + if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) goto _error; + if (tDecodeI8(pDecoder, &pSBf->status) < 0) goto _error; return pSBf; _error: From 4c4733209828aba4b1d091048fcc3de6f7ff0263 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 29 Feb 2024 09:24:39 +0800 Subject: [PATCH 2/4] opt bloom filter --- source/libs/stream/src/streamUpdate.c | 2 +- source/util/src/tscalablebf.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 34ae909f03..764bf6e026 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -22,7 +22,7 @@ #define DEFAULT_MAP_CAPACITY 131072 #define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100) #define ROWS_PER_MILLISECOND 1 -#define MAX_NUM_SCALABLE_BF 32 +#define MAX_NUM_SCALABLE_BF 64 #define MIN_NUM_SCALABLE_BF 10 #define DEFAULT_PREADD_BUCKET 1 #define MAX_INTERVAL MILLISECOND_PER_MINUTE diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c index a6e5b563d8..5ffa93c6f9 100644 --- a/source/util/src/tscalablebf.c +++ b/source/util/src/tscalablebf.c @@ -20,7 +20,7 @@ #define DEFAULT_GROWTH 2 #define DEFAULT_TIGHTENING_RATIO 0.5 -#define DEFAULT_MAX_BLOOMFILTERS 5 +#define DEFAULT_MAX_BLOOMFILTERS 4 #define SBF_INVALID -1 #define SBF_VALID 0 From 9d4bf1edba56e803b79e2c4da689b72b94859922 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 29 Feb 2024 09:35:51 +0800 Subject: [PATCH 3/4] opt bloom filter --- source/util/src/tscalablebf.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c index 5ffa93c6f9..743858eee9 100644 --- a/source/util/src/tscalablebf.c +++ b/source/util/src/tscalablebf.c @@ -94,6 +94,9 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { } int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + if (pSBf->status == SBF_INVALID) { + return TSDB_CODE_FAILED; + } uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); int32_t size = taosArrayGetSize(pSBf->bfArray); From 00812c8a5c68a3d92651456290699156f9a851d0 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 29 Feb 2024 11:16:54 +0800 Subject: [PATCH 4/4] opt bloom filter --- source/util/src/tscalablebf.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c index 743858eee9..7af794546b 100644 --- a/source/util/src/tscalablebf.c +++ b/source/util/src/tscalablebf.c @@ -35,6 +35,8 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) { if (pSBf == NULL) { return NULL; } + pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS; + pSBf->status = SBF_VALID; pSBf->numBits = 0; pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *)); if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) { @@ -44,8 +46,6 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) { pSBf->growth = DEFAULT_GROWTH; pSBf->hashFn1 = HASH_FUNCTION_1; pSBf->hashFn2 = HASH_FUNCTION_2; - pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS; - pSBf->status = SBF_VALID; return pSBf; }