Merge pull request #24130 from taosdata/fix/3.0/TD-27945

fix: session window block row index overflow
This commit is contained in:
dapan1121 2023-12-20 16:13:47 +08:00 committed by GitHub
commit cd044137a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 3 deletions

View File

@ -1328,6 +1328,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0;
pRowSup->startRowIndex = 0;
// In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
@ -1339,9 +1340,6 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
((pRowSup->prevTs - tsList[j] >= 0) && (pRowSup->prevTs - tsList[j] <= gap))) {
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
doKeepTuple(pRowSup, tsList[j], gid);
if (j == 0 && pRowSup->startRowIndex != 0) {
pRowSup->startRowIndex = 0;
}
} else { // start a new session window
SResultRow* pResult = NULL;

View File

@ -186,7 +186,25 @@ class TDTestCase:
tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1)
def test_crash_for_session_window(self):
tdSql.execute("drop database if exists test")
self.prepareTestEnv()
tdSql.execute("alter local 'queryPolicy' '3'")
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("flush database test", queryTimes=1)
time.sleep(2)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.query("select first(c2) from t0 session(ts, 1s) order by ts", queryTimes=1)
def run(self):
self.test_crash_for_session_window()
self.test_crash_for_state_window1()
self.test_crash_for_state_window2()
self.test_crash_for_state_window3()