From 00c64ed0e367ee45df6bc04e8b996e2b4afc7b19 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 17 Jun 2024 19:33:01 +0800 Subject: [PATCH 1/2] adj create count window state --- source/libs/stream/src/streamSessionState.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 005fd1603c..84db657392 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -874,9 +874,8 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey void* pFileStore = getStateFileStore(pFileState); void* p = NULL; - SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); - int32_t code_file = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &p, pVLen); - if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { + int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &p, pVLen); + if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); code = code_file; qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); @@ -885,7 +884,6 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey code = TSDB_CODE_FAILED; taosMemoryFree(p); } - streamStateFreeCur(pCur); goto _end; } else { (*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey); From f5bb4df8467e92a513f48fdec98fec82c2e8fecc Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 18 Jun 2024 11:30:24 +0800 Subject: [PATCH 2/2] adj test case --- tests/script/tsim/stream/pauseAndResume.sim | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index 9a05e645c2..b9d6e141be 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -336,6 +336,8 @@ sql create table ts3 using st tags(3,2,2); sql create table ts4 using st tags(4,2,2); sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt6 as select _wstart, count(*) c1 from st interval(10s); +sleep 1000 + sql insert into ts1 values(1648791213001,1,12,3,1.0); sql insert into ts2 values(1648791213001,1,12,3,1.0); @@ -354,6 +356,8 @@ sql insert into ts2 values(1648791233001,1,12,3,1.0); sql resume stream streams6; +sleep 1000 + sql insert into ts3 values(1648791243001,1,12,3,1.0); sql insert into ts4 values(1648791253001,1,12,3,1.0);