Merge pull request #26164 from taosdata/fix/TD-30627

adj create count window state
This commit is contained in:
Haojun Liao 2024-06-19 14:42:08 +08:00 committed by GitHub
commit 0e7f9a56c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 6 additions and 4 deletions

View File

@ -874,9 +874,8 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey
void* pFileStore = getStateFileStore(pFileState); void* pFileStore = getStateFileStore(pFileState);
void* p = NULL; void* p = NULL;
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &p, pVLen);
int32_t code_file = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &p, pVLen); if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) {
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
code = code_file; code = code_file;
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
@ -885,7 +884,6 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;
taosMemoryFree(p); taosMemoryFree(p);
} }
streamStateFreeCur(pCur);
goto _end; goto _end;
} else { } else {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey); (*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey);

View File

@ -336,6 +336,8 @@ sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2); sql create table ts4 using st tags(4,2,2);
sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt6 as select _wstart, count(*) c1 from st interval(10s); sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt6 as select _wstart, count(*) c1 from st interval(10s);
sleep 1000
sql insert into ts1 values(1648791213001,1,12,3,1.0); sql insert into ts1 values(1648791213001,1,12,3,1.0);
sql insert into ts2 values(1648791213001,1,12,3,1.0); sql insert into ts2 values(1648791213001,1,12,3,1.0);
@ -354,6 +356,8 @@ sql insert into ts2 values(1648791233001,1,12,3,1.0);
sql resume stream streams6; sql resume stream streams6;
sleep 1000
sql insert into ts3 values(1648791243001,1,12,3,1.0); sql insert into ts3 values(1648791243001,1,12,3,1.0);
sql insert into ts4 values(1648791253001,1,12,3,1.0); sql insert into ts4 values(1648791253001,1,12,3,1.0);