From ad24c035381ee564b94b1de6cbfcf6b643ae10ef Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 19 Dec 2023 14:31:13 +0800 Subject: [PATCH] fix: session window block row index overflow --- source/libs/executor/src/timewindowoperator.c | 4 +--- tests/system-test/2-query/state_window.py | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0ca91f74ad..afe1921d30 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1328,6 +1328,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator SWindowRowsSup* pRowSup = &pInfo->winSup; pRowSup->numOfRows = 0; + pRowSup->startRowIndex = 0; // In case of ascending or descending order scan data, only one time window needs to be kepted for each table. TSKEY* tsList = (TSKEY*)pColInfoData->pData; @@ -1339,9 +1340,6 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) { // The gap is less than the threshold, so it belongs to current session window that has been opened already. doKeepTuple(pRowSup, tsList[j], gid); - if (j == 0 && pRowSup->startRowIndex != 0) { - pRowSup->startRowIndex = 0; - } } else { // start a new session window SResultRow* pResult = NULL; diff --git a/tests/system-test/2-query/state_window.py b/tests/system-test/2-query/state_window.py index a211cd9dbe..2bf363f0dc 100644 --- a/tests/system-test/2-query/state_window.py +++ b/tests/system-test/2-query/state_window.py @@ -186,7 +186,25 @@ class TDTestCase: tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1) tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1) + def test_crash_for_session_window(self): + tdSql.execute("drop database if exists test") + self.prepareTestEnv() + tdSql.execute("alter local 'queryPolicy' '3'") + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.execute("flush database test", queryTimes=1) + time.sleep(2) + tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1) + tdSql.query("select first(c2) from t0 session(ts, 1s) order by ts", queryTimes=1) + def run(self): + self.test_crash_for_session_window() self.test_crash_for_state_window1() self.test_crash_for_state_window2() self.test_crash_for_state_window3()