feat:opt final window
This commit is contained in:
parent
34ff1322e4
commit
4c2d0b5f63
|
@ -1111,7 +1111,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
} break;
|
} break;
|
||||||
default:
|
default:
|
||||||
ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE, "invalid SSDataBlock type");
|
ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_RETRIEVE, "invalid SSDataBlock type");
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1897,6 +1897,7 @@ FETCH_NEXT_BLOCK:
|
||||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
|
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
|
||||||
copyDataBlock(pInfo->pUpdateRes, pBlock);
|
copyDataBlock(pInfo->pUpdateRes, pBlock);
|
||||||
|
pInfo->updateResIndex = 0;
|
||||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||||
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
|
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
|
||||||
} break;
|
} break;
|
||||||
|
|
|
@ -1364,8 +1364,13 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
uint64_t winGpId = pGpDatas[i];
|
uint64_t winGpId = pGpDatas[i];
|
||||||
bool res = doDeleteWindow(pOperator, win.skey, winGpId);
|
|
||||||
SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
|
SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
|
||||||
|
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
|
||||||
|
if (chIds) {
|
||||||
|
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
bool res = doDeleteWindow(pOperator, win.skey, winGpId);
|
||||||
if (pUpWins && res) {
|
if (pUpWins && res) {
|
||||||
taosArrayPush(pUpWins, &winRes);
|
taosArrayPush(pUpWins, &winRes);
|
||||||
}
|
}
|
||||||
|
@ -2615,6 +2620,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
streamStateCommit(pInfo->pState);
|
streamStateCommit(pInfo->pState);
|
||||||
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
|
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
|
||||||
}
|
}
|
||||||
|
qDebug("===stream===interval final close");
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -6,6 +6,32 @@ system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 50
|
sleep 50
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
|
sql create dnode $hostname2 port 7200
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode2 -s start
|
||||||
|
|
||||||
|
print ===== step1
|
||||||
|
$x = 0
|
||||||
|
step1:
|
||||||
|
$x = $x + 1
|
||||||
|
sleep 1000
|
||||||
|
if $x == 10 then
|
||||||
|
print ====> dnode not ready!
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql select * from information_schema.ins_dnodes
|
||||||
|
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||||
|
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data(1)[4] != ready then
|
||||||
|
goto step1
|
||||||
|
endi
|
||||||
|
if $data(2)[4] != ready then
|
||||||
|
goto step1
|
||||||
|
endi
|
||||||
|
|
||||||
print ===== step2
|
print ===== step2
|
||||||
sql drop stream if exists stream_t1;
|
sql drop stream if exists stream_t1;
|
||||||
sql drop database if exists test;
|
sql drop database if exists test;
|
||||||
|
@ -458,4 +484,122 @@ if $data12 != 2 then
|
||||||
goto loop3
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print ===== step3
|
||||||
|
|
||||||
|
sql drop database if exists test4;
|
||||||
|
sql create database test4 vgroups 10;
|
||||||
|
sql use test4;
|
||||||
|
sql create stable st(ts timestamp,a int,b int,c varchar(250) ) tags(ta int,tb int,tc int);
|
||||||
|
sql create table aaa using st tags(1,1,1);
|
||||||
|
sql create table bbb using st tags(2,2,2);
|
||||||
|
sql create table ccc using st tags(3,2,2);
|
||||||
|
sql create table ddd using st tags(4,2,2);
|
||||||
|
|
||||||
|
|
||||||
|
sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1, last_row(b) c2 from st partition by c interval(1s) ;
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791221001,2,2,"/a1/aa/aa");
|
||||||
|
sql insert into bbb values(1648791221001,2,2,"/a1/aa/aa");
|
||||||
|
sql insert into ccc values(1648791221001,2,2,"/a1/aa/aa");
|
||||||
|
sql insert into ddd values(1648791221001,2,2,"/a1/aa/aa");
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791222002,2,2,"/a2/aa/aa");
|
||||||
|
sql insert into bbb values(1648791222002,2,2,"/a2/aa/aa");
|
||||||
|
sql insert into ccc values(1648791222002,2,2,"/a2/aa/aa");
|
||||||
|
sql insert into ddd values(1648791222002,2,2,"/a2/aa/aa");
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791223003,2,2,"/a3/aa/aa");
|
||||||
|
sql insert into bbb values(1648791223003,2,2,"/a3/aa/aa");
|
||||||
|
sql insert into ccc values(1648791223003,2,2,"/a3/aa/aa");
|
||||||
|
sql insert into ddd values(1648791223003,2,2,"/a3/aa/aa");
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791224003,2,2,"/a4/aa/aa");
|
||||||
|
sql insert into bbb values(1648791224003,2,2,"/a4/aa/aa");
|
||||||
|
sql insert into ccc values(1648791224003,2,2,"/a4/aa/aa");
|
||||||
|
sql insert into ddd values(1648791224003,2,2,"/a4/aa/aa");
|
||||||
|
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791225003,2,2,"/a5/aa/aa");
|
||||||
|
sql insert into bbb values(1648791225003,2,2,"/a5/aa/aa");
|
||||||
|
sql insert into ccc values(1648791225003,2,2,"/a5/aa/aa");
|
||||||
|
sql insert into ddd values(1648791225003,2,2,"/a5/aa/aa");
|
||||||
|
|
||||||
|
sql insert into aaa values(1648791226003,2,2,"/a6/aa/aa");
|
||||||
|
sql insert into bbb values(1648791226003,2,2,"/a6/aa/aa");
|
||||||
|
sql insert into ccc values(1648791226003,2,2,"/a6/aa/aa");
|
||||||
|
sql insert into ddd values(1648791226003,2,2,"/a6/aa/aa");
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop4:
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamst;
|
||||||
|
|
||||||
|
if $rows == 0 then
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql delete from aaa where ts = 1648791223003 ;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop5:
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamst;
|
||||||
|
|
||||||
|
if $rows == 0 then
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
sql delete from ccc;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop6:
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamst;
|
||||||
|
|
||||||
|
if $rows == 0 then
|
||||||
|
goto loop6
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql delete from ddd;
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
|
||||||
|
loop7:
|
||||||
|
sleep 200
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 20 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamst;
|
||||||
|
|
||||||
|
if $rows == 0 then
|
||||||
|
goto loop7
|
||||||
|
endi
|
||||||
|
|
||||||
|
print ===== over
|
||||||
|
|
||||||
system sh/stop_dnodes.sh
|
system sh/stop_dnodes.sh
|
||||||
|
|
Loading…
Reference in New Issue