stream update
This commit is contained in:
parent
a5aefd017b
commit
fde3f3f68f
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#ifndef _TSTREAMUPDATE_H_
|
||||
#define _TSTREAMUPDATE_H_
|
||||
|
||||
#include "taosdef.h"
|
||||
#include "tarray.h"
|
||||
#include "tmsg.h"
|
||||
#include "tscalablebf.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SUpdateInfo {
|
||||
SArray *pTsBuckets;
|
||||
uint64_t numBuckets;
|
||||
SArray *pTsSBFs;
|
||||
uint64_t numSBFs;
|
||||
int64_t interval;
|
||||
int64_t watermark;
|
||||
TSKEY minTS;
|
||||
} SUpdateInfo;
|
||||
|
||||
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
|
||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
|
||||
bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
|
||||
void updateInfoDestroy(SUpdateInfo *pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* ifndef _TSTREAMUPDATE_H_ */
|
|
@ -218,6 +218,13 @@ void taosArrayClear(SArray* pArray);
|
|||
*/
|
||||
void taosArrayClearEx(SArray* pArray, void (*fp)(void*));
|
||||
|
||||
/**
|
||||
 * clear the array (remove all elements); when fp is not NULL it is called on each stored pointer first
|
||||
* @param pArray
|
||||
* @param fp
|
||||
*/
|
||||
void taosArrayClearP(SArray* pArray, FDelete fp);
|
||||
|
||||
void* taosArrayDestroy(SArray* pArray);
|
||||
void taosArrayDestroyP(SArray* pArray, FDelete fp);
|
||||
void taosArrayDestroyEx(SArray* pArray, FDelete fp);
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_UTIL_BLOOMFILTER_H_
|
||||
#define _TD_UTIL_BLOOMFILTER_H_
|
||||
|
||||
#include "os.h"
|
||||
#include "thash.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SBloomFilter {
|
||||
uint32_t hashFunctions;
|
||||
uint64_t expectedEntries;
|
||||
uint64_t numUnits;
|
||||
uint64_t numBits;
|
||||
uint64_t size;
|
||||
_hash_fn_t hashFn1;
|
||||
_hash_fn_t hashFn2;
|
||||
void *buffer;
|
||||
double errorRate;
|
||||
} SBloomFilter;
|
||||
|
||||
SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate);
|
||||
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len);
|
||||
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf,
|
||||
uint32_t len);
|
||||
void tBloomFilterDestroy(SBloomFilter *pBF);
|
||||
void tBloomFilterDump(const SBloomFilter *pBF);
|
||||
bool tBloomFilterIsFull(const SBloomFilter *pBF);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_UTIL_BLOOMFILTER_H_*/
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_UTIL_SCALABLEBF_H_
|
||||
#define _TD_UTIL_SCALABLEBF_H_
|
||||
|
||||
#include "tbloomfilter.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SScalableBf {
|
||||
SArray *bfArray; // array of bloom filters
|
||||
uint32_t growth;
|
||||
uint64_t numBits;
|
||||
} SScalableBf;
|
||||
|
||||
SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate);
|
||||
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len);
|
||||
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf,
|
||||
uint32_t len);
|
||||
void tScalableBfDestroy(SScalableBf *pSBf);
|
||||
void tScalableBfDump(const SScalableBf *pSBf);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_UTIL_SCALABLEBF_H_*/
|
|
@ -0,0 +1,172 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tstreamUpdate.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#define DEFAULT_FALSE_POSITIVE 0.01
|
||||
#define DEFAULT_BUCKET_SIZE 1024
|
||||
#define ROWS_PER_MILLISECOND 1
|
||||
#define MAX_NUM_SCALABLE_BF 120
|
||||
#define MIN_NUM_SCALABLE_BF 10
|
||||
#define DEFAULT_PREADD_BUCKET 1
|
||||
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
||||
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
|
||||
|
||||
/*
 * Append newly created scalable bloom filters to pInfo->pTsSBFs.
 *
 * At most pInfo->numSBFs filters are added, whatever `count` asks for.
 * The result of tScalableBfInit() is pushed unconditionally, so a failed
 * initialisation ends up in the array as a NULL pointer.
 */
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
  uint64_t toAdd = (count > pInfo->numSBFs) ? pInfo->numSBFs : count;
  uint64_t added = 0;
  while (added < toAdd) {
    SScalableBf *pNewSBf = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND,
                                           DEFAULT_FALSE_POSITIVE);
    taosArrayPush(pInfo->pTsSBFs, &pNewSBf);
    ++added;
  }
}
|
||||
|
||||
/*
 * Drop the `count` oldest windows and advance minTS accordingly.
 *
 * When fewer windows than the array holds are dropped, the oldest filter
 * (always at index 0, because taosArrayRemove shifts the remainder down)
 * is destroyed and removed `count` times. Removing index i while i grows
 * would skip every other element and free filters that are still in range.
 *
 * When `count` covers the whole array, every filter is destroyed and the
 * array is emptied in one call.
 *
 * @param pInfo  update state whose pTsSBFs / minTS are modified
 * @param count  number of interval-wide windows to slide forward by
 */
static void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) {
  if (count < pInfo->numSBFs) {
    for (uint64_t i = 0; i < count; ++i) {
      SScalableBf *pOldest = taosArrayGetP(pInfo->pTsSBFs, 0);
      tScalableBfDestroy(pOldest);
      taosArrayRemove(pInfo->pTsSBFs, 0);
    }
  } else {
    taosArrayClearP(pInfo->pTsSBFs, (FDelete)tScalableBfDestroy);
  }
  pInfo->minTS += pInfo->interval * count;
}
|
||||
|
||||
static int64_t adjustInterval(int64_t interval, int32_t precision) {
|
||||
int64_t val = interval;
|
||||
if (precision != TSDB_TIME_PRECISION_MILLI) {
|
||||
val = convertTimePrecision(interval, precision, TSDB_TIME_PRECISION_MILLI);
|
||||
}
|
||||
if (val < MIN_INTERVAL) {
|
||||
val = MIN_INTERVAL;
|
||||
} else if (val > MAX_INTERVAL) {
|
||||
val = MAX_INTERVAL;
|
||||
}
|
||||
val = convertTimePrecision(val, TSDB_TIME_PRECISION_MILLI, precision);
|
||||
return val;
|
||||
}
|
||||
|
||||
/* Convenience wrapper: build the update state from an SInterval descriptor. */
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark) {
  SUpdateInfo *pInfo = updateInfoInit(pInterval->interval, pInterval->precision, watermark);
  return pInfo;
}
|
||||
|
||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark) {
|
||||
SUpdateInfo *pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
||||
if (pInfo == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pInfo->pTsBuckets = NULL;
|
||||
pInfo->pTsSBFs = NULL;
|
||||
pInfo->minTS = -1;
|
||||
pInfo->interval = adjustInterval(interval, precision);
|
||||
pInfo->watermark = watermark;
|
||||
|
||||
uint64_t bfSize = (uint64_t)(watermark / pInfo->interval);
|
||||
if (bfSize < MIN_NUM_SCALABLE_BF) {
|
||||
bfSize = MIN_NUM_SCALABLE_BF;
|
||||
} else if (bfSize > MAX_NUM_SCALABLE_BF) {
|
||||
bfSize = MAX_NUM_SCALABLE_BF;
|
||||
}
|
||||
|
||||
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(SScalableBf));
|
||||
if (pInfo->pTsSBFs == NULL) {
|
||||
updateInfoDestroy(pInfo);
|
||||
return NULL;
|
||||
}
|
||||
pInfo->numSBFs = bfSize;
|
||||
windowSBfAdd(pInfo, bfSize);
|
||||
|
||||
pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
|
||||
if (pInfo->pTsBuckets == NULL) {
|
||||
updateInfoDestroy(pInfo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
TSKEY dumy = 0;
|
||||
for(uint64_t i=0; i < DEFAULT_BUCKET_SIZE; ++i) {
|
||||
taosArrayPush(pInfo->pTsBuckets, &dumy);
|
||||
}
|
||||
pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
|
||||
return pInfo;
|
||||
}
|
||||
|
||||
/*
 * Return the scalable bloom filter of the window that contains `ts`,
 * sliding the window range forward when `ts` lies beyond the newest window.
 *
 * @return the filter, or NULL when `ts` is not positive or is older than
 *         the oldest tracked window (ts < pInfo->minTS)
 */
static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) {
  if (ts <= 0) {
    return NULL;
  }
  if (pInfo->minTS < 0) {
    // First timestamp: align the start of the range to an interval boundary.
    pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
  }
  // A timestamp before the tracked range has no window. Without this guard
  // the negative offset became a huge unsigned index, which discarded every
  // filter and moved minTS through unsigned wraparound.
  if (ts < pInfo->minTS) {
    return NULL;
  }
  uint64_t index = (uint64_t)((ts - pInfo->minTS) / pInfo->interval);
  if (index >= pInfo->numSBFs) {
    // Slide forward so that `ts` falls into the last window.
    uint64_t count = index + 1 - pInfo->numSBFs;
    windowSBfDelete(pInfo, count);
    windowSBfAdd(pInfo, count);
    index = pInfo->numSBFs - 1;
  }
  SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index);
  if (res == NULL) {
    // NOTE(review): the replacement is appended rather than stored at `index`;
    // confirm this is intended for slots left NULL by a failed init.
    res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND,
                          DEFAULT_FALSE_POSITIVE);
    taosArrayPush(pInfo->pTsSBFs, &res);
  }
  return res;
}
|
||||
|
||||
/*
 * Report whether (tableId, ts) should be treated as an update of existing data.
 *
 * The timestamp is first recorded in the bloom filter of its window, if one
 * exists. The result is then:
 *   - false when ts exceeds the largest timestamp stored for the table's
 *     bucket (tableId % numBuckets); the bucket is raised to ts;
 *   - true  when ts is older than pInfo->minTS;
 *   - false when the bloom filter accepted ts as a new key;
 *   - true  otherwise.
 */
bool isUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
  const uint64_t bucket = ((uint64_t)tableId) % pInfo->numBuckets;

  int32_t      putCode = TSDB_CODE_FAILED;
  SScalableBf *pWindowBf = getSBf(pInfo, ts);
  if (pWindowBf != NULL) {  // NULL when no window covers ts
    putCode = tScalableBfPut(pWindowBf, &ts, sizeof(TSKEY));
  }

  TSKEY *pBucketMax = (TSKEY *)taosArrayGet(pInfo->pTsBuckets, bucket);
  if (*pBucketMax < ts) {
    taosArraySet(pInfo->pTsBuckets, bucket, &ts);
    return false;
  }

  if (ts < pInfo->minTS) {
    return true;
  }
  if (putCode == TSDB_CODE_SUCCESS) {
    return false;
  }

  // check from tsdb api
  return true;
}
|
||||
|
||||
/* Release the bucket array, every window filter, the window array and pInfo itself. NULL is accepted. */
void updateInfoDestroy(SUpdateInfo *pInfo) {
  if (pInfo == NULL) {
    return;
  }

  taosArrayDestroy(pInfo->pTsBuckets);

  const uint64_t numFilters = taosArrayGetSize(pInfo->pTsSBFs);
  uint64_t       pos = 0;
  while (pos < numFilters) {
    SScalableBf *pFilter = taosArrayGetP(pInfo->pTsSBFs, pos);
    tScalableBfDestroy(pFilter);
    pos++;
  }
  taosArrayDestroy(pInfo->pTsSBFs);

  taosMemoryFree(pInfo);
}
|
|
@ -0,0 +1,20 @@
|
|||
|
||||
MESSAGE(STATUS "build stream unit test")
|
||||
|
||||
# GoogleTest requires at least C++11
|
||||
SET(CMAKE_CXX_STANDARD 11)
|
||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||
|
||||
# streamUpdateTest
|
||||
ADD_EXECUTABLE(streamUpdateTest "tstreamUpdateTest.cpp")
|
||||
|
||||
TARGET_LINK_LIBRARIES(
|
||||
streamUpdateTest
|
||||
PUBLIC os util common gtest stream
|
||||
)
|
||||
|
||||
TARGET_INCLUDE_DIRECTORIES(
|
||||
streamUpdateTest
|
||||
PUBLIC "${TD_SOURCE_DIR}/include/libs/stream/"
|
||||
PRIVATE "${TD_SOURCE_DIR}/source/libs/stream/inc"
|
||||
)
|
|
@ -0,0 +1,103 @@
|
|||
#include <gtest/gtest.h>
|
||||
|
||||
#include "tstreamUpdate.h"
|
||||
#include "ttime.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
// Exercises isUpdated() for basic detection, window sliding by one step,
// large forward jumps, and filling every window of a fresh instance.
TEST(TD_STREAM_UPDATE_TEST, update) {
  const int64_t interval = 20 * 1000;
  const int64_t watermark = 10 * 60 * 1000;

  // Basic detection on a single instance.
  SUpdateInfo *pBasic = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
  GTEST_ASSERT_EQ(isUpdated(pBasic, 1, 0), true);
  GTEST_ASSERT_EQ(isUpdated(pBasic, 1, -1), true);

  // For ts = 1 and then ts = 2: the first pass over all tables is new data,
  // the second pass is reported as an update.
  for (int ts = 1; ts <= 2; ts++) {
    for (int uid = 0; uid < 1024; uid++) {
      GTEST_ASSERT_EQ(isUpdated(pBasic, uid, ts), false);
    }
    for (int uid = 0; uid < 1024; uid++) {
      GTEST_ASSERT_EQ(isUpdated(pBasic, uid, ts), true);
    }
  }

  // Going back to ts = 1 is still an update.
  for (int uid = 0; uid < 1024; uid++) {
    GTEST_ASSERT_EQ(isUpdated(pBasic, uid, 1), true);
  }

  // Increasing timestamps on one table raise its bucket maximum.
  for (int ts = 3; ts < 1024; ts++) {
    GTEST_ASSERT_EQ(isUpdated(pBasic, 0, ts), false);
  }
  GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pBasic->pTsBuckets, 0), 1023);

  for (int ts = 3; ts < 1024; ts++) {
    GTEST_ASSERT_EQ(isUpdated(pBasic, 0, ts), true);
  }
  GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pBasic->pTsBuckets, 0), 1023);

  // One timestamp per window: nothing slides while the range is not exceeded.
  SUpdateInfo *pSlide = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
  for (int win = 1; win <= watermark / interval; win++) {
    GTEST_ASSERT_EQ(isUpdated(pSlide, 1, win * interval + 5), false);
    GTEST_ASSERT_EQ(pSlide->minTS, interval);
    GTEST_ASSERT_EQ(pSlide->numSBFs, watermark / interval);
  }
  for (int slot = 0; slot < pSlide->numSBFs; slot++) {
    SScalableBf *pWindow = (SScalableBf *)taosArrayGetP(pSlide->pTsSBFs, slot);
    SBloomFilter *pFirst = (SBloomFilter *)taosArrayGetP(pWindow->bfArray, 0);
    GTEST_ASSERT_EQ(pFirst->size, 1);
  }

  // Each further window moves the range forward by exactly one interval.
  for (int win = watermark / interval + 1, step = 2; win <= watermark / interval + 10; win++, step++) {
    GTEST_ASSERT_EQ(isUpdated(pSlide, 1, win * interval + 5), false);
    GTEST_ASSERT_EQ(pSlide->minTS, interval*step);
    GTEST_ASSERT_EQ(pSlide->numSBFs, watermark / interval);
    SScalableBf *pWindow = (SScalableBf *)taosArrayGetP(pSlide->pTsSBFs, pSlide->numSBFs - 1);
    SBloomFilter *pFirst = (SBloomFilter *)taosArrayGetP(pWindow->bfArray, 0);
    GTEST_ASSERT_EQ(pFirst->size, 1);
  }

  // Jumps larger than the whole range: the timestamp lands in the last window.
  for (int win = watermark / interval * 100, round = 0; round < 10; win += (watermark / interval * 2), round++) {
    GTEST_ASSERT_EQ(isUpdated(pSlide, 1, win * interval + 5), false);
    GTEST_ASSERT_EQ(pSlide->minTS, (win-(pSlide->numSBFs-1))*interval);
    GTEST_ASSERT_EQ(pSlide->numSBFs, watermark / interval);
  }

  // Same with even larger jumps on a fresh instance; the bucket follows the timestamp.
  SUpdateInfo *pJump = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
  GTEST_ASSERT_EQ(isUpdated(pJump, 1, 1 * interval + 5), false);
  GTEST_ASSERT_EQ(pJump->minTS, interval);
  for (int win = watermark / interval * 100, round = 0; round < 10; win += (watermark / interval * 10), round++) {
    GTEST_ASSERT_EQ(isUpdated(pJump, 1, win * interval + 5), false);
    GTEST_ASSERT_EQ(pJump->minTS, (win-(pJump->numSBFs-1))*interval);
    GTEST_ASSERT_EQ(pJump->numSBFs, watermark / interval);
    GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pJump->pTsBuckets, 1), win * interval + 5);
  }

  // Fill every window repeatedly: each round adds one key per window.
  SUpdateInfo *pFill = updateInfoInit(interval, TSDB_TIME_PRECISION_MILLI, watermark);
  for (int round = 1; round < 100; round++) {
    for (int slot = 0; slot < pFill->numSBFs; slot++) {
      GTEST_ASSERT_EQ(isUpdated(pFill, slot, slot * interval + 5 * round), false);
      GTEST_ASSERT_EQ(pFill->minTS, 0);
      GTEST_ASSERT_EQ(pFill->numSBFs, watermark / interval);
      GTEST_ASSERT_EQ(*(int64_t*)taosArrayGet(pFill->pTsBuckets, slot), slot * interval + 5 * round);
      SScalableBf *pWindow = (SScalableBf *)taosArrayGetP(pFill->pTsSBFs, slot);
      SBloomFilter *pFirst = (SBloomFilter *)taosArrayGetP(pWindow->bfArray, 0);
      GTEST_ASSERT_EQ(pFirst->size, round);
    }
  }

  updateInfoDestroy(pBasic);
  updateInfoDestroy(pSlide);
  updateInfoDestroy(pJump);
  updateInfoDestroy(pFill);
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
|
@ -318,6 +318,20 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
|
|||
pArray->size = 0;
|
||||
}
|
||||
|
||||
/*
 * Empty an array of pointers. When fp is not NULL it is invoked on every
 * stored pointer before the size is reset to 0. A NULL array is ignored.
 */
void taosArrayClearP(SArray* pArray, FDelete fp) {
  if (pArray == NULL) {
    return;
  }

  if (fp != NULL) {
    for (int32_t idx = 0; idx < pArray->size; ++idx) {
      void** ppElem = (void**)TARRAY_GET_ELEM(pArray, idx);
      fp(*ppElem);
    }
  }

  pArray->size = 0;
}
|
||||
|
||||
void* taosArrayDestroy(SArray* pArray) {
|
||||
if (pArray) {
|
||||
taosMemoryFree(pArray->pData);
|
||||
|
|
|
@ -0,0 +1,117 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "tbloomfilter.h"
|
||||
#include "taos.h"
|
||||
#include "taoserror.h"
|
||||
|
||||
#define UNIT_NUM_BITS 64
|
||||
#define UNIT_ADDR_NUM_BITS 6
|
||||
|
||||
static FORCE_INLINE bool setBit(uint64_t *buf, uint64_t index) {
|
||||
uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS;
|
||||
uint64_t mask = 1 << (index % UNIT_NUM_BITS);
|
||||
uint64_t old = buf[unitIndex];
|
||||
buf[unitIndex] |= mask;
|
||||
return buf[unitIndex] != old;
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool getBit(uint64_t *buf, uint64_t index) {
|
||||
uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS;
|
||||
uint64_t mask = 1 << (index % UNIT_NUM_BITS);
|
||||
return buf[unitIndex] & mask;
|
||||
}
|
||||
|
||||
SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
|
||||
if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
|
||||
return NULL;
|
||||
}
|
||||
SBloomFilter *pBF = taosMemoryCalloc(1, sizeof(SBloomFilter));
|
||||
if (pBF == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pBF->expectedEntries = expectedEntries;
|
||||
pBF->errorRate = errorRate;
|
||||
|
||||
double lnRate = fabs(log(errorRate));
|
||||
// ln(2)^2 = 0.480453013918201
|
||||
// m = - n * ln(P) / ( ln(2) )^2
|
||||
// m is the size of bloom filter, n is expected entries, P is false positive probability
|
||||
pBF->numUnits = (uint64_t) ceil(expectedEntries * lnRate / 0.480453013918201 / UNIT_NUM_BITS);
|
||||
pBF->numBits = pBF->numUnits * 64;
|
||||
pBF->size = 0;
|
||||
|
||||
// ln(2) = 0.693147180559945
|
||||
pBF->hashFunctions = (uint32_t) ceil(lnRate / 0.693147180559945);
|
||||
pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
|
||||
pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);
|
||||
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
|
||||
if (pBF->buffer == NULL) {
|
||||
tBloomFilterDestroy(pBF);
|
||||
return NULL;
|
||||
}
|
||||
return pBF;
|
||||
}
|
||||
|
||||
/*
 * Insert a key. The filter must not be full (asserted).
 *
 * Probe positions are hashFn1(key) + i * hashFn2(key), taken modulo numBits.
 *
 * @return TSDB_CODE_SUCCESS when at least one bit changed (size is then
 *         incremented), TSDB_CODE_FAILED when every probed bit was already set
 */
int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) {
  ASSERT(!tBloomFilterIsFull(pBF));

  const uint64_t firstHash = (uint64_t)pBF->hashFn1(keyBuf, len);
  const uint64_t stepHash = (uint64_t)pBF->hashFn2(keyBuf, len);
  const uint64_t totalBits = pBF->numBits;

  bool     anyNewBit = false;
  uint64_t probe = firstHash;
  for (uint64_t round = 0; round < pBF->hashFunctions; ++round, probe += stepHash) {
    if (setBit(pBF->buffer, probe % totalBits)) {
      anyNewBit = true;
    }
  }

  if (!anyNewBit) {
    return TSDB_CODE_FAILED;
  }
  pBF->size++;
  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
int32_t tBloomFilterNoContain(const SBloomFilter *pBF, const void *keyBuf,
|
||||
uint32_t len) {
|
||||
uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len);
|
||||
uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len);
|
||||
const register uint64_t size = pBF->numBits;
|
||||
uint64_t cbHash = h1;
|
||||
for (uint64_t i = 0; i < pBF->hashFunctions; ++i) {
|
||||
if (!getBit(pBF->buffer, cbHash % size)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
cbHash += h2;
|
||||
}
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
/* Free the bit buffer and the filter itself. NULL is accepted. */
void tBloomFilterDestroy(SBloomFilter *pBF) {
  if (pBF != NULL) {
    taosMemoryFree(pBF->buffer);
    taosMemoryFree(pBF);
  }
}
|
||||
|
||||
/* Placeholder: dumping a bloom filter is not implemented yet. */
void tBloomFilterDump(const struct SBloomFilter *pBF) {
  // ToDo
  (void)pBF;
}
|
||||
|
||||
bool tBloomFilterIsFull(const SBloomFilter *pBF) {
|
||||
return pBF->size >= pBF->expectedEntries;
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "tscalablebf.h"
|
||||
#include "taoserror.h"
|
||||
|
||||
#define DEFAULT_GROWTH 2
|
||||
#define DEFAULT_TIGHTENING_RATIO 0.5
|
||||
|
||||
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries,
|
||||
double errorRate);
|
||||
|
||||
SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
|
||||
const uint32_t defaultSize = 8;
|
||||
if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
|
||||
return NULL;
|
||||
}
|
||||
SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
|
||||
if (pSBf == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pSBf->numBits = 0;
|
||||
pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *));
|
||||
if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL ) {
|
||||
tScalableBfDestroy(pSBf);
|
||||
return NULL;
|
||||
}
|
||||
pSBf->growth = DEFAULT_GROWTH;
|
||||
return pSBf;
|
||||
}
|
||||
|
||||
/*
 * Insert a key into the scalable filter.
 *
 * Every filter except the newest is consulted first; if any of them may
 * contain the key the call fails. Otherwise the key goes into the newest
 * filter, after appending a larger filter with a tightened error rate
 * when the newest one is full.
 *
 * @return TSDB_CODE_SUCCESS when the key was recorded as new,
 *         TSDB_CODE_FAILED when it may already be present,
 *         TSDB_CODE_OUT_OF_MEMORY when a further filter could not be added
 */
int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
  const int32_t numFilters = taosArrayGetSize(pSBf->bfArray);
  const int32_t newest = numFilters - 1;

  for (int32_t older = newest - 1; older >= 0; --older) {
    SBloomFilter *pOlder = taosArrayGetP(pSBf->bfArray, older);
    if (tBloomFilterNoContain(pOlder, keyBuf, len) != TSDB_CODE_SUCCESS) {
      return TSDB_CODE_FAILED;
    }
  }

  SBloomFilter *pActive = taosArrayGetP(pSBf->bfArray, newest);
  ASSERT(pActive);
  if (tBloomFilterIsFull(pActive)) {
    pActive = tScalableBfAddFilter(pSBf,
                                   pActive->expectedEntries * pSBf->growth,
                                   pActive->errorRate * DEFAULT_TIGHTENING_RATIO);
    if (pActive == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
  return tBloomFilterPut(pActive, keyBuf, len);
}
|
||||
|
||||
int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf,
|
||||
uint32_t len) {
|
||||
int32_t size = taosArrayGetSize(pSBf->bfArray);
|
||||
for (int32_t i = size - 1; i >= 0; --i) {
|
||||
if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i),
|
||||
keyBuf, len) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/*
 * Create a bloom filter and append it to pSBf->bfArray, adding its bit
 * count to pSBf->numBits.
 *
 * @return the new filter, or NULL when creation or the append fails (the
 *         filter is destroyed in the latter case)
 */
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries,
                                          double errorRate) {
  SBloomFilter *pAdded = tBloomFilterInit(expectedEntries, errorRate);
  if (pAdded != NULL) {
    if (taosArrayPush(pSBf->bfArray, &pAdded) != NULL) {
      pSBf->numBits += pAdded->numBits;
      return pAdded;
    }
    tBloomFilterDestroy(pAdded);
  }
  return NULL;
}
|
||||
|
||||
/* Destroy every contained filter, the filter array and pSBf itself. NULL is accepted. */
void tScalableBfDestroy(SScalableBf *pSBf) {
  if (pSBf != NULL) {
    if (pSBf->bfArray != NULL) {
      taosArrayDestroyP(pSBf->bfArray, (FDelete)tBloomFilterDestroy);
    }
    taosMemoryFree(pSBf);
  }
}
|
||||
|
||||
/* Placeholder: dumping a scalable bloom filter is not implemented yet. */
void tScalableBfDump(const SScalableBf *pSBf) {
  // Todo;
  (void)pSBf;
}
|
|
@ -59,4 +59,12 @@ target_link_libraries(cfgTest os util gtest_main)
|
|||
add_test(
|
||||
NAME cfgTest
|
||||
COMMAND cfgTest
|
||||
)
|
||||
|
||||
# bloomFilterTest
|
||||
add_executable(bloomFilterTest "bloomFilterTest.cpp")
|
||||
target_link_libraries(bloomFilterTest os util gtest_main)
|
||||
add_test(
|
||||
NAME bloomFilterTest
|
||||
COMMAND bloomFilterTest
|
||||
)
|
|
@ -0,0 +1,140 @@
|
|||
#include <gtest/gtest.h>
|
||||
|
||||
#include "tscalablebf.h"
|
||||
#include "taoserror.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
// Checks argument validation, sizing and put / no-contain on a plain bloom filter.
TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) {
  const int64_t baseTs = 1650803518000;

  // Invalid error rates or zero capacity are rejected.
  GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 0));
  GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 1));
  GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, -0.1));
  GTEST_ASSERT_EQ(NULL, tBloomFilterInit(0, 0.01));

  // Small filter: insert until 100 puts were accepted, then it must be full.
  SBloomFilter *pSmall = tBloomFilterInit(100, 0.005);
  GTEST_ASSERT_EQ(pSmall->numBits, 1152);
  GTEST_ASSERT_EQ(pSmall->numUnits, 1152/64);
  int64_t accepted = 0;
  int64_t offset = 0;
  while (accepted < 100) {
    int64_t ts = offset + baseTs;
    if (tBloomFilterPut(pSmall, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS) {
      accepted++;
    }
    offset++;
  }
  ASSERT_TRUE(tBloomFilterIsFull(pSmall));

  // Sizing of larger filters.
  SBloomFilter *pMedium = tBloomFilterInit(1000*10000, 0.1);
  GTEST_ASSERT_EQ(pMedium->numBits, 47925312);
  GTEST_ASSERT_EQ(pMedium->numUnits, 47925312/64);

  SBloomFilter *pLarge = tBloomFilterInit(10000*10000, 0.001);
  GTEST_ASSERT_EQ(pLarge->numBits, 1437758784);
  GTEST_ASSERT_EQ(pLarge->numUnits, 1437758784/64);

  // Inserted keys are found; keys from a disjoint range are not.
  int64_t capacity = 10000;
  SBloomFilter *pLookup = tBloomFilterInit(capacity, 0.001);
  for (int64_t k = 0; k < 1000; k++) {
    int64_t ts = k + baseTs;
    GTEST_ASSERT_EQ(tBloomFilterPut(pLookup, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
  }
  ASSERT_TRUE(!tBloomFilterIsFull(pLookup));

  for (int64_t k = 0; k < 1000; k++) {
    int64_t ts = k + baseTs;
    GTEST_ASSERT_EQ(tBloomFilterNoContain(pLookup, &ts, sizeof(int64_t)), TSDB_CODE_FAILED);
  }

  for (int64_t k = 2000; k < 3000; k++) {
    int64_t ts = k + baseTs;
    GTEST_ASSERT_EQ(tBloomFilterNoContain(pLookup, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
  }

  tBloomFilterDestroy(pSmall);
  tBloomFilterDestroy(pMedium);
  tBloomFilterDestroy(pLarge);
  tBloomFilterDestroy(pLookup);
}
|
||||
|
||||
// Checks argument validation, growth in stages and put / no-contain on a scalable bloom filter.
TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) {
  const int64_t baseTs = 1650803518000;

  // Invalid error rates or zero capacity are rejected.
  GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, 0));
  GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, 1));
  GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, -0.1));
  GTEST_ASSERT_EQ(NULL, tScalableBfInit(0, 0.01));

  SScalableBf *pGrowing = tScalableBfInit(100, 0.01);
  GTEST_ASSERT_EQ(pGrowing->numBits, 1152);

  // Keep inserting consecutive timestamps until the accepted count reaches
  // each target, then check the total number of bits after that stage.
  const int64_t acceptedTargets[] = {100, 300, 700, 1500};
  const int bitsAfterStage[] = {1152, 1152+2496, 1152+2496+5568, 1152+2496+5568+12288};
  int64_t accepted = 0;
  int64_t index = 0;
  for (int stage = 0; stage < 4; stage++) {
    for ( ; accepted < acceptedTargets[stage]; index++) {
      int64_t ts = index + baseTs;
      if (tScalableBfPut(pGrowing, &ts, sizeof(int64_t)) == TSDB_CODE_SUCCESS) {
        accepted++;
      }
    }
    GTEST_ASSERT_EQ(pGrowing->numBits, bitsAfterStage[stage]);
  }

  // Every contained filter is full and their bit counts add up to the total.
  int32_t numFilters = taosArrayGetSize(pGrowing->bfArray);
  int64_t summedBits = 0;
  for (int64_t f = 0; f < numFilters; f++) {
    SBloomFilter *pPart = (SBloomFilter *)taosArrayGetP(pGrowing->bfArray, f);
    ASSERT_TRUE(tBloomFilterIsFull(pPart));
    summedBits += pPart->numBits;
  }
  GTEST_ASSERT_EQ(pGrowing->numBits, summedBits);

  // Every timestamp offered so far is reported as possibly present.
  for (int64_t k = 0; k < index; k++) {
    int64_t ts = k + baseTs;
    GTEST_ASSERT_EQ(tScalableBfNoContain(pGrowing, &ts, sizeof(int64_t)), TSDB_CODE_FAILED);
  }

  // Inserted keys are found; keys from a disjoint range are not.
  int64_t capacity = 10000;
  SScalableBf *pLookup = tScalableBfInit(capacity, 0.001);
  for (int64_t k = 0; k < 1000; k++) {
    int64_t ts = k + baseTs;
    GTEST_ASSERT_EQ(tScalableBfPut(pLookup, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
  }

  for (int64_t k = 0; k < 1000; k++) {
    int64_t ts = k + baseTs;
    GTEST_ASSERT_EQ(tScalableBfNoContain(pLookup, &ts, sizeof(int64_t)), TSDB_CODE_FAILED);
  }

  for (int64_t k = 2000; k < 3000; k++) {
    int64_t ts = k + baseTs;
    GTEST_ASSERT_EQ(tScalableBfNoContain(pLookup, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
  }

  tScalableBfDestroy(pGrowing);
  tScalableBfDestroy(pLookup);
}
|
Loading…
Reference in New Issue