commit
2844f4c886
|
@ -26,6 +26,8 @@ typedef struct SScalableBf {
|
||||||
SArray *bfArray; // array of bloom filters
|
SArray *bfArray; // array of bloom filters
|
||||||
uint32_t growth;
|
uint32_t growth;
|
||||||
uint64_t numBits;
|
uint64_t numBits;
|
||||||
|
uint32_t maxBloomFilters;
|
||||||
|
int8_t status;
|
||||||
_hash_fn_t hashFn1;
|
_hash_fn_t hashFn1;
|
||||||
_hash_fn_t hashFn2;
|
_hash_fn_t hashFn2;
|
||||||
} SScalableBf;
|
} SScalableBf;
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
#define DEFAULT_MAP_CAPACITY 131072
|
#define DEFAULT_MAP_CAPACITY 131072
|
||||||
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100)
|
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 100)
|
||||||
#define ROWS_PER_MILLISECOND 1
|
#define ROWS_PER_MILLISECOND 1
|
||||||
#define MAX_NUM_SCALABLE_BF 100000
|
#define MAX_NUM_SCALABLE_BF 64
|
||||||
#define MIN_NUM_SCALABLE_BF 10
|
#define MIN_NUM_SCALABLE_BF 10
|
||||||
#define DEFAULT_PREADD_BUCKET 1
|
#define DEFAULT_PREADD_BUCKET 1
|
||||||
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
||||||
|
@ -81,7 +81,9 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
|
||||||
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
|
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
|
||||||
if (watermark <= adjInterval) {
|
if (watermark <= adjInterval) {
|
||||||
watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
|
watermark = TMAX(originInt / adjInterval, 1) * adjInterval;
|
||||||
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
|
}
|
||||||
|
|
||||||
|
if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
|
||||||
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
|
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
|
||||||
}
|
}
|
||||||
return watermark;
|
return watermark;
|
||||||
|
|
|
@ -20,6 +20,9 @@
|
||||||
|
|
||||||
#define DEFAULT_GROWTH 2
|
#define DEFAULT_GROWTH 2
|
||||||
#define DEFAULT_TIGHTENING_RATIO 0.5
|
#define DEFAULT_TIGHTENING_RATIO 0.5
|
||||||
|
#define DEFAULT_MAX_BLOOMFILTERS 4
|
||||||
|
#define SBF_INVALID -1
|
||||||
|
#define SBF_VALID 0
|
||||||
|
|
||||||
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate);
|
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate);
|
||||||
|
|
||||||
|
@ -32,6 +35,8 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
|
||||||
if (pSBf == NULL) {
|
if (pSBf == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS;
|
||||||
|
pSBf->status = SBF_VALID;
|
||||||
pSBf->numBits = 0;
|
pSBf->numBits = 0;
|
||||||
pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *));
|
pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *));
|
||||||
if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) {
|
if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) {
|
||||||
|
@ -45,6 +50,9 @@ SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||||
|
if (pSBf->status == SBF_INVALID) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||||
SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
|
SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
|
||||||
ASSERT(pNormalBf);
|
ASSERT(pNormalBf);
|
||||||
|
@ -52,6 +60,7 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le
|
||||||
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
|
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
|
||||||
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
|
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
|
||||||
if (pNormalBf == NULL) {
|
if (pNormalBf == NULL) {
|
||||||
|
pSBf->status = SBF_INVALID;
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -59,6 +68,9 @@ int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t le
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||||
|
if (pSBf->status == SBF_INVALID) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
|
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
|
||||||
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
|
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
|
||||||
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||||
|
@ -74,6 +86,7 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||||
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
|
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
|
||||||
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
|
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
|
||||||
if (pNormalBf == NULL) {
|
if (pNormalBf == NULL) {
|
||||||
|
pSBf->status = SBF_INVALID;
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -81,6 +94,9 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
|
||||||
|
if (pSBf->status == SBF_INVALID) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
|
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
|
||||||
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
|
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
|
||||||
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||||
|
@ -93,6 +109,10 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) {
|
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) {
|
||||||
|
if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate);
|
SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate);
|
||||||
if (pNormalBf == NULL) {
|
if (pNormalBf == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -128,6 +148,8 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) {
|
||||||
}
|
}
|
||||||
if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1;
|
if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1;
|
||||||
if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1;
|
if (tEncodeU64(pEncoder, pSBf->numBits) < 0) return -1;
|
||||||
|
if (tEncodeU32(pEncoder, pSBf->maxBloomFilters) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pSBf->status) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,6 +172,8 @@ SScalableBf *tScalableBfDecode(SDecoder *pDecoder) {
|
||||||
}
|
}
|
||||||
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error;
|
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error;
|
||||||
if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error;
|
if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error;
|
||||||
|
if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) goto _error;
|
||||||
|
if (tDecodeI8(pDecoder, &pSBf->status) < 0) goto _error;
|
||||||
return pSBf;
|
return pSBf;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
|
Loading…
Reference in New Issue