adj bloomfilter

This commit is contained in:
54liuyao 2024-07-12 15:30:03 +08:00
parent de56378b08
commit 5b982354e1
9 changed files with 186 additions and 69 deletions

View File

@ -389,7 +389,7 @@ typedef struct SStateStore {
void (*updateInfoDestroy)(SUpdateInfo* pInfo);
void (*windowSBfDelete)(SUpdateInfo* pInfo, uint64_t count);
void (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count);
int32_t (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count);
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);

View File

@ -36,7 +36,7 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count);
void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count);
int32_t windowSBfAdd(SUpdateInfo *pInfo, uint64_t count);
bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
#ifdef __cplusplus

View File

@ -19,6 +19,8 @@
#include "os.h"
#include "tencode.h"
#include "thash.h"
#include "tlog.h"
#include "tutil.h"
#ifdef __cplusplus
extern "C" {
@ -39,15 +41,15 @@ typedef struct SBloomFilter {
double errorRate;
} SBloomFilter;
SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate);
int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2);
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len);
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, uint64_t h1, uint64_t h2);
void tBloomFilterDestroy(SBloomFilter *pBF);
void tBloomFilterDump(const SBloomFilter *pBF);
bool tBloomFilterIsFull(const SBloomFilter *pBF);
int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder *pEncoder);
SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder);
int32_t tBloomFilterInit(uint64_t expectedEntries, double errorRate, SBloomFilter **ppBF);
int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2);
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len);
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, uint64_t h1, uint64_t h2);
void tBloomFilterDestroy(SBloomFilter *pBF);
void tBloomFilterDump(const SBloomFilter *pBF);
bool tBloomFilterIsFull(const SBloomFilter *pBF);
int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder *pEncoder);
int32_t tBloomFilterDecode(SDecoder *pDecoder, SBloomFilter **ppBF);
#ifdef __cplusplus
}

View File

@ -32,7 +32,7 @@ typedef struct SScalableBf {
_hash_fn_t hashFn2;
} SScalableBf;
SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate);
int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf **ppSBf);
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len);

View File

@ -21,6 +21,7 @@
#include "tdef.h"
#include "thash.h"
#include "tmd5.h"
#include "tutil.h"
#ifdef __cplusplus
extern "C" {

View File

@ -66,15 +66,29 @@ int32_t getValueBuff(TSKEY ts, char* pVal, int32_t len, char* buff) {
return sizeof(TSKEY) + len;
}
void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
int32_t windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pInfo->numSBFs < count) {
count = pInfo->numSBFs;
}
for (uint64_t i = 0; i < count; ++i) {
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
taosArrayPush(pInfo->pTsSBFs, &tsSBF);
SScalableBf *tsSBF = NULL;
code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &tsSBF);
TSDB_CHECK_CODE(code, lino, _error);
void *res = taosArrayPush(pInfo->pTsSBFs, &tsSBF);
if (!res) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
}
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static void clearItemHelper(void *p) {
@ -183,6 +197,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
}
static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
int32_t code = 0;
if (ts <= 0) {
return NULL;
}
@ -202,7 +217,7 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index);
if (res == NULL) {
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
res = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &res);
taosArrayPush(pInfo->pTsSBFs, &res);
}
return res;
@ -334,7 +349,11 @@ void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo) {
return;
}
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
pInfo->pCloseWinSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
int32_t code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &pInfo->pCloseWinSBF);
if (code != TSDB_CODE_SUCCESS) {
pInfo->pCloseWinSBF = NULL;
uError("%s failed to add close window SBF since %s", __func__, tstrerror(code));
}
}
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) {

View File

@ -35,13 +35,17 @@ static FORCE_INLINE bool getBit(uint64_t *buf, uint64_t index) {
return buf[unitIndex] & mask;
}
SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
int32_t tBloomFilterInit(uint64_t expectedEntries, double errorRate, SBloomFilter **ppBF) {
int32_t code = 0;
int32_t lino = 0;
if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
return NULL;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
SBloomFilter *pBF = taosMemoryCalloc(1, sizeof(SBloomFilter));
if (pBF == NULL) {
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pBF->expectedEntries = expectedEntries;
pBF->errorRate = errorRate;
@ -61,9 +65,16 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
if (pBF->buffer == NULL) {
tBloomFilterDestroy(pBF);
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
return pBF;
(*ppBF) = pBF;
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2) {
@ -133,27 +144,55 @@ int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder *pEncoder) {
return 0;
}
SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder) {
int32_t tBloomFilterDecode(SDecoder *pDecoder, SBloomFilter **ppBF) {
int32_t code = 0;
int32_t lino = 0;
SBloomFilter *pBF = taosMemoryCalloc(1, sizeof(SBloomFilter));
if (!pBF) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pBF->buffer = NULL;
if (tDecodeU32(pDecoder, &pBF->hashFunctions) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->expectedEntries) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->numUnits) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->numBits) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->size) < 0) goto _error;
if (tDecodeU32(pDecoder, &pBF->hashFunctions) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->expectedEntries) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->numUnits) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->numBits) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->size) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
for (int32_t i = 0; i < pBF->numUnits; i++) {
uint64_t *pUnits = (uint64_t *)pBF->buffer;
if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error;
if (tDecodeU64(pDecoder, pUnits + i) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
}
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error;
pBF->hashFn1 = HASH_FUNCTION_1;
pBF->hashFn2 = HASH_FUNCTION_2;
return pBF;
(*ppBF) = pBF;
_error:
tBloomFilterDestroy(pBF);
return NULL;
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return TSDB_CODE_FAILED;
}
bool tBloomFilterIsFull(const SBloomFilter *pBF) { return pBF->size >= pBF->expectedEntries; }

View File

@ -17,6 +17,7 @@
#include "tscalablebf.h"
#include "taoserror.h"
#include "tutil.h"
#define DEFAULT_GROWTH 2
#define DEFAULT_TIGHTENING_RATIO 0.5
@ -24,52 +25,82 @@
#define SBF_INVALID -1
#define SBF_VALID 0
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate);
static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate, SBloomFilter **ppNormalBf);
SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf **ppSBf) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
const uint32_t defaultSize = 8;
if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
return NULL;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
if (pSBf == NULL) {
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS;
pSBf->status = SBF_VALID;
pSBf->numBits = 0;
pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *));
if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) {
if (!pSBf->bfArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
SBloomFilter *pNormalBf = NULL;
code = tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
if (code != TSDB_CODE_SUCCESS) {
tScalableBfDestroy(pSBf);
return NULL;
TSDB_CHECK_CODE(code, lino, _error);
}
pSBf->growth = DEFAULT_GROWTH;
pSBf->hashFn1 = HASH_FUNCTION_1;
pSBf->hashFn2 = HASH_FUNCTION_2;
return pSBf;
(*ppSBf) = pSBf;
return TSDB_CODE_SUCCESS;
_error:
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}
int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pSBf->status == SBF_INVALID) {
return TSDB_CODE_FAILED;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
int32_t size = taosArrayGetSize(pSBf->bfArray);
int32_t size = taosArrayGetSize(pSBf->bfArray);
SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
ASSERT(pNormalBf);
if (!pNormalBf) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tBloomFilterIsFull(pNormalBf)) {
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
if (pNormalBf == NULL) {
code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
if (code != TSDB_CODE_SUCCESS) {
pSBf->status = SBF_INVALID;
return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
}
return tBloomFilterPut(pNormalBf, keyBuf, len);
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pSBf->status == SBF_INVALID) {
return TSDB_CODE_FAILED;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
@ -83,14 +114,20 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
ASSERT(pNormalBf);
if (tBloomFilterIsFull(pNormalBf)) {
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
if (pNormalBf == NULL) {
code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
if (code != TSDB_CODE_SUCCESS) {
pSBf->status = SBF_INVALID;
return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
}
return tBloomFilterPutHash(pNormalBf, h1, h2);
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
@ -108,21 +145,31 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32
return TSDB_CODE_SUCCESS;
}
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) {
static int32_t tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate, SBloomFilter **ppNormalBf) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) {
return NULL;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate);
if (pNormalBf == NULL) {
return NULL;
}
SBloomFilter *pNormalBf = NULL;
code = tBloomFilterInit(expectedEntries, errorRate, &pNormalBf);
TSDB_CHECK_CODE(code, lino, _error);
if (taosArrayPush(pSBf->bfArray, &pNormalBf) == NULL) {
tBloomFilterDestroy(pNormalBf);
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pSBf->numBits += pNormalBf->numBits;
return pNormalBf;
(*ppNormalBf) = pNormalBf;
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
void tScalableBfDestroy(SScalableBf *pSBf) {
@ -154,6 +201,8 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) {
}
SScalableBf *tScalableBfDecode(SDecoder *pDecoder) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
pSBf->hashFn1 = HASH_FUNCTION_1;
pSBf->hashFn2 = HASH_FUNCTION_2;
@ -166,8 +215,9 @@ SScalableBf *tScalableBfDecode(SDecoder *pDecoder) {
}
pSBf->bfArray = taosArrayInit(size * 2, sizeof(void *));
for (int32_t i = 0; i < size; i++) {
SBloomFilter *pBF = tBloomFilterDecode(pDecoder);
if (!pBF) goto _error;
SBloomFilter *pBF = NULL;
code = tBloomFilterDecode(pDecoder, &pBF);
TSDB_CHECK_CODE(code, lino, _error);
taosArrayPush(pSBf->bfArray, &pBF);
}
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error;

View File

@ -8,12 +8,15 @@ using namespace std;
TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) {
int64_t ts1 = 1650803518000;
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 0));
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 1));
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, -0.1));
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(0, 0.01));
SBloomFilter *pBFTmp = NULL;
GTEST_ASSERT_EQ(0, tBloomFilterInit(100, 0, &pBFTmp));
GTEST_ASSERT_EQ(0, tBloomFilterInit(100, 1, &pBFTmp));
GTEST_ASSERT_EQ(0, tBloomFilterInit(100, -0.1, &pBFTmp));
GTEST_ASSERT_EQ(0, tBloomFilterInit(0, 0.01, &pBFTmp));
SBloomFilter *pBF1 = tBloomFilterInit(100, 0.005);
SBloomFilter *pBF1 = NULL,
int32_t code = tBloomFilterInit(100, 0.005, &pBF1);
GTEST_ASSERT_EQ(0,code);
GTEST_ASSERT_EQ(pBF1->numBits, 1152);
GTEST_ASSERT_EQ(pBF1->numUnits, 1152 / 64);
int64_t count = 0;
@ -25,16 +28,19 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) {
}
ASSERT_TRUE(tBloomFilterIsFull(pBF1));
SBloomFilter *pBF2 = tBloomFilterInit(1000 * 10000, 0.1);
SBloomFilter *pBF2 = NULL;
GTEST_ASSERT_EQ(0, tBloomFilterInit(1000 * 10000, 0.1, &pBF2));
GTEST_ASSERT_EQ(pBF2->numBits, 47925312);
GTEST_ASSERT_EQ(pBF2->numUnits, 47925312 / 64);
SBloomFilter *pBF3 = tBloomFilterInit(10000 * 10000, 0.001);
SBloomFilter *pBF3 = NULL;
GTEST_ASSERT_EQ(0, tBloomFilterInit(10000 * 10000, 0.001));
GTEST_ASSERT_EQ(pBF3->numBits, 1437758784);
GTEST_ASSERT_EQ(pBF3->numUnits, 1437758784 / 64);
int64_t size = 10000;
SBloomFilter *pBF4 = tBloomFilterInit(size, 0.001);
SBloomFilter *pBF4 = NULL;
GTEST_ASSERT_EQ(0, tBloomFilterInit(size, 0.001));
for (int64_t i = 0; i < 1000; i++) {
int64_t ts = i + ts1;
GTEST_ASSERT_EQ(tBloomFilterPut(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);