opt bloom filter

This commit is contained in:
liuyao 2023-10-10 14:28:35 +08:00
parent c5ec45ddb4
commit e9fc079d26
6 changed files with 74 additions and 28 deletions

View File

@ -24,6 +24,9 @@
extern "C" {
#endif
#define HASH_FUNCTION_1 taosFastHash
#define HASH_FUNCTION_2 taosDJB2Hash
typedef struct SBloomFilter {
uint32_t hashFunctions;
uint64_t expectedEntries;
@ -37,8 +40,9 @@ typedef struct SBloomFilter {
} SBloomFilter;
SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate);
int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2);
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len);
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf, uint32_t len);
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, uint64_t h1, uint64_t h2);
void tBloomFilterDestroy(SBloomFilter *pBF);
void tBloomFilterDump(const SBloomFilter *pBF);
bool tBloomFilterIsFull(const SBloomFilter *pBF);

View File

@ -26,9 +26,12 @@ typedef struct SScalableBf {
SArray *bfArray; // array of bloom filters
uint32_t growth;
uint64_t numBits;
_hash_fn_t hashFn1;
_hash_fn_t hashFn2;
} SScalableBf;
SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate);
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len);
void tScalableBfDestroy(SScalableBf *pSBf);

View File

@ -218,17 +218,22 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
}
SScalableBf *pSBf = getSBf(pInfo, ts);
// pSBf may be a null pointer
if (pSBf) {
res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
}
int32_t size = taosHashGetSize(pInfo->pMap);
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
// pSBf may be a null pointer
if (pSBf) {
res = tScalableBfPutNoCheck(pSBf, &updateKey, sizeof(SUpdateKey));
}
return false;
}
// pSBf may be a null pointer
if (pSBf) {
res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
}
if (!pMapMaxTs && maxTs < ts) {
taosArraySet(pInfo->pTsBuckets, index, &ts);
return false;

View File

@ -24,9 +24,8 @@
static FORCE_INLINE bool setBit(uint64_t *buf, uint64_t index) {
uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS;
uint64_t mask = 1ULL << (index % UNIT_NUM_BITS);
uint64_t old = buf[unitIndex];
buf[unitIndex] |= mask;
buf[unitIndex] |= (1ULL << (index % UNIT_NUM_BITS));
return buf[unitIndex] != old;
}
@ -57,10 +56,8 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
// ln(2) = 0.693147180559945
pBF->hashFunctions = (uint32_t)ceil(lnRate / 0.693147180559945);
/*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/
/*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/
pBF->hashFn1 = taosFastHash;
pBF->hashFn2 = taosDJB2Hash;
pBF->hashFn1 = HASH_FUNCTION_1;
pBF->hashFn2 = HASH_FUNCTION_2;
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
if (pBF->buffer == NULL) {
tBloomFilterDestroy(pBF);
@ -69,14 +66,29 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
return pBF;
}
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) {
int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2) {
ASSERT(!tBloomFilterIsFull(pBF));
bool hasChange = false;
const register uint64_t size = pBF->numBits;
uint64_t cbHash = hash1;
for (uint32_t i = 0; i < pBF->hashFunctions; ++i) {
hasChange |= setBit(pBF->buffer, cbHash % size);
cbHash += hash2;
}
if (hasChange) {
pBF->size++;
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_FAILED;
}
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) {
uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len);
bool hasChange = false;
const register uint64_t size = pBF->numBits;
uint64_t cbHash = h1;
for (uint64_t i = 0; i < pBF->hashFunctions; ++i) {
for (uint32_t i = 0; i < pBF->hashFunctions; ++i) {
hasChange |= setBit(pBF->buffer, cbHash % size);
cbHash += h2;
}
@ -87,16 +99,14 @@ int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) {
return TSDB_CODE_FAILED;
}
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf, uint32_t len) {
uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len);
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, uint64_t hash1, uint64_t hash2) {
const register uint64_t size = pBF->numBits;
uint64_t cbHash = h1;
for (uint64_t i = 0; i < pBF->hashFunctions; ++i) {
uint64_t cbHash = hash1;
for (uint32_t i = 0; i < pBF->hashFunctions; ++i) {
if (!getBit(pBF->buffer, cbHash % size)) {
return TSDB_CODE_SUCCESS;
}
cbHash += h2;
cbHash += hash2;
}
return TSDB_CODE_FAILED;
}
@ -137,10 +147,8 @@ SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder) {
if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error;
}
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error;
/*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/
/*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/
pBF->hashFn1 = taosFastHash;
pBF->hashFn2 = taosDJB2Hash;
pBF->hashFn1 = HASH_FUNCTION_1;
pBF->hashFn2 = HASH_FUNCTION_2;
return pBF;
_error:

View File

@ -39,13 +39,31 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
return NULL;
}
pSBf->growth = DEFAULT_GROWTH;
pSBf->hashFn1 = HASH_FUNCTION_1;
pSBf->hashFn2 = HASH_FUNCTION_2;
return pSBf;
}
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
int32_t size = taosArrayGetSize(pSBf->bfArray);
SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
ASSERT(pNormalBf);
if (tBloomFilterIsFull(pNormalBf)) {
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
if (pNormalBf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return tBloomFilterPut(pNormalBf, keyBuf, len);
}
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
int32_t size = taosArrayGetSize(pSBf->bfArray);
for (int32_t i = size - 2; i >= 0; --i) {
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), keyBuf, len) != TSDB_CODE_SUCCESS) {
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
}
@ -59,13 +77,15 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return tBloomFilterPut(pNormalBf, keyBuf, len);
return tBloomFilterPutHash(pNormalBf, h1, h2);
}
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
int32_t size = taosArrayGetSize(pSBf->bfArray);
for (int32_t i = size - 1; i >= 0; --i) {
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), keyBuf, len) != TSDB_CODE_SUCCESS) {
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED;
}
}
@ -113,6 +133,8 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) {
SScalableBf *tScalableBfDecode(SDecoder *pDecoder) {
SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
pSBf->hashFn1 = HASH_FUNCTION_1;
pSBf->hashFn2 = HASH_FUNCTION_2;
pSBf->bfArray = NULL;
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) goto _error;

View File

@ -43,12 +43,16 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) {
for (int64_t i = 0; i < 1000; i++) {
int64_t ts = i + ts1;
GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_FAILED);
uint64_t h1 = (uint64_t) pBF4->hashFn1((const char*)&ts, sizeof(int64_t));
uint64_t h2 = (uint64_t) pBF4->hashFn2((const char*)&ts, sizeof(int64_t));
GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, h1, h2), TSDB_CODE_FAILED);
}
for (int64_t i = 2000; i < 3000; i++) {
int64_t ts = i + ts1;
GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
uint64_t h1 = (uint64_t) pBF4->hashFn1((const char*)&ts, sizeof(int64_t));
uint64_t h2 = (uint64_t) pBF4->hashFn2((const char*)&ts, sizeof(int64_t));
GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, h1, h2), TSDB_CODE_SUCCESS);
}
tBloomFilterDestroy(pBF1);