fix(stream):fix issue when ts less than 0
This commit is contained in:
parent
f6efd0a36c
commit
2c53c08726
|
@ -155,7 +155,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
|
||||||
}
|
}
|
||||||
pInfo->pTsBuckets = NULL;
|
pInfo->pTsBuckets = NULL;
|
||||||
pInfo->pTsSBFs = NULL;
|
pInfo->pTsSBFs = NULL;
|
||||||
pInfo->minTS = -1;
|
pInfo->minTS = INT64_MIN;
|
||||||
pInfo->interval = adjustInterval(interval, precision);
|
pInfo->interval = adjustInterval(interval, precision);
|
||||||
pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark);
|
pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark);
|
||||||
pInfo->numSBFs = 0;
|
pInfo->numSBFs = 0;
|
||||||
|
@ -181,7 +181,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY dumy = 0;
|
TSKEY dumy = INT64_MIN;
|
||||||
for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
|
for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) {
|
||||||
void* tmp = taosArrayPush(pInfo->pTsBuckets, &dumy);
|
void* tmp = taosArrayPush(pInfo->pTsBuckets, &dumy);
|
||||||
if (!tmp) {
|
if (!tmp) {
|
||||||
|
@ -231,11 +231,7 @@ _end:
|
||||||
static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
|
static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
if (ts < 0) {
|
if (pInfo->minTS == INT64_MIN) {
|
||||||
code = TSDB_CODE_FAILED;
|
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
|
||||||
}
|
|
||||||
if (pInfo->minTS < 0) {
|
|
||||||
pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
|
pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
|
||||||
}
|
}
|
||||||
int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval);
|
int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval);
|
||||||
|
@ -349,7 +345,7 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p
|
||||||
void** pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
void** pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
||||||
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
||||||
TSKEY maxTs = *(TSKEY*)taosArrayGet(pInfo->pTsBuckets, index);
|
TSKEY maxTs = *(TSKEY*)taosArrayGet(pInfo->pTsBuckets, index);
|
||||||
if (ts < maxTs - pInfo->watermark) {
|
if (ts < maxTs - pInfo->watermark && maxTs != INT64_MIN) {
|
||||||
// this window has been closed.
|
// this window has been closed.
|
||||||
if (pInfo->pCloseWinSBF) {
|
if (pInfo->pCloseWinSBF) {
|
||||||
code = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen, &res);
|
code = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen, &res);
|
||||||
|
|
Loading…
Reference in New Issue