fix(stream): stream state invalid cur
This commit is contained in:
parent
1e42bf61b0
commit
5cee271cbc
|
@ -650,10 +650,10 @@ int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, S
|
||||||
if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) {
|
if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) {
|
||||||
res = 0;
|
res = 0;
|
||||||
resKey = tmpKey;
|
resKey = tmpKey;
|
||||||
|
streamStateCurPrev(pState, pCur);
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
streamStateCurPrev(pState, pCur);
|
|
||||||
}
|
}
|
||||||
*curKey = resKey;
|
*curKey = resKey;
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
|
@ -700,9 +700,14 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
|
||||||
memcpy(tmp, *pVal, valSize);
|
memcpy(tmp, *pVal, valSize);
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
streamStateCurNext(pState, pCur);
|
streamStateCurNext(pState, pCur);
|
||||||
|
} else {
|
||||||
|
*key = tmpKey;
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
|
pCur = streamStateSessionSeekKeyNext(pState, key);
|
||||||
|
}
|
||||||
|
|
||||||
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
|
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
|
||||||
|
|
|
@ -449,12 +449,12 @@ if $data26 != 14 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql create database test1 vgroups 1
|
sql create database test1 vgroups 1;
|
||||||
sql select * from information_schema.ins_databases
|
sql select * from information_schema.ins_databases;
|
||||||
|
|
||||||
print $data00 $data01 $data02
|
print $data00 $data01 $data02
|
||||||
|
|
||||||
sql use test1
|
sql use test1;
|
||||||
|
|
||||||
sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
|
sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
|
||||||
sql create stream streams2 trigger at_once into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
sql create stream streams2 trigger at_once into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
||||||
|
@ -498,7 +498,50 @@ if $data15 != 3 then
|
||||||
goto loop5
|
goto loop5
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql drop database test;
|
sql create database test3 vgroups 1;
|
||||||
sql drop database test1;
|
sql use test3;
|
||||||
|
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
|
||||||
|
sql create stream streams3 trigger at_once into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a);
|
||||||
|
sql insert into t1 values(1648791212000,1,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791213000,2,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791214000,3,2,4,1.0,2);
|
||||||
|
sql insert into t1 values(1648791215000,4,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791211000,5,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791210000,6,2,4,1.0,2);
|
||||||
|
sql insert into t1 values(1648791217000,7,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791219000,8,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791209000,9,2,4,1.0,2);
|
||||||
|
sql insert into t1 values(1648791220000,10,2,4,1.0,2);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791212000,1,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791213000,2,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791214000,3,2,4,1.0,2);
|
||||||
|
sql insert into t1 values(1648791215000,4,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791211000,5,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791210000,6,2,4,1.0,2);
|
||||||
|
sql insert into t1 values(1648791217000,7,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791219000,8,2,3,1.0,1);
|
||||||
|
sql insert into t1 values(1648791209000,9,2,4,1.0,2);
|
||||||
|
sql insert into t1 values(1648791220000,10,2,4,1.0,2);
|
||||||
|
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop6:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
sql select * from streamt3;
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $rows != 10 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue