diff --git a/source/libs/stream/src/tstreamUpdate.c b/source/libs/stream/src/tstreamUpdate.c index a207ec9265..850a17b6dd 100644 --- a/source/libs/stream/src/tstreamUpdate.c +++ b/source/libs/stream/src/tstreamUpdate.c @@ -54,15 +54,28 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) { if (precision != TSDB_TIME_PRECISION_MILLI) { val = convertTimePrecision(interval, precision, TSDB_TIME_PRECISION_MILLI); } - if (val < MIN_INTERVAL) { - val = MIN_INTERVAL; - } else if (val > MAX_INTERVAL) { + + if (val <= 0 || val > MAX_INTERVAL) { val = MAX_INTERVAL; + } else if (val < MIN_INTERVAL) { + val = MIN_INTERVAL; + } + + if (precision != TSDB_TIME_PRECISION_MILLI) { + val = convertTimePrecision(val, TSDB_TIME_PRECISION_MILLI, precision); } - val = convertTimePrecision(val, TSDB_TIME_PRECISION_MILLI, precision); return val; } +static int64_t adjustWatermark(int64_t interval, int32_t watermark) { + if (watermark <= 0 || watermark > MAX_NUM_SCALABLE_BF * interval) { + watermark = MAX_NUM_SCALABLE_BF * interval; + } else if (watermark < MIN_NUM_SCALABLE_BF * interval) { + watermark = MIN_NUM_SCALABLE_BF * interval; + } + return watermark; +} + SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark) { return updateInfoInit(pInterval->interval, pInterval->precision, watermark); } @@ -76,14 +89,9 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma pInfo->pTsSBFs = NULL; pInfo->minTS = -1; pInfo->interval = adjustInterval(interval, precision); - pInfo->watermark = watermark; + pInfo->watermark = adjustWatermark(pInfo->interval, watermark); - uint64_t bfSize = (uint64_t)(watermark / pInfo->interval); - if (bfSize < MIN_NUM_SCALABLE_BF) { - bfSize = MIN_NUM_SCALABLE_BF; - } else if (bfSize > MAX_NUM_SCALABLE_BF) { - bfSize = MAX_NUM_SCALABLE_BF; - } + uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval); pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(SScalableBf)); if (pInfo->pTsSBFs == NULL) { diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index ed016ead44..32ed974f72 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -90,11 +90,21 @@ TEST(TD_STREAM_UPDATE_TEST, update) { } } + SUpdateInfo *pSU4 = updateInfoInit(-1, TSDB_TIME_PRECISION_MILLI, -1); + GTEST_ASSERT_EQ(pSU4->watermark, 120 * pSU4->interval); + GTEST_ASSERT_EQ(pSU4->interval, MILLISECOND_PER_MINUTE); + + SUpdateInfo *pSU5 = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0); + GTEST_ASSERT_EQ(pSU5->watermark, 120 * pSU4->interval); + GTEST_ASSERT_EQ(pSU5->interval, MILLISECOND_PER_MINUTE); + updateInfoDestroy(pSU); updateInfoDestroy(pSU1); updateInfoDestroy(pSU2); updateInfoDestroy(pSU3); + updateInfoDestroy(pSU4); + updateInfoDestroy(pSU5); } int main(int argc, char* argv[]) {