diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 40de393570..9a1a30ac7b 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -155,7 +155,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b } pInfo->pTsBuckets = NULL; pInfo->pTsSBFs = NULL; - pInfo->minTS = -1; + pInfo->minTS = INT64_MIN; pInfo->interval = adjustInterval(interval, precision); pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark); pInfo->numSBFs = 0; @@ -181,7 +181,7 @@ int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, b QUERY_CHECK_CODE(code, lino, _end); } - TSKEY dumy = 0; + TSKEY dumy = INT64_MIN; for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) { void* tmp = taosArrayPush(pInfo->pTsBuckets, &dumy); if (!tmp) { @@ -231,11 +231,7 @@ _end: static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - if (ts < 0) { - code = TSDB_CODE_FAILED; - QUERY_CHECK_CODE(code, lino, _end); - } - if (pInfo->minTS < 0) { + if (pInfo->minTS == INT64_MIN) { pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval); } int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval); @@ -349,7 +345,7 @@ bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* p void** pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; TSKEY maxTs = *(TSKEY*)taosArrayGet(pInfo->pTsBuckets, index); - if (ts < maxTs - pInfo->watermark) { + if (ts < maxTs - pInfo->watermark && maxTs != INT64_MIN) { // this window has been closed. if (pInfo->pCloseWinSBF) { code = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen, &res);