From 0b9e3c9871d530fb9b19720284ed879f457a02c3 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 5 Jan 2024 16:20:55 +0800 Subject: [PATCH] fix stream state buff issue --- source/libs/executor/src/executil.c | 4 +- source/libs/stream/src/streamSessionState.c | 2 +- tests/script/tsim/stream/state1.sim | 43 +++++++++++++++++++++ 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c1ddd6e4f4..b713b9b112 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2274,10 +2274,10 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) { if (!pBlock) { - qDebug("%s===stream===%s: Block is Null", taskIdStr, flag); + qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr); return; } else if (pBlock->info.rows == 0) { - qDebug("%s===stream===%s: Block is Empty. type:%d", taskIdStr, flag, pBlock->info.type); + qDebug("%s===stream===%s %s: Block is Empty. block type %d", taskIdStr, flag, opStr, pBlock->info.type); return; } if (qDebugFlag & DEBUG_DEBUG) { diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 765403b1aa..5f19b47e1f 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -557,7 +557,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch void* pFileStore = getStateFileStore(pFileState); void* p = NULL; int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen); - if (code_file == TSDB_CODE_SUCCESS) { + if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); code = code_file; qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); diff --git a/tests/script/tsim/stream/state1.sim b/tests/script/tsim/stream/state1.sim index 775528bd4e..e6f7a81851 100644 --- a/tests/script/tsim/stream/state1.sim +++ b/tests/script/tsim/stream/state1.sim @@ -152,6 +152,49 @@ endi print step 2 over +print step 3 + +sql create database test3 vgroups 1; +sql use test3; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0); + +print create stream streams3 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into streamt3 as select _wstart, max(a), count(*) c1 from t1 state_window(a); +sql create stream streams3 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into streamt3 as select _wstart, max(a), count(*) c1 from t1 state_window(a); + +sleep 1000 + +sql insert into t1 values(1648791203000,2,2,3,1.0); + +sleep 500 + +sql insert into t1 values(1648791214000,1,2,3,1.0); + +$loop_count = 0 +loop6: + +sleep 1000 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print select * from streamt3 +sql select * from streamt3; +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 + +if $rows != 2 then + print =====rows=$rows + goto loop6 +endi + +print step 3 over + print state1 end system sh/stop_dnodes.sh