fix stream state operator issue

This commit is contained in:
54liuyao 2024-12-27 10:27:05 +08:00 committed by Haojun Liao
parent 98a3fa1ab5
commit e8f63cded9
2 changed files with 54 additions and 12 deletions

View File

@ -4466,6 +4466,19 @@ _end:
return code;
}
static bool isWinResult(SSessionKey* pKey, SSHashObj* pSeUpdate, SSHashObj* pResults) {
SSessionKey checkKey = {0};
getSessionHashKey(pKey, &checkKey);
if (tSimpleHashGet(pSeUpdate, &checkKey, sizeof(SSessionKey)) != NULL) {
return true;
}
if (tSimpleHashGet(pResults, &checkKey, sizeof(SSessionKey)) != NULL) {
return true;
}
return false;
}
static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
SSHashObj* pStDeleted) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -4518,7 +4531,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
code = setStateOutputBuf(pAggSup, tsCols[i], groupId, pKeyData, &curWin, &nextWin);
QUERY_CHECK_CODE(code, lino, _end);
releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore);
if (isWinResult(&nextWin.winInfo.sessionWin, pSeUpdated, pAggSup->pResultRows) == false) {
releaseOutputBuf(pAggSup->pState, nextWin.winInfo.pStatePos, &pAPI->stateStore);
}
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
code = updateStateWindowInfo(pAggSup, &curWin, &nextWin, tsCols, groupId, pKeyColInfo, rows, i, &allEqual,

View File

@ -2,13 +2,28 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 135
system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10
system sh/cfg.sh -n dnode1 -c checkpointinterval -v 60
system sh/cfg.sh -n dnode1 -c snodeAddress -v 127.0.0.1:873
system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
print step1 =============
print step1=============
print ================ create snode
sql show snodes
if $rows != 0 then
return -1
endi
sql create snode on dnode 1;
sql show snodes;
if $rows != 1 then
return -1
endi
print ============== snode created , create db
sql create database test3 vgroups 1;
sql use test3;
@ -57,7 +72,7 @@ loop8:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
@ -87,7 +102,7 @@ loop9:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
@ -127,7 +142,7 @@ loop10:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
@ -162,7 +177,7 @@ loop11:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
@ -194,7 +209,7 @@ loop11:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
@ -239,7 +254,7 @@ loop12:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
@ -315,7 +330,7 @@ loop13:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
@ -369,7 +384,6 @@ if $data24 != 1 then
endi
print step4=============
sql create database test6 vgroups 4;
sql use test6;
sql create stable st(ts timestamp,a int,b int,c int,d int) tags(ta int,tb int,tc int);
@ -396,7 +410,7 @@ loop14:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
@ -449,4 +463,17 @@ if $data25 != 2 then
goto loop14
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
print sleep for 1min for checkpoint generate
sleep 60000
print ================== restart to load checkpoint from snode
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
sleep 500
sql connect
sleep 30000
sql select start_ver, checkpoint_ver from information_schema.ins_stream_tasks;