adj bloomfilter

This commit is contained in:
54liuyao 2024-07-12 15:52:54 +08:00
parent 5b982354e1
commit 3b914248a5
3 changed files with 74 additions and 34 deletions

View File

@ -32,13 +32,13 @@ typedef struct SScalableBf {
_hash_fn_t hashFn2; _hash_fn_t hashFn2;
} SScalableBf; } SScalableBf;
int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf **ppSBf); int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf **ppSBf);
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len); int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len); int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len); int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len);
void tScalableBfDestroy(SScalableBf *pSBf); void tScalableBfDestroy(SScalableBf *pSBf);
int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder); int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder);
SScalableBf *tScalableBfDecode(SDecoder *pDecoder); int32_t tScalableBfDecode(SDecoder *pDecoder, SScalableBf **ppSBf);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -420,6 +420,8 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
} }
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
ASSERT(pInfo); ASSERT(pInfo);
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
@ -440,8 +442,10 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
if (tDecodeI32(&decoder, &sBfSize) < 0) return -1; if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *)); pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *));
for (int32_t i = 0; i < sBfSize; i++) { for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf *pSBf = tScalableBfDecode(&decoder); SScalableBf *pSBf = NULL;
if (!pSBf) return -1; code = tScalableBfDecode(&decoder, &pSBf);
TSDB_CHECK_CODE(code, lino, _error);
taosArrayPush(pInfo->pTsSBFs, &pSBf); taosArrayPush(pInfo->pTsSBFs, &pSBf);
} }
@ -449,7 +453,9 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
if (tDecodeI64(&decoder, &pInfo->interval) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->interval) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->watermark) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->watermark) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->minTS) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->minTS) < 0) return -1;
pInfo->pCloseWinSBF = tScalableBfDecode(&decoder); pInfo->pCloseWinSBF = NULL;
code = tScalableBfDecode(&decoder, &pInfo->pCloseWinSBF);
TSDB_CHECK_CODE(code, lino, _error);
int32_t mapSize = 0; int32_t mapSize = 0;
if (tDecodeI32(&decoder, &mapSize) < 0) return -1; if (tDecodeI32(&decoder, &mapSize) < 0) return -1;
@ -482,7 +488,12 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0;
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
} }
bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) {

View File

@ -25,11 +25,12 @@
#define SBF_INVALID -1 #define SBF_INVALID -1
#define SBF_VALID 0 #define SBF_VALID 0
static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate, SBloomFilter **ppNormalBf); static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate,
SBloomFilter **ppNormalBf);
int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf **ppSBf) { int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf **ppSBf) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
const uint32_t defaultSize = 8; const uint32_t defaultSize = 8;
if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) { if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;
@ -104,7 +105,7 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
} }
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
int32_t size = taosArrayGetSize(pSBf->bfArray); int32_t size = taosArrayGetSize(pSBf->bfArray);
for (int32_t i = size - 2; i >= 0; --i) { for (int32_t i = size - 2; i >= 0; --i) {
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) { if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
@ -115,7 +116,7 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
ASSERT(pNormalBf); ASSERT(pNormalBf);
if (tBloomFilterIsFull(pNormalBf)) { if (tBloomFilterIsFull(pNormalBf)) {
code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf); pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pSBf->status = SBF_INVALID; pSBf->status = SBF_INVALID;
TSDB_CHECK_CODE(code, lino, _error); TSDB_CHECK_CODE(code, lino, _error);
@ -136,7 +137,7 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32
} }
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
int32_t size = taosArrayGetSize(pSBf->bfArray); int32_t size = taosArrayGetSize(pSBf->bfArray);
for (int32_t i = size - 1; i >= 0; --i) { for (int32_t i = size - 1; i >= 0; --i) {
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) { if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
@ -145,7 +146,8 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate, SBloomFilter **ppNormalBf) { static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate,
SBloomFilter **ppNormalBf) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) { if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) {
@ -163,7 +165,7 @@ static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries,
TSDB_CHECK_CODE(code, lino, _error); TSDB_CHECK_CODE(code, lino, _error);
} }
pSBf->numBits += pNormalBf->numBits; pSBf->numBits += pNormalBf->numBits;
(*ppNormalBf) = pNormalBf; (*ppNormalBf) = pNormalBf;
_error: _error:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -200,33 +202,60 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) {
return 0; return 0;
} }
SScalableBf *tScalableBfDecode(SDecoder *pDecoder) { int32_t tScalableBfDecode(SDecoder *pDecoder, SScalableBf **ppSBf) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf)); SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
if (!pSBf) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pSBf->hashFn1 = HASH_FUNCTION_1; pSBf->hashFn1 = HASH_FUNCTION_1;
pSBf->hashFn2 = HASH_FUNCTION_2; pSBf->hashFn2 = HASH_FUNCTION_2;
pSBf->bfArray = NULL; pSBf->bfArray = NULL;
int32_t size = 0; int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) goto _error; if (tDecodeI32(pDecoder, &size) < 0) {
if (size == 0) { code = TSDB_CODE_FAILED;
tScalableBfDestroy(pSBf); TSDB_CHECK_CODE(code, lino, _error);
return NULL;
} }
pSBf->bfArray = taosArrayInit(size * 2, sizeof(void *)); if (size == 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
pSBf->bfArray = taosArrayInit(size * 2, POINTER_BYTES);
if (!pSBf->bfArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
SBloomFilter *pBF = NULL; SBloomFilter *pBF = NULL;
code = tBloomFilterDecode(pDecoder, &pBF); code = tBloomFilterDecode(pDecoder, &pBF);
TSDB_CHECK_CODE(code, lino, _error); TSDB_CHECK_CODE(code, lino, _error);
taosArrayPush(pSBf->bfArray, &pBF); taosArrayPush(pSBf->bfArray, &pBF);
} }
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error; if (tDecodeU32(pDecoder, &pSBf->growth) < 0) {
if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error; code = TSDB_CODE_FAILED;
if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) goto _error; TSDB_CHECK_CODE(code, lino, _error);
if (tDecodeI8(pDecoder, &pSBf->status) < 0) goto _error; }
return pSBf; if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeI8(pDecoder, &pSBf->status) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
(*ppSBf) = pSBf;
_error: _error:
tScalableBfDestroy(pSBf); tScalableBfDestroy(pSBf);
return NULL; if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
} }