fix: session window block row index overflow

This commit is contained in:
wangjiaming0909 2023-12-19 14:31:13 +08:00
parent ecab6655a6
commit ad24c03538
2 changed files with 19 additions and 3 deletions

View File

@ -1328,6 +1328,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0;
pRowSup->startRowIndex = 0;
// In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
@ -1339,9 +1340,6 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pRowSup, tsList[j], gid);
if (j == 0 && pRowSup->startRowIndex != 0) {
pRowSup->startRowIndex = 0;
}
} else { // start a new session window
SResultRow* pResult = NULL;

View File

@ -186,7 +186,25 @@ class TDTestCase:
tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1)
def test_crash_for_session_window(self):
tdSql.execute("drop database if exists test")
self.prepareTestEnv()
tdSql.execute("alter local 'queryPolicy' '3'")
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("flush database test", queryTimes=1)
time.sleep(2)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.query("select first(c2) from t0 session(ts, 1s) order by ts", queryTimes=1)
def run(self):
self.test_crash_for_session_window()
self.test_crash_for_state_window1()
self.test_crash_for_state_window2()
self.test_crash_for_state_window3()