From 3b914248a52a7a0b2c862a0f2464374ef7b9b789 Mon Sep 17 00:00:00 2001
From: 54liuyao <54liuyao>
Date: Fri, 12 Jul 2024 15:52:54 +0800
Subject: [PATCH] adj bloomfilter

---
Notes (ignored by git-am, not part of the commit message):

* tScalableBfDecode() now returns an int32_t status code and hands the
  decoded filter back through ppSBf. updateInfoDeserialize() is switched to
  the new signature and reports decode failures through its own _error label.
* The success path of tScalableBfDecode() returns immediately after storing
  the result in *ppSBf. Without that early return, execution falls through
  into the _error label, where tScalableBfDestroy(pSBf) frees the object
  that was just published to the caller.
* NOTE(review): a decoded size of 0 used to yield a NULL filter, which the
  pCloseWinSBF call site accepted without a check; it is now reported as
  TSDB_CODE_FAILED. Confirm that no encoder emits a zero-length filter list.
* NOTE(review): the _error path of updateInfoDeserialize() does not call
  tDecoderClear(); confirm whether that cleanup is required there.
* Line breaks and indentation in this copy were rebuilt from a
  whitespace-collapsed paste. Alignment-only hunks may need to be
  regenerated against the tree before the patch applies cleanly.

 include/util/tscalablebf.h            | 14 ++---
 source/libs/stream/src/streamUpdate.c | 19 +++++--
 source/util/src/tscalablebf.c         | 77 +++++++++++++++++++--------
 3 files changed, 76 insertions(+), 34 deletions(-)

diff --git a/include/util/tscalablebf.h b/include/util/tscalablebf.h
index 5e1efb6b25..584e023bb0 100644
--- a/include/util/tscalablebf.h
+++ b/include/util/tscalablebf.h
@@ -32,13 +32,13 @@ typedef struct SScalableBf {
   _hash_fn_t hashFn2;
 } SScalableBf;
 
-int32_t      tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf **ppSBf);
-int32_t      tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
-int32_t      tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
-int32_t      tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len);
-void         tScalableBfDestroy(SScalableBf *pSBf);
-int32_t      tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder);
-SScalableBf *tScalableBfDecode(SDecoder *pDecoder);
+int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf **ppSBf);
+int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
+int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
+int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len);
+void    tScalableBfDestroy(SScalableBf *pSBf);
+int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder);
+int32_t tScalableBfDecode(SDecoder *pDecoder, SScalableBf **ppSBf);
 
 #ifdef __cplusplus
 }
diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c
index e41c5cfffc..2da20cadae 100644
--- a/source/libs/stream/src/streamUpdate.c
+++ b/source/libs/stream/src/streamUpdate.c
@@ -420,6 +420,8 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
 }
 
 int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
+  int32_t code = TSDB_CODE_SUCCESS;
+  int32_t lino = 0;
   ASSERT(pInfo);
   SDecoder decoder = {0};
   tDecoderInit(&decoder, buf, bufLen);
@@ -440,8 +442,10 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
   if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
   pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *));
   for (int32_t i = 0; i < sBfSize; i++) {
-    SScalableBf *pSBf = tScalableBfDecode(&decoder);
-    if (!pSBf) return -1;
+    SScalableBf *pSBf = NULL;
+    code = tScalableBfDecode(&decoder, &pSBf);
+    TSDB_CHECK_CODE(code, lino, _error);
+
     taosArrayPush(pInfo->pTsSBFs, &pSBf);
   }
 
@@ -449,7 +453,9 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
   if (tDecodeI64(&decoder, &pInfo->interval) < 0) return -1;
   if (tDecodeI64(&decoder, &pInfo->watermark) < 0) return -1;
   if (tDecodeI64(&decoder, &pInfo->minTS) < 0) return -1;
-  pInfo->pCloseWinSBF = tScalableBfDecode(&decoder);
+  pInfo->pCloseWinSBF = NULL;
+  code = tScalableBfDecode(&decoder, &pInfo->pCloseWinSBF);
+  TSDB_CHECK_CODE(code, lino, _error);
 
   int32_t mapSize = 0;
   if (tDecodeI32(&decoder, &mapSize) < 0) return -1;
@@ -482,7 +488,12 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
 
   tEndDecode(&decoder);
   tDecoderClear(&decoder);
-  return 0;
+
+_error:
+  if (code != TSDB_CODE_SUCCESS) {
+    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+  }
+  return code;
 }
 
 bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) {
diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c
index b996cf21bb..1ee99b07f7 100644
--- a/source/util/src/tscalablebf.c
+++ b/source/util/src/tscalablebf.c
@@ -25,11 +25,12 @@
 #define SBF_INVALID              -1
 #define SBF_VALID                0
 
-static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate, SBloomFilter **ppNormalBf);
+static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate,
+                                    SBloomFilter **ppNormalBf);
 
 int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf **ppSBf) {
-  int32_t code = TSDB_CODE_SUCCESS;
-  int32_t lino = 0;
+  int32_t        code = TSDB_CODE_SUCCESS;
+  int32_t        lino = 0;
   const uint32_t defaultSize = 8;
   if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
     code = TSDB_CODE_FAILED;
@@ -47,7 +48,7 @@ int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf
     code = TSDB_CODE_OUT_OF_MEMORY;
     TSDB_CHECK_CODE(code, lino, _error);
   }
-  
+
   SBloomFilter *pNormalBf = NULL;
   code = tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
   if (code != TSDB_CODE_SUCCESS) {
@@ -104,7 +105,7 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
   }
   uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
   uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
-  int32_t size = taosArrayGetSize(pSBf->bfArray);
+  int32_t  size = taosArrayGetSize(pSBf->bfArray);
   for (int32_t i = size - 2; i >= 0; --i) {
     if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) {
       return TSDB_CODE_FAILED;
@@ -115,7 +116,7 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
   ASSERT(pNormalBf);
   if (tBloomFilterIsFull(pNormalBf)) {
     code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
-                                    pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
+                                pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
     if (code != TSDB_CODE_SUCCESS) {
       pSBf->status = SBF_INVALID;
       TSDB_CHECK_CODE(code, lino, _error);
@@ -136,7 +137,7 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32
   }
   uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
   uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
-  int32_t size = taosArrayGetSize(pSBf->bfArray);
+  int32_t  size = taosArrayGetSize(pSBf->bfArray);
   for (int32_t i = size - 1; i >= 0; --i) {
     if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) {
       return TSDB_CODE_FAILED;
@@ -145,7 +146,8 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32
   return TSDB_CODE_SUCCESS;
 }
 
-static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate, SBloomFilter **ppNormalBf) {
+static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate,
+                                    SBloomFilter **ppNormalBf) {
   int32_t code = TSDB_CODE_SUCCESS;
   int32_t lino = 0;
   if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) {
@@ -163,7 +165,7 @@ static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries,
     TSDB_CHECK_CODE(code, lino, _error);
   }
   pSBf->numBits += pNormalBf->numBits;
-  (*ppNormalBf) = pNormalBf; 
+  (*ppNormalBf) = pNormalBf;
 
 _error:
   if (code != TSDB_CODE_SUCCESS) {
@@ -200,33 +202,62 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) {
   return 0;
 }
 
-SScalableBf *tScalableBfDecode(SDecoder *pDecoder) {
-  int32_t code = TSDB_CODE_SUCCESS;
-  int32_t lino = 0;
+int32_t tScalableBfDecode(SDecoder *pDecoder, SScalableBf **ppSBf) {
+  int32_t      code = TSDB_CODE_SUCCESS;
+  int32_t      lino = 0;
   SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
+  if (!pSBf) {
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    TSDB_CHECK_CODE(code, lino, _error);
+  }
   pSBf->hashFn1 = HASH_FUNCTION_1;
   pSBf->hashFn2 = HASH_FUNCTION_2;
   pSBf->bfArray = NULL;
   int32_t size = 0;
-  if (tDecodeI32(pDecoder, &size) < 0) goto _error;
-  if (size == 0) {
-    tScalableBfDestroy(pSBf);
-    return NULL;
+  if (tDecodeI32(pDecoder, &size) < 0) {
+    code = TSDB_CODE_FAILED;
+    TSDB_CHECK_CODE(code, lino, _error);
   }
-  pSBf->bfArray = taosArrayInit(size * 2, sizeof(void *));
+  if (size == 0) {
+    code = TSDB_CODE_FAILED;
+    TSDB_CHECK_CODE(code, lino, _error);
+  }
+  pSBf->bfArray = taosArrayInit(size * 2, POINTER_BYTES);
+  if (!pSBf->bfArray) {
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    TSDB_CHECK_CODE(code, lino, _error);
+  }
+
   for (int32_t i = 0; i < size; i++) {
     SBloomFilter *pBF = NULL;
     code = tBloomFilterDecode(pDecoder, &pBF);
     TSDB_CHECK_CODE(code, lino, _error);
     taosArrayPush(pSBf->bfArray, &pBF);
   }
-  if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error;
-  if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error;
-  if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) goto _error;
-  if (tDecodeI8(pDecoder, &pSBf->status) < 0) goto _error;
-  return pSBf;
+  if (tDecodeU32(pDecoder, &pSBf->growth) < 0) {
+    code = TSDB_CODE_FAILED;
+    TSDB_CHECK_CODE(code, lino, _error);
+  }
+  if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) {
+    code = TSDB_CODE_FAILED;
+    TSDB_CHECK_CODE(code, lino, _error);
+  }
+  if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) {
+    code = TSDB_CODE_FAILED;
+    TSDB_CHECK_CODE(code, lino, _error);
+  }
+  if (tDecodeI8(pDecoder, &pSBf->status) < 0) {
+    code = TSDB_CODE_FAILED;
+    TSDB_CHECK_CODE(code, lino, _error);
+  }
+  (*ppSBf) = pSBf;
+  // Ownership passes to the caller: return before the cleanup label frees pSBf.
+  return code;
 
 _error:
   tScalableBfDestroy(pSBf);
-  return NULL;
+  if (code != TSDB_CODE_SUCCESS) {
+    uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+  }
+  return code;
 }