Merge pull request #24349 from taosdata/fix/TD-28172
fix stream state buff issue
This commit is contained in:
commit
d666897043
|
@ -2274,10 +2274,10 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr
|
|||
|
||||
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
|
||||
if (!pBlock) {
|
||||
qDebug("%s===stream===%s: Block is Null", taskIdStr, flag);
|
||||
qDebug("%s===stream===%s %s: Block is Null", taskIdStr, flag, opStr);
|
||||
return;
|
||||
} else if (pBlock->info.rows == 0) {
|
||||
qDebug("%s===stream===%s: Block is Empty. type:%d", taskIdStr, flag, pBlock->info.type);
|
||||
qDebug("%s===stream===%s %s: Block is Empty. block type %d", taskIdStr, flag, opStr, pBlock->info.type);
|
||||
return;
|
||||
}
|
||||
if (qDebugFlag & DEBUG_DEBUG) {
|
||||
|
|
|
@ -557,7 +557,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
|
|||
void* pFileStore = getStateFileStore(pFileState);
|
||||
void* p = NULL;
|
||||
int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
|
||||
if (code_file == TSDB_CODE_SUCCESS) {
|
||||
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
|
||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
|
||||
code = code_file;
|
||||
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
|
||||
|
|
|
@ -152,6 +152,49 @@ endi
|
|||
|
||||
print step 2 over
|
||||
|
||||
print step 3
|
||||
|
||||
sql create database test3 vgroups 1;
|
||||
sql use test3;
|
||||
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||
sql insert into t1 values(1648791213001,1,2,3,1.0);
|
||||
|
||||
print create stream streams3 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into streamt3 as select _wstart, max(a), count(*) c1 from t1 state_window(a);
|
||||
sql create stream streams3 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into streamt3 as select _wstart, max(a), count(*) c1 from t1 state_window(a);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791203000,2,2,3,1.0);
|
||||
|
||||
sleep 500
|
||||
|
||||
sql insert into t1 values(1648791214000,1,2,3,1.0);
|
||||
|
||||
$loop_count = 0
|
||||
loop6:
|
||||
|
||||
sleep 1000
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print select * from streamt3
|
||||
sql select * from streamt3;
|
||||
print $data00 $data01 $data02
|
||||
print $data10 $data11 $data12
|
||||
print $data20 $data21 $data22
|
||||
print $data30 $data31 $data32
|
||||
|
||||
if $rows != 2 then
|
||||
print =====rows=$rows
|
||||
goto loop6
|
||||
endi
|
||||
|
||||
print step 3 over
|
||||
|
||||
print state1 end
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
|
|
Loading…
Reference in New Issue