diff --git a/include/util/tbloomfilter.h b/include/util/tbloomfilter.h index 16f7ff7958..23bf5aaafb 100644 --- a/include/util/tbloomfilter.h +++ b/include/util/tbloomfilter.h @@ -24,6 +24,9 @@ extern "C" { #endif +#define HASH_FUNCTION_1 taosFastHash +#define HASH_FUNCTION_2 taosDJB2Hash + typedef struct SBloomFilter { uint32_t hashFunctions; uint64_t expectedEntries; @@ -37,8 +40,9 @@ typedef struct SBloomFilter { } SBloomFilter; SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate); +int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2); int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len); -int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf, uint32_t len); +int32_t tBloomFilterNoContain(const SBloomFilter *pBF, uint64_t h1, uint64_t h2); void tBloomFilterDestroy(SBloomFilter *pBF); void tBloomFilterDump(const SBloomFilter *pBF); bool tBloomFilterIsFull(const SBloomFilter *pBF); diff --git a/include/util/tscalablebf.h b/include/util/tscalablebf.h index 9977c1436d..2cf170cf04 100644 --- a/include/util/tscalablebf.h +++ b/include/util/tscalablebf.h @@ -26,9 +26,12 @@ typedef struct SScalableBf { SArray *bfArray; // array of bloom filters uint32_t growth; uint64_t numBits; + _hash_fn_t hashFn1; + _hash_fn_t hashFn2; } SScalableBf; SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate); +int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len); int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len); int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len); void tScalableBfDestroy(SScalableBf *pSBf); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index f9ab672c4b..a463a17ec8 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -218,17 +218,22 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { } SScalableBf *pSBf = getSBf(pInfo, ts); - // pSBf may be a null pointer - if (pSBf) { - res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey)); - } int32_t size = taosHashGetSize(pInfo->pMap); if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) { taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY)); + // pSBf may be a null pointer + if (pSBf) { + res = tScalableBfPutNoCheck(pSBf, &updateKey, sizeof(SUpdateKey)); + } return false; } + // pSBf may be a null pointer + if (pSBf) { + res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey)); + } + if (!pMapMaxTs && maxTs < ts) { taosArraySet(pInfo->pTsBuckets, index, &ts); return false; diff --git a/source/util/src/tbloomfilter.c b/source/util/src/tbloomfilter.c index 84a78f3477..150faab571 100644 --- a/source/util/src/tbloomfilter.c +++ b/source/util/src/tbloomfilter.c @@ -24,9 +24,8 @@ static FORCE_INLINE bool setBit(uint64_t *buf, uint64_t index) { uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS; - uint64_t mask = 1ULL << (index % UNIT_NUM_BITS); uint64_t old = buf[unitIndex]; - buf[unitIndex] |= mask; + buf[unitIndex] |= (1ULL << (index % UNIT_NUM_BITS)); return buf[unitIndex] != old; } @@ -57,10 +56,8 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) { // ln(2) = 0.693147180559945 pBF->hashFunctions = (uint32_t)ceil(lnRate / 0.693147180559945); - /*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/ - /*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/ - pBF->hashFn1 = taosFastHash; - pBF->hashFn2 = taosDJB2Hash; + pBF->hashFn1 = HASH_FUNCTION_1; + pBF->hashFn2 = HASH_FUNCTION_2; pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t)); if (pBF->buffer == NULL) { tBloomFilterDestroy(pBF); @@ -69,14 +66,29 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) { return pBF; } -int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) { +int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2) { ASSERT(!tBloomFilterIsFull(pBF)); + bool hasChange = false; + const register uint64_t size = pBF->numBits; + uint64_t cbHash = hash1; + for (uint32_t i = 0; i < pBF->hashFunctions; ++i) { + hasChange |= setBit(pBF->buffer, cbHash % size); + cbHash += hash2; + } + if (hasChange) { + pBF->size++; + return TSDB_CODE_SUCCESS; + } + return TSDB_CODE_FAILED; +} + +int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) { uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len); bool hasChange = false; const register uint64_t size = pBF->numBits; uint64_t cbHash = h1; - for (uint64_t i = 0; i < pBF->hashFunctions; ++i) { + for (uint32_t i = 0; i < pBF->hashFunctions; ++i) { hasChange |= setBit(pBF->buffer, cbHash % size); cbHash += h2; } @@ -87,16 +99,14 @@ int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) { return TSDB_CODE_FAILED; } -int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf, uint32_t len) { - uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len); - uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len); +int32_t tBloomFilterNoContain(const SBloomFilter *pBF, uint64_t hash1, uint64_t hash2) { const register uint64_t size = pBF->numBits; - uint64_t cbHash = h1; - for (uint64_t i = 0; i < pBF->hashFunctions; ++i) { + uint64_t cbHash = hash1; + for (uint32_t i = 0; i < pBF->hashFunctions; ++i) { if (!getBit(pBF->buffer, cbHash % size)) { return TSDB_CODE_SUCCESS; } - cbHash += h2; + cbHash += hash2; } return TSDB_CODE_FAILED; } @@ -137,10 +147,8 @@ SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder) { if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error; } if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error; - /*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/ - /*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/ - pBF->hashFn1 = taosFastHash; - pBF->hashFn2 = taosDJB2Hash; + pBF->hashFn1 = HASH_FUNCTION_1; + pBF->hashFn2 = HASH_FUNCTION_2; return pBF; _error: diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c index 797f3a924d..3b4975b701 100644 --- a/source/util/src/tscalablebf.c +++ b/source/util/src/tscalablebf.c @@ -39,13 +39,31 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) { return NULL; } pSBf->growth = DEFAULT_GROWTH; + pSBf->hashFn1 = HASH_FUNCTION_1; + pSBf->hashFn2 = HASH_FUNCTION_2; return pSBf; } +int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + int32_t size = taosArrayGetSize(pSBf->bfArray); + SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); + ASSERT(pNormalBf); + if (tBloomFilterIsFull(pNormalBf)) { + pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, + pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); + if (pNormalBf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + return tBloomFilterPut(pNormalBf, keyBuf, len); +} + int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); + uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); int32_t size = taosArrayGetSize(pSBf->bfArray); for (int32_t i = size - 2; i >= 0; --i) { - if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), keyBuf, len) != TSDB_CODE_SUCCESS) { + if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } } @@ -59,13 +77,15 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { return TSDB_CODE_OUT_OF_MEMORY; } } - return tBloomFilterPut(pNormalBf, keyBuf, len); + return tBloomFilterPutHash(pNormalBf, h1, h2); } int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); + uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); int32_t size = taosArrayGetSize(pSBf->bfArray); for (int32_t i = size - 1; i >= 0; --i) { - if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), keyBuf, len) != TSDB_CODE_SUCCESS) { + if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } } @@ -113,6 +133,8 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) { SScalableBf *tScalableBfDecode(SDecoder *pDecoder) { SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf)); + pSBf->hashFn1 = HASH_FUNCTION_1; + pSBf->hashFn2 = HASH_FUNCTION_2; pSBf->bfArray = NULL; int32_t size = 0; if (tDecodeI32(pDecoder, &size) < 0) goto _error; diff --git a/source/util/test/bloomFilterTest.cpp b/source/util/test/bloomFilterTest.cpp index 2e02129164..c51de3c8a4 100644 --- a/source/util/test/bloomFilterTest.cpp +++ b/source/util/test/bloomFilterTest.cpp @@ -43,12 +43,16 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) { for (int64_t i = 0; i < 1000; i++) { int64_t ts = i + ts1; - GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_FAILED); + uint64_t h1 = (uint64_t) pBF4->hashFn1((const char*)&ts, sizeof(int64_t)); + uint64_t h2 = (uint64_t) pBF4->hashFn2((const char*)&ts, sizeof(int64_t)); + GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, h1, h2), TSDB_CODE_FAILED); } for (int64_t i = 2000; i < 3000; i++) { int64_t ts = i + ts1; - GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS); + uint64_t h1 = (uint64_t) pBF4->hashFn1((const char*)&ts, sizeof(int64_t)); + uint64_t h2 = (uint64_t) pBF4->hashFn2((const char*)&ts, sizeof(int64_t)); + GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, h1, h2), TSDB_CODE_SUCCESS); } tBloomFilterDestroy(pBF1);