commit
614bdd9fb9
|
@ -24,6 +24,9 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define HASH_FUNCTION_1 taosFastHash
|
||||||
|
#define HASH_FUNCTION_2 taosDJB2Hash
|
||||||
|
|
||||||
typedef struct SBloomFilter {
|
typedef struct SBloomFilter {
|
||||||
uint32_t hashFunctions;
|
uint32_t hashFunctions;
|
||||||
uint64_t expectedEntries;
|
uint64_t expectedEntries;
|
||||||
|
@ -37,8 +40,9 @@ typedef struct SBloomFilter {
|
||||||
} SBloomFilter;
|
} SBloomFilter;
|
||||||
|
|
||||||
SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate);
|
SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate);
|
||||||
|
int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2);
|
||||||
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len);
|
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len);
|
||||||
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf, uint32_t len);
|
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, uint64_t h1, uint64_t h2);
|
||||||
void tBloomFilterDestroy(SBloomFilter *pBF);
|
void tBloomFilterDestroy(SBloomFilter *pBF);
|
||||||
void tBloomFilterDump(const SBloomFilter *pBF);
|
void tBloomFilterDump(const SBloomFilter *pBF);
|
||||||
bool tBloomFilterIsFull(const SBloomFilter *pBF);
|
bool tBloomFilterIsFull(const SBloomFilter *pBF);
|
||||||
|
|
|
@ -26,9 +26,12 @@ typedef struct SScalableBf {
|
||||||
SArray *bfArray; // array of bloom filters
|
SArray *bfArray; // array of bloom filters
|
||||||
uint32_t growth;
|
uint32_t growth;
|
||||||
uint64_t numBits;
|
uint64_t numBits;
|
||||||
|
_hash_fn_t hashFn1;
|
||||||
|
_hash_fn_t hashFn2;
|
||||||
} SScalableBf;
|
} SScalableBf;
|
||||||
|
|
||||||
SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate);
|
SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate);
|
||||||
|
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
|
||||||
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
|
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
|
||||||
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len);
|
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len);
|
||||||
void tScalableBfDestroy(SScalableBf *pSBf);
|
void tScalableBfDestroy(SScalableBf *pSBf);
|
||||||
|
|
|
@ -219,17 +219,22 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SScalableBf *pSBf = getSBf(pInfo, ts);
|
SScalableBf *pSBf = getSBf(pInfo, ts);
|
||||||
// pSBf may be a null pointer
|
|
||||||
if (pSBf) {
|
|
||||||
res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t size = taosHashGetSize(pInfo->pMap);
|
int32_t size = taosHashGetSize(pInfo->pMap);
|
||||||
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
|
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
|
||||||
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
||||||
|
// pSBf may be a null pointer
|
||||||
|
if (pSBf) {
|
||||||
|
res = tScalableBfPutNoCheck(pSBf, &updateKey, sizeof(SUpdateKey));
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pSBf may be a null pointer
|
||||||
|
if (pSBf) {
|
||||||
|
res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
|
||||||
|
}
|
||||||
|
|
||||||
if (!pMapMaxTs && maxTs < ts) {
|
if (!pMapMaxTs && maxTs < ts) {
|
||||||
taosArraySet(pInfo->pTsBuckets, index, &ts);
|
taosArraySet(pInfo->pTsBuckets, index, &ts);
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -24,9 +24,8 @@
|
||||||
|
|
||||||
static FORCE_INLINE bool setBit(uint64_t *buf, uint64_t index) {
|
static FORCE_INLINE bool setBit(uint64_t *buf, uint64_t index) {
|
||||||
uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS;
|
uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS;
|
||||||
uint64_t mask = 1ULL << (index % UNIT_NUM_BITS);
|
|
||||||
uint64_t old = buf[unitIndex];
|
uint64_t old = buf[unitIndex];
|
||||||
buf[unitIndex] |= mask;
|
buf[unitIndex] |= (1ULL << (index % UNIT_NUM_BITS));
|
||||||
return buf[unitIndex] != old;
|
return buf[unitIndex] != old;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,10 +56,8 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
|
||||||
|
|
||||||
// ln(2) = 0.693147180559945
|
// ln(2) = 0.693147180559945
|
||||||
pBF->hashFunctions = (uint32_t)ceil(lnRate / 0.693147180559945);
|
pBF->hashFunctions = (uint32_t)ceil(lnRate / 0.693147180559945);
|
||||||
/*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/
|
pBF->hashFn1 = HASH_FUNCTION_1;
|
||||||
/*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/
|
pBF->hashFn2 = HASH_FUNCTION_2;
|
||||||
pBF->hashFn1 = taosFastHash;
|
|
||||||
pBF->hashFn2 = taosDJB2Hash;
|
|
||||||
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
|
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
|
||||||
if (pBF->buffer == NULL) {
|
if (pBF->buffer == NULL) {
|
||||||
tBloomFilterDestroy(pBF);
|
tBloomFilterDestroy(pBF);
|
||||||
|
@ -69,14 +66,29 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
|
||||||
return pBF;
|
return pBF;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) {
|
int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2) {
|
||||||
ASSERT(!tBloomFilterIsFull(pBF));
|
ASSERT(!tBloomFilterIsFull(pBF));
|
||||||
|
bool hasChange = false;
|
||||||
|
const register uint64_t size = pBF->numBits;
|
||||||
|
uint64_t cbHash = hash1;
|
||||||
|
for (uint32_t i = 0; i < pBF->hashFunctions; ++i) {
|
||||||
|
hasChange |= setBit(pBF->buffer, cbHash % size);
|
||||||
|
cbHash += hash2;
|
||||||
|
}
|
||||||
|
if (hasChange) {
|
||||||
|
pBF->size++;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) {
|
||||||
uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len);
|
uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len);
|
||||||
uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len);
|
uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len);
|
||||||
bool hasChange = false;
|
bool hasChange = false;
|
||||||
const register uint64_t size = pBF->numBits;
|
const register uint64_t size = pBF->numBits;
|
||||||
uint64_t cbHash = h1;
|
uint64_t cbHash = h1;
|
||||||
for (uint64_t i = 0; i < pBF->hashFunctions; ++i) {
|
for (uint32_t i = 0; i < pBF->hashFunctions; ++i) {
|
||||||
hasChange |= setBit(pBF->buffer, cbHash % size);
|
hasChange |= setBit(pBF->buffer, cbHash % size);
|
||||||
cbHash += h2;
|
cbHash += h2;
|
||||||
}
|
}
|
||||||
|
@ -87,16 +99,14 @@ int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf, uint32_t len) {
|
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, uint64_t hash1, uint64_t hash2) {
|
||||||
uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len);
|
|
||||||
uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len);
|
|
||||||
const register uint64_t size = pBF->numBits;
|
const register uint64_t size = pBF->numBits;
|
||||||
uint64_t cbHash = h1;
|
uint64_t cbHash = hash1;
|
||||||
for (uint64_t i = 0; i < pBF->hashFunctions; ++i) {
|
for (uint32_t i = 0; i < pBF->hashFunctions; ++i) {
|
||||||
if (!getBit(pBF->buffer, cbHash % size)) {
|
if (!getBit(pBF->buffer, cbHash % size)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
cbHash += h2;
|
cbHash += hash2;
|
||||||
}
|
}
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -137,10 +147,8 @@ SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder) {
|
||||||
if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error;
|
if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error;
|
||||||
}
|
}
|
||||||
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error;
|
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error;
|
||||||
/*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/
|
pBF->hashFn1 = HASH_FUNCTION_1;
|
||||||
/*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/
|
pBF->hashFn2 = HASH_FUNCTION_2;
|
||||||
pBF->hashFn1 = taosFastHash;
|
|
||||||
pBF->hashFn2 = taosDJB2Hash;
|
|
||||||
return pBF;
|
return pBF;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
|
|
@ -39,13 +39,31 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pSBf->growth = DEFAULT_GROWTH;
|
pSBf->growth = DEFAULT_GROWTH;
|
||||||
|
pSBf->hashFn1 = HASH_FUNCTION_1;
|
||||||
|
pSBf->hashFn2 = HASH_FUNCTION_2;
|
||||||
return pSBf;
|
return pSBf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||||
|
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||||
|
SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
|
||||||
|
ASSERT(pNormalBf);
|
||||||
|
if (tBloomFilterIsFull(pNormalBf)) {
|
||||||
|
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
|
||||||
|
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
|
||||||
|
if (pNormalBf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tBloomFilterPut(pNormalBf, keyBuf, len);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||||
|
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
|
||||||
|
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
|
||||||
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||||
for (int32_t i = size - 2; i >= 0; --i) {
|
for (int32_t i = size - 2; i >= 0; --i) {
|
||||||
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), keyBuf, len) != TSDB_CODE_SUCCESS) {
|
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -59,13 +77,15 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return tBloomFilterPut(pNormalBf, keyBuf, len);
|
return tBloomFilterPutHash(pNormalBf, h1, h2);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||||
|
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
|
||||||
|
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
|
||||||
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||||
for (int32_t i = size - 1; i >= 0; --i) {
|
for (int32_t i = size - 1; i >= 0; --i) {
|
||||||
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), keyBuf, len) != TSDB_CODE_SUCCESS) {
|
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -113,6 +133,8 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) {
|
||||||
|
|
||||||
SScalableBf *tScalableBfDecode(SDecoder *pDecoder) {
|
SScalableBf *tScalableBfDecode(SDecoder *pDecoder) {
|
||||||
SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
|
SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
|
||||||
|
pSBf->hashFn1 = HASH_FUNCTION_1;
|
||||||
|
pSBf->hashFn2 = HASH_FUNCTION_2;
|
||||||
pSBf->bfArray = NULL;
|
pSBf->bfArray = NULL;
|
||||||
int32_t size = 0;
|
int32_t size = 0;
|
||||||
if (tDecodeI32(pDecoder, &size) < 0) goto _error;
|
if (tDecodeI32(pDecoder, &size) < 0) goto _error;
|
||||||
|
|
|
@ -43,12 +43,16 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) {
|
||||||
|
|
||||||
for (int64_t i = 0; i < 1000; i++) {
|
for (int64_t i = 0; i < 1000; i++) {
|
||||||
int64_t ts = i + ts1;
|
int64_t ts = i + ts1;
|
||||||
GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_FAILED);
|
uint64_t h1 = (uint64_t) pBF4->hashFn1((const char*)&ts, sizeof(int64_t));
|
||||||
|
uint64_t h2 = (uint64_t) pBF4->hashFn2((const char*)&ts, sizeof(int64_t));
|
||||||
|
GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, h1, h2), TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int64_t i = 2000; i < 3000; i++) {
|
for (int64_t i = 2000; i < 3000; i++) {
|
||||||
int64_t ts = i + ts1;
|
int64_t ts = i + ts1;
|
||||||
GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
|
uint64_t h1 = (uint64_t) pBF4->hashFn1((const char*)&ts, sizeof(int64_t));
|
||||||
|
uint64_t h2 = (uint64_t) pBF4->hashFn2((const char*)&ts, sizeof(int64_t));
|
||||||
|
GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, h1, h2), TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
tBloomFilterDestroy(pBF1);
|
tBloomFilterDestroy(pBF1);
|
||||||
|
|
|
@ -57,6 +57,8 @@ system sh/stop_dnodes.sh
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s start
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
|
sleep 2000
|
||||||
|
|
||||||
sql insert into t1 values(1648791213002,3,2,3,1.1);
|
sql insert into t1 values(1648791213002,3,2,3,1.1);
|
||||||
sql insert into t2 values(1648791233003,4,2,3,1.1);
|
sql insert into t2 values(1648791233003,4,2,3,1.1);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue