fix(stream): array init error
This commit is contained in:
parent
0a46e6ee3b
commit
fc9c22b89c
|
@ -17,21 +17,20 @@
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
|
|
||||||
#define DEFAULT_FALSE_POSITIVE 0.01
|
#define DEFAULT_FALSE_POSITIVE 0.01
|
||||||
#define DEFAULT_BUCKET_SIZE 1024
|
#define DEFAULT_BUCKET_SIZE 1024
|
||||||
#define ROWS_PER_MILLISECOND 1
|
#define ROWS_PER_MILLISECOND 1
|
||||||
#define MAX_NUM_SCALABLE_BF 120
|
#define MAX_NUM_SCALABLE_BF 120
|
||||||
#define MIN_NUM_SCALABLE_BF 10
|
#define MIN_NUM_SCALABLE_BF 10
|
||||||
#define DEFAULT_PREADD_BUCKET 1
|
#define DEFAULT_PREADD_BUCKET 1
|
||||||
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
#define MAX_INTERVAL MILLISECOND_PER_MINUTE
|
||||||
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
|
#define MIN_INTERVAL (MILLISECOND_PER_SECOND * 10)
|
||||||
|
|
||||||
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
|
static void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
|
||||||
if (pInfo->numSBFs < count ) {
|
if (pInfo->numSBFs < count) {
|
||||||
count = pInfo->numSBFs;
|
count = pInfo->numSBFs;
|
||||||
}
|
}
|
||||||
for (uint64_t i = 0; i < count; ++i) {
|
for (uint64_t i = 0; i < count; ++i) {
|
||||||
SScalableBf *tsSBF = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND,
|
SScalableBf *tsSBF = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE);
|
||||||
DEFAULT_FALSE_POSITIVE);
|
|
||||||
taosArrayPush(pInfo->pTsSBFs, &tsSBF);
|
taosArrayPush(pInfo->pTsSBFs, &tsSBF);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -76,7 +75,7 @@ static int64_t adjustWatermark(int64_t interval, int32_t watermark) {
|
||||||
return watermark;
|
return watermark;
|
||||||
}
|
}
|
||||||
|
|
||||||
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark) {
|
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark) {
|
||||||
return updateInfoInit(pInterval->interval, pInterval->precision, watermark);
|
return updateInfoInit(pInterval->interval, pInterval->precision, watermark);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,7 +92,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
|
||||||
|
|
||||||
uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval);
|
uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval);
|
||||||
|
|
||||||
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(SScalableBf));
|
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void *));
|
||||||
if (pInfo->pTsSBFs == NULL) {
|
if (pInfo->pTsSBFs == NULL) {
|
||||||
updateInfoDestroy(pInfo);
|
updateInfoDestroy(pInfo);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -108,14 +107,14 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY dumy = 0;
|
TSKEY dumy = 0;
|
||||||
for(uint64_t i=0; i < DEFAULT_BUCKET_SIZE; ++i) {
|
for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
|
||||||
taosArrayPush(pInfo->pTsBuckets, &dumy);
|
taosArrayPush(pInfo->pTsBuckets, &dumy);
|
||||||
}
|
}
|
||||||
pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
|
pInfo->numBuckets = DEFAULT_BUCKET_SIZE;
|
||||||
return pInfo;
|
return pInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) {
|
static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
|
||||||
if (ts <= 0) {
|
if (ts <= 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -131,24 +130,23 @@ static SScalableBf* getSBf(SUpdateInfo *pInfo, TSKEY ts) {
|
||||||
}
|
}
|
||||||
SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index);
|
SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index);
|
||||||
if (res == NULL) {
|
if (res == NULL) {
|
||||||
res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND,
|
res = tScalableBfInit(pInfo->interval * ROWS_PER_MILLISECOND, DEFAULT_FALSE_POSITIVE);
|
||||||
DEFAULT_FALSE_POSITIVE);
|
|
||||||
taosArrayPush(pInfo->pTsSBFs, &res);
|
taosArrayPush(pInfo->pTsSBFs, &res);
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
|
bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
|
||||||
int32_t res = TSDB_CODE_FAILED;
|
int32_t res = TSDB_CODE_FAILED;
|
||||||
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
||||||
SScalableBf* pSBf = getSBf(pInfo, ts);
|
SScalableBf *pSBf = getSBf(pInfo, ts);
|
||||||
// pSBf may be a null pointer
|
// pSBf may be a null pointer
|
||||||
if (pSBf) {
|
if (pSBf) {
|
||||||
res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
|
res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY));
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
|
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
|
||||||
if (maxTs < ts ) {
|
if (maxTs < ts) {
|
||||||
taosArraySet(pInfo->pTsBuckets, index, &ts);
|
taosArraySet(pInfo->pTsBuckets, index, &ts);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -159,7 +157,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, tb_uid_t tableId, TSKEY ts) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
//check from tsdb api
|
// check from tsdb api
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue