feat(stream): optimize update data check

This commit is contained in:
54liuyao 2022-07-21 15:16:57 +08:00
parent 3ae378e0c5
commit f5a326dbf4
2 changed files with 18 additions and 5 deletions

View File

@ -33,11 +33,12 @@ typedef struct SUpdateInfo {
int64_t watermark;
TSKEY minTS;
SScalableBf* pCloseWinSBF;
SHashObj* pMap;
} SUpdateInfo;
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
void updateInfoDestroy(SUpdateInfo *pInfo);
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);

View File

@ -15,9 +15,12 @@
#include "tstreamUpdate.h"
#include "ttime.h"
#include "query.h"
#define DEFAULT_FALSE_POSITIVE 0.01
#define DEFAULT_BUCKET_SIZE 131072
#define DEFAULT_BUCKET_SIZE 1310720
#define DEFAULT_MAP_CAPACITY 1310720
#define DEFAULT_MAP_SIZE (DEFAULT_MAP_CAPACITY * 10)
#define ROWS_PER_MILLISECOND 1
#define MAX_NUM_SCALABLE_BF 100000
#define MIN_NUM_SCALABLE_BF 10
@ -120,6 +123,8 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
}
pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
pInfo->pCloseWinSBF = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
return pInfo;
}
@ -149,8 +154,9 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
return res;
}
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
int32_t res = TSDB_CODE_FAILED;
TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
if (ts < maxTs - pInfo->watermark) {
@ -167,7 +173,13 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
}
if (maxTs < ts) {
int32_t size = taosHashGetSize(pInfo->pMap);
if ( (!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
return false;
}
if ( !pMapMaxTs && maxTs < ts ) {
taosArraySet(pInfo->pTsBuckets, index, &ts);
return false;
}
@ -177,7 +189,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
} else if (res == TSDB_CODE_SUCCESS) {
return false;
}
qDebug("===stream===bucket:%d, tableId:%" PRIu64 ", maxTs:" PRIu64 ", maxMapTs:" PRIu64 ", ts:%" PRIu64, index, tableId, maxTs, *pMapMaxTs, ts);
// check from tsdb api
return true;
}