diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h new file mode 100644 index 0000000000..5911759fe3 --- /dev/null +++ b/include/libs/stream/tstreamUpdate.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _TSTREAMUPDATE_H_ +#define _TSTREAMUPDATE_H_ + +#include "taosdef.h" +#include "tarray.h" +#include "tmsg.h" +#include "tscalablebf.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SUpdateInfo { + SArray *pTsBuckets; + uint64_t numBuckets; + SArray *pTsSBFs; + uint64_t numSBFs; + int64_t interval; + int64_t watermark; + TSKEY minTS; +} SUpdateInfo; + +SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark); +SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); +bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts); +void updateInfoDestroy(SUpdateInfo *pInfo); + +#ifdef __cplusplus +} +#endif + +#endif /* ifndef _TSTREAMUPDATE_H_ */ \ No newline at end of file diff --git a/include/util/tarray.h b/include/util/tarray.h index bbde90f28f..482f13de39 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -218,6 +218,13 @@ void taosArrayClear(SArray* pArray); */ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)); +/** + * clear the array (remove all element) + * @param pArray + * @param fp + */ +void taosArrayClearP(SArray* pArray, FDelete fp); + void* taosArrayDestroy(SArray* pArray); void taosArrayDestroyP(SArray* pArray, FDelete fp); void taosArrayDestroyEx(SArray* pArray, FDelete fp); diff --git a/include/util/tbloomfilter.h b/include/util/tbloomfilter.h new file mode 100644 index 0000000000..b168da594a --- /dev/null +++ b/include/util/tbloomfilter.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_BLOOMFILTER_H_ +#define _TD_UTIL_BLOOMFILTER_H_ + +#include "os.h" +#include "thash.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SBloomFilter { + uint32_t hashFunctions; + uint64_t expectedEntries; + uint64_t numUnits; + uint64_t numBits; + uint64_t size; + _hash_fn_t hashFn1; + _hash_fn_t hashFn2; + void *buffer; + double errorRate; +} SBloomFilter; + +SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate); +int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len); +int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf, + uint32_t len); +void tBloomFilterDestroy(SBloomFilter *pBF); +void tBloomFilterDump(const SBloomFilter *pBF); +bool tBloomFilterIsFull(const SBloomFilter *pBF); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_UTIL_BLOOMFILTER_H_*/ \ No newline at end of file diff --git a/include/util/tscalablebf.h b/include/util/tscalablebf.h new file mode 100644 index 0000000000..8f88f65048 --- /dev/null +++ b/include/util/tscalablebf.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_SCALABLEBF_H_ +#define _TD_UTIL_SCALABLEBF_H_ + +#include "tbloomfilter.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SScalableBf { + SArray *bfArray; // array of bloom filters + uint32_t growth; + uint64_t numBits; +} SScalableBf; + +SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate); +int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len); +int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, + uint32_t len); +void tScalableBfDestroy(SScalableBf *pSBf); +void tScalableBfDump(const SScalableBf *pSBf); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_UTIL_SCALABLEBF_H_*/ \ No newline at end of file diff --git a/source/libs/stream/src/tstreamUpdate.c b/source/libs/stream/src/tstreamUpdate.c new file mode 100644 index 0000000000..a207ec9265 --- /dev/null +++ b/source/libs/stream/src/tstreamUpdate.c @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tstreamUpdate.h" +#include "ttime.h" + +#define DEFAULT_FALSE_POSITIVE 0.01 +#define DEFAULT_BUCKET_SIZE 1024 +#define ROWS_PER_MILLISECOND 1 +#define MAX_NUM_SCALABLE_BF 120 +#define MIN_NUM_SCALABLE_BF 10 +#define DEFAULT_PREADD_BUCKET 1 +#define MAX_INTERVAL MILLISECOND_PER_MINUTE +#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10) + +static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { + if (pInfo->numSBFs < count ) { + count = pInfo->numSBFs; + } + for (uint64_t i = 0; i < count; ++i) { + SScalableBf *tsSBF = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, + DEFAULT_FALSE_POSITIVE); + taosArrayPush(pInfo->pTsSBFs, &tsSBF); + } +} + +static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) { + if (count < pInfo->numSBFs - 1) { + for (uint64_t i = 0; i < count; ++i) { + SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, i); + tScalableBfDestroy(pTsSBFs); + taosArrayRemove(pInfo->pTsSBFs, i); + } + } else { + taosArrayClearP(pInfo->pTsSBFs, (FDelete)tScalableBfDestroy); + } + pInfo->minTS += pInfo->interval * count; +} + +static int64_t adjustInterval(int64_t interval, int32_t precision) { + int64_t val = interval; + if (precision != TSDB_TIME_PRECISION_MILLI) { + val = convertTimePrecision(interval, precision, TSDB_TIME_PRECISION_MILLI); + } + if (val < MIN_INTERVAL) { + val = MIN_INTERVAL; + } else if (val > MAX_INTERVAL) { + val = MAX_INTERVAL; + } + val = convertTimePrecision(val, TSDB_TIME_PRECISION_MILLI, precision); + return val; +} + +SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark) { + return updateInfoInit(pInterval->interval, pInterval->precision, watermark); +} + +SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark) { + SUpdateInfo *pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); + if (pInfo == NULL) { + return NULL; + } + pInfo->pTsBuckets = NULL; + pInfo->pTsSBFs = NULL; + pInfo->minTS = -1; + pInfo->interval = adjustInterval(interval, precision); + pInfo->watermark = watermark; + + uint64_t bfSize = (uint64_t)(watermark / pInfo->interval); + if (bfSize < MIN_NUM_SCALABLE_BF) { + bfSize = MIN_NUM_SCALABLE_BF; + } else if (bfSize > MAX_NUM_SCALABLE_BF) { + bfSize = MAX_NUM_SCALABLE_BF; + } + + pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(SScalableBf)); + if (pInfo->pTsSBFs == NULL) { + updateInfoDestroy(pInfo); + return NULL; + } + pInfo->numSBFs = bfSize; + windowSBfAdd(pInfo, bfSize); + + pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY)); + if (pInfo->pTsBuckets == NULL) { + updateInfoDestroy(pInfo); + return NULL; + } + + TSKEY dumy = 0; + for(uint64_t i=0; i < DEFAULT_BUCKET_SIZE; ++i) { + taosArrayPush(pInfo->pTsBuckets, &dumy); + } + pInfo->numBuckets = DEFAULT_BUCKET_SIZE; + return pInfo; +} + +static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) { + if (ts <= 0) { + return NULL; + } + if (pInfo->minTS < 0) { + pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval); + } + uint64_t index = (uint64_t)((ts - pInfo->minTS) / pInfo->interval); + if (index >= pInfo->numSBFs) { + uint64_t count = index + 1 - pInfo->numSBFs; + windowSBfDelete(pInfo, count); + windowSBfAdd(pInfo, count); + index = pInfo->numSBFs - 1; + } + SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index); + if (res == NULL) { + res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, + DEFAULT_FALSE_POSITIVE); + taosArrayPush(pInfo->pTsSBFs, &res); + } + return res; +} + +bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) { + int32_t res = TSDB_CODE_FAILED; + uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; + SScalableBf* pSBf = getSBf(pInfo, ts); + // pSBf may be a null pointer + if (pSBf) { + res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); + } + + TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); + if (maxTs < ts ) { + taosArraySet(pInfo->pTsBuckets, index, &ts); + return false; + } + + if (ts < pInfo->minTS) { + return true; + } else if (res == TSDB_CODE_SUCCESS) { + return false; + } + + //check from tsdb api + return true; +} + +void updateInfoDestroy(SUpdateInfo *pInfo) { + if (pInfo == NULL) { + return; + } + taosArrayDestroy(pInfo->pTsBuckets); + + uint64_t size = taosArrayGetSize(pInfo->pTsSBFs); + for (uint64_t i = 0; i < size; i++) { + SScalableBf *pSBF = taosArrayGetP(pInfo->pTsSBFs, i); + tScalableBfDestroy(pSBF); + } + + taosArrayDestroy(pInfo->pTsSBFs); + taosMemoryFree(pInfo); +} \ No newline at end of file diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt index e69de29bb2..96209f6812 100644 --- a/source/libs/stream/test/CMakeLists.txt +++ b/source/libs/stream/test/CMakeLists.txt @@ -0,0 +1,20 @@ + +MESSAGE(STATUS "build stream unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +# bloomFilterTest +ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp") + +TARGET_LINK_LIBRARIES( + streamUpdateTest + PUBLIC os util common gtest stream +) + +TARGET_INCLUDE_DIRECTORIES( + streamUpdateTest + PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/" + PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc" +) \ No newline at end of file diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp new file mode 100644 index 0000000000..ed016ead44 --- /dev/null +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -0,0 +1,103 @@ +#include + +#include "tstreamUpdate.h" +#include "ttime.h" + +using namespace std; + +TEST(TD_STREAM_UPDATE_TEST, update) { + int64_t interval = 20 * 1000; + int64_t watermark = 10 * 60 * 1000; + SUpdateInfo *pSU = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); + GTEST_ASSERT_EQ(isUpdated(pSU,1, 0), true); + GTEST_ASSERT_EQ(isUpdated(pSU,1, -1), true); + + for(int i=0; i < 1024; i++) { + GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), false); + } + for(int i=0; i < 1024; i++) { + GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), true); + } + + for(int i=0; i < 1024; i++) { + GTEST_ASSERT_EQ(isUpdated(pSU,i, 2), false); + } + for(int i=0; i < 1024; i++) { + GTEST_ASSERT_EQ(isUpdated(pSU,i, 2), true); + } + + for(int i=0; i < 1024; i++) { + GTEST_ASSERT_EQ(isUpdated(pSU,i, 1), true); + } + + for(int i=3; i < 1024; i++) { + GTEST_ASSERT_EQ(isUpdated(pSU,0, i), false); + } + GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023); + + for(int i=3; i < 1024; i++) { + GTEST_ASSERT_EQ(isUpdated(pSU,0, i), true); + } + GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU->pTsBuckets,0), 1023); + + SUpdateInfo *pSU1 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); + for(int i=1; i <= watermark / interval; i++) { + GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false); + GTEST_ASSERT_EQ(pSU1->minTS, interval); + GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval); + } + for(int i=0; i < pSU1->numSBFs; i++) { + SScalableBf *pSBF = (SScalableBf *)taosArrayGetP(pSU1->pTsSBFs, i); + SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF->bfArray, 0); + GTEST_ASSERT_EQ(pBF->size, 1); + } + + for(int i= watermark / interval + 1, j = 2 ; i <= watermark / interval + 10; i++,j++) { + GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false); + GTEST_ASSERT_EQ(pSU1->minTS, interval*j); + GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval); + SScalableBf *pSBF = (SScalableBf *)taosArrayGetP(pSU1->pTsSBFs, pSU1->numSBFs - 1); + SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF->bfArray, 0); + GTEST_ASSERT_EQ(pBF->size, 1); + } + + for(int i= watermark / interval * 100, j = 0; j < 10; i+= (watermark / interval * 2), j++) { + GTEST_ASSERT_EQ(isUpdated(pSU1, 1, i * interval + 5), false); + GTEST_ASSERT_EQ(pSU1->minTS, (i-(pSU1->numSBFs-1))*interval); + GTEST_ASSERT_EQ(pSU1->numSBFs, watermark / interval); + } + + SUpdateInfo *pSU2 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); + GTEST_ASSERT_EQ(isUpdated(pSU2, 1, 1 * interval + 5), false); + GTEST_ASSERT_EQ(pSU2->minTS, interval); + for(int i= watermark / interval * 100, j = 0; j < 10; i+= (watermark / interval * 10), j++) { + GTEST_ASSERT_EQ(isUpdated(pSU2, 1, i * interval + 5), false); + GTEST_ASSERT_EQ(pSU2->minTS, (i-(pSU2->numSBFs-1))*interval); + GTEST_ASSERT_EQ(pSU2->numSBFs, watermark / interval); + GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU2->pTsBuckets,1), i * interval + 5); + } + + SUpdateInfo *pSU3 = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark); + for(int j = 1; j < 100; j++) { + for(int i = 0; i < pSU3->numSBFs; i++) { + GTEST_ASSERT_EQ(isUpdated(pSU3, i, i * interval + 5 * j), false); + GTEST_ASSERT_EQ(pSU3->minTS, 0); + GTEST_ASSERT_EQ(pSU3->numSBFs, watermark / interval); + GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pSU3->pTsBuckets, i), i * interval + 5 * j); + SScalableBf *pSBF = (SScalableBf *)taosArrayGetP(pSU3->pTsSBFs, i); + SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF->bfArray, 0); + GTEST_ASSERT_EQ(pBF->size, j); + } + } + + + updateInfoDestroy(pSU); + updateInfoDestroy(pSU1); + updateInfoDestroy(pSU2); + updateInfoDestroy(pSU3); +} + +int main(int argc, char* argv[]) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 18bc883143..a34b332b9d 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -318,6 +318,20 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) { pArray->size = 0; } +void taosArrayClearP(SArray* pArray, FDelete fp) { + if (pArray == NULL) return; + if (fp == NULL) { + pArray->size = 0; + return; + } + + for (int32_t i = 0; i < pArray->size; ++i) { + fp(*(void**)TARRAY_GET_ELEM(pArray, i)); + } + + pArray->size = 0; +} + void* taosArrayDestroy(SArray* pArray) { if (pArray) { taosMemoryFree(pArray->pData); diff --git a/source/util/src/tbloomfilter.c b/source/util/src/tbloomfilter.c new file mode 100644 index 0000000000..52c541ae2e --- /dev/null +++ b/source/util/src/tbloomfilter.c @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE + +#include "tbloomfilter.h" +#include "taos.h" +#include "taoserror.h" + +#define UNIT_NUM_BITS 64 +#define UNIT_ADDR_NUM_BITS 6 + +static FORCE_INLINE bool setBit(uint64_t *buf, uint64_t index) { + uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS; + uint64_t mask = 1 << (index % UNIT_NUM_BITS); + uint64_t old = buf[unitIndex]; + buf[unitIndex] |= mask; + return buf[unitIndex] != old; +} + +static FORCE_INLINE bool getBit(uint64_t *buf, uint64_t index) { + uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS; + uint64_t mask = 1 << (index % UNIT_NUM_BITS); + return buf[unitIndex] & mask; +} + +SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) { + if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) { + return NULL; + } + SBloomFilter *pBF = taosMemoryCalloc(1, sizeof(SBloomFilter)); + if (pBF == NULL) { + return NULL; + } + pBF->expectedEntries = expectedEntries; + pBF->errorRate = errorRate; + + double lnRate = fabs(log(errorRate)); + // ln(2)^2 = 0.480453013918201 + // m = - n * ln(P) / ( ln(2) )^2 + // m is the size of bloom filter, n is expected entries, P is false positive probability + pBF->numUnits = (uint64_t) ceil(expectedEntries * lnRate / 0.480453013918201 / UNIT_NUM_BITS); + pBF->numBits = pBF->numUnits * 64; + pBF->size = 0; + + // ln(2) = 0.693147180559945 + pBF->hashFunctions = (uint32_t) ceil(lnRate / 0.693147180559945); + pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); + pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR); + pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t)); + if (pBF->buffer == NULL) { + tBloomFilterDestroy(pBF); + return NULL; + } + return pBF; +} + +int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) { + ASSERT(!tBloomFilterIsFull(pBF)); + uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len); + uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len); + bool hasChange = false; + const register uint64_t size = pBF->numBits; + uint64_t cbHash = h1; + for (uint64_t i = 0; i < pBF->hashFunctions; ++i) { + hasChange |= setBit(pBF->buffer, cbHash % size); + cbHash += h2; + } + if (hasChange) { + pBF->size++; + return TSDB_CODE_SUCCESS; + } + return TSDB_CODE_FAILED; +} + +int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf, + uint32_t len) { + uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len); + uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len); + const register uint64_t size = pBF->numBits; + uint64_t cbHash = h1; + for (uint64_t i = 0; i < pBF->hashFunctions; ++i) { + if (!getBit(pBF->buffer, cbHash % size)) { + return TSDB_CODE_SUCCESS; + } + cbHash += h2; + } + return TSDB_CODE_FAILED; +} + +void tBloomFilterDestroy(SBloomFilter *pBF) { + if (pBF == NULL) { + return; + } + taosMemoryFree(pBF->buffer); + taosMemoryFree(pBF); +} + +void tBloomFilterDump(const struct SBloomFilter *pBF) { +// ToDo +} + +bool tBloomFilterIsFull(const SBloomFilter *pBF) { + return pBF->size >= pBF->expectedEntries; +} \ No newline at end of file diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c new file mode 100644 index 0000000000..9ddac44e20 --- /dev/null +++ b/source/util/src/tscalablebf.c @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE + +#include "tscalablebf.h" +#include "taoserror.h" + +#define DEFAULT_GROWTH 2 +#define DEFAULT_TIGHTENING_RATIO 0.5 + +static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, + double errorRate); + +SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) { + const uint32_t defaultSize = 8; + if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) { + return NULL; + } + SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf)); + if (pSBf == NULL) { + return NULL; + } + pSBf->numBits = 0; + pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *)); + if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL ) { + tScalableBfDestroy(pSBf); + return NULL; + } + pSBf->growth = DEFAULT_GROWTH; + return pSBf; +} + +int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { + int32_t size = taosArrayGetSize(pSBf->bfArray); + for (int32_t i = size - 2; i >= 0; --i) { + if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), + keyBuf, len) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_FAILED; + } + } + + SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); + ASSERT(pNormalBf); + if (tBloomFilterIsFull(pNormalBf)) { + pNormalBf = tScalableBfAddFilter(pSBf, + pNormalBf->expectedEntries * pSBf->growth, + pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); + if (pNormalBf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + return tBloomFilterPut(pNormalBf, keyBuf, len); +} + +int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, + uint32_t len) { + int32_t size = taosArrayGetSize(pSBf->bfArray); + for (int32_t i = size - 1; i >= 0; --i) { + if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), + keyBuf, len) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_FAILED; + } + } + return TSDB_CODE_SUCCESS; +} + +static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, + double errorRate) { + SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate); + if (pNormalBf == NULL) { + return NULL; + } + if(taosArrayPush(pSBf->bfArray, &pNormalBf) == NULL) { + tBloomFilterDestroy(pNormalBf); + return NULL; + } + pSBf->numBits += pNormalBf->numBits; + return pNormalBf; +} + +void tScalableBfDestroy(SScalableBf *pSBf) { + if (pSBf == NULL) { + return; + } + if (pSBf->bfArray != NULL) { + taosArrayDestroyP(pSBf->bfArray, (FDelete)tBloomFilterDestroy); + } + taosMemoryFree(pSBf); +} + +void tScalableBfDump(const SScalableBf *pSBf) { + // Todo; +} \ No newline at end of file diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index 3c52d52d50..582618ef5c 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -59,4 +59,12 @@ target_link_libraries(cfgTest os util gtest_main) add_test( NAME cfgTest COMMAND cfgTest +) + +# bloomFilterTest +add_executable(bloomFilterTest "bloomFilterTest.cpp") +target_link_libraries(bloomFilterTest os util gtest_main) +add_test( + NAME bloomFilterTest + COMMAND bloomFilterTest ) \ No newline at end of file diff --git a/source/util/test/bloomFilterTest.cpp b/source/util/test/bloomFilterTest.cpp new file mode 100644 index 0000000000..fcb1b4f0ae --- /dev/null +++ b/source/util/test/bloomFilterTest.cpp @@ -0,0 +1,140 @@ +#include + +#include "tscalablebf.h" +#include "taoserror.h" + +using namespace std; + +TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) { + int64_t ts1 = 1650803518000; + + GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 0)); + GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 1)); + GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, -0.1)); + GTEST_ASSERT_EQ(NULL, tBloomFilterInit(0, 0.01)); + + SBloomFilter *pBF1 = tBloomFilterInit(100, 0.005); + GTEST_ASSERT_EQ(pBF1->numBits, 1152); + GTEST_ASSERT_EQ(pBF1->numUnits, 1152/64); + int64_t count = 0; + for(int64_t i = 0; count < 100; i++) { + int64_t ts = i + ts1; + if(tBloomFilterPut(pBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS ) { + count++; + } + } + ASSERT_TRUE(tBloomFilterIsFull(pBF1)); + + SBloomFilter *pBF2 = tBloomFilterInit(1000*10000, 0.1); + GTEST_ASSERT_EQ(pBF2->numBits, 47925312); + GTEST_ASSERT_EQ(pBF2->numUnits, 47925312/64); + + SBloomFilter *pBF3 = tBloomFilterInit(10000*10000, 0.001); + GTEST_ASSERT_EQ(pBF3->numBits, 1437758784); + GTEST_ASSERT_EQ(pBF3->numUnits, 1437758784/64); + + int64_t size = 10000; + SBloomFilter *pBF4 = tBloomFilterInit(size, 0.001); + for(int64_t i = 0; i < 1000; i++) { + int64_t ts = i + ts1; + GTEST_ASSERT_EQ(tBloomFilterPut(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS); + } + ASSERT_TRUE(!tBloomFilterIsFull(pBF4)); + + for(int64_t i = 0; i < 1000; i++) { + int64_t ts = i + ts1; + GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_FAILED); + } + + for(int64_t i = 2000; i < 3000; i++) { + int64_t ts = i + ts1; + GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS); + } + + tBloomFilterDestroy(pBF1); + tBloomFilterDestroy(pBF2); + tBloomFilterDestroy(pBF3); + tBloomFilterDestroy(pBF4); + +} + +TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) { + int64_t ts1 = 1650803518000; + + GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, 0)); + GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, 1)); + GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, -0.1)); + GTEST_ASSERT_EQ(NULL, tScalableBfInit(0, 0.01)); + + SScalableBf *pSBF1 = tScalableBfInit(100, 0.01); + GTEST_ASSERT_EQ(pSBF1->numBits, 1152); + int64_t count = 0; + int64_t index = 0; + for( ; count < 100; index++) { + int64_t ts = index + ts1; + if(tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS ) { + count++; + } + } + GTEST_ASSERT_EQ(pSBF1->numBits, 1152); + + for( ; count < 300; index++) { + int64_t ts = index + ts1; + if(tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS ) { + count++; + } + } + GTEST_ASSERT_EQ(pSBF1->numBits, 1152+2496); + + for( ; count < 700; index++) { + int64_t ts = index + ts1; + if(tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS ) { + count++; + } + } + GTEST_ASSERT_EQ(pSBF1->numBits, 1152+2496+5568); + + for( ; count < 1500; index++) { + int64_t ts = index + ts1; + if(tScalableBfPut(pSBF1, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS ) { + count++; + } + } + GTEST_ASSERT_EQ(pSBF1->numBits, 1152+2496+5568+12288); + + int32_t aSize = taosArrayGetSize(pSBF1->bfArray); + int64_t totalBits = 0; + for(int64_t i = 0; i < aSize; i++) { + SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF1->bfArray, i); + ASSERT_TRUE(tBloomFilterIsFull(pBF)); + totalBits += pBF->numBits; + } + GTEST_ASSERT_EQ(pSBF1->numBits, totalBits); + + for(int64_t i = 0; i < index; i++) { + int64_t ts = i + ts1; + GTEST_ASSERT_EQ(tScalableBfNoContain(pSBF1, &ts, sizeof(int64_t)), TSDB_CODE_FAILED); + } + + + int64_t size = 10000; + SScalableBf *pSBF4 = tScalableBfInit(size, 0.001); + for(int64_t i = 0; i < 1000; i++) { + int64_t ts = i + ts1; + GTEST_ASSERT_EQ(tScalableBfPut(pSBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS); + } + + for(int64_t i = 0; i < 1000; i++) { + int64_t ts = i + ts1; + GTEST_ASSERT_EQ(tScalableBfNoContain(pSBF4, &ts, sizeof(int64_t)), TSDB_CODE_FAILED); + } + + for(int64_t i = 2000; i < 3000; i++) { + int64_t ts = i + ts1; + GTEST_ASSERT_EQ(tScalableBfNoContain(pSBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS); + } + + tScalableBfDestroy(pSBF1); + tScalableBfDestroy(pSBF4); + +} \ No newline at end of file