count window support having
This commit is contained in:
parent
789746f9e2
commit
d3dd15e621
|
@ -129,7 +129,7 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock) {
|
static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SExecTaskInfo* pTaskInfo, SFilterInfo* pFilterInfo, SSDataBlock* pBlock) {
|
||||||
SResultRow* pResultRow = NULL;
|
SResultRow* pResultRow = NULL;
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pCountSup->pWinStates); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pCountSup->pWinStates); i++) {
|
||||||
SCountWindowResult* pBuff = setCountWindowOutputBuff(pExprSup, pCountSup, &pResultRow);
|
SCountWindowResult* pBuff = setCountWindowOutputBuff(pExprSup, pCountSup, &pResultRow);
|
||||||
|
@ -143,6 +143,7 @@ static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, S
|
||||||
clearWinStateBuff(pBuff);
|
clearWinStateBuff(pBuff);
|
||||||
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
|
clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs);
|
||||||
}
|
}
|
||||||
|
doFilter(pBlock, pFilterInfo, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
|
static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
|
||||||
|
@ -177,7 +178,7 @@ static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->groupId == 0) {
|
if (pInfo->groupId == 0) {
|
||||||
pInfo->groupId = pBlock->info.id.groupId;
|
pInfo->groupId = pBlock->info.id.groupId;
|
||||||
} else if (pInfo->groupId != pBlock->info.id.groupId) {
|
} else if (pInfo->groupId != pBlock->info.id.groupId) {
|
||||||
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pRes);
|
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pRes);
|
||||||
pInfo->groupId = pBlock->info.id.groupId;
|
pInfo->groupId = pBlock->info.id.groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,7 +188,7 @@ static SSDataBlock* countWindowAggregate(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pRes);
|
buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pRes);
|
||||||
return pRes->info.rows == 0 ? NULL : pRes;
|
return pRes->info.rows == 0 ? NULL : pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -246,6 +247,11 @@ SOperatorInfo* createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
|
||||||
|
|
||||||
pInfo->countSup.stateIndex = 0;
|
pInfo->countSup.stateIndex = 0;
|
||||||
|
|
||||||
|
code = filterInitFromNode((SNode*)pCountWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "CountWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, true, OP_NOT_OPENED, pInfo,
|
setOperatorInfo(pOperator, "CountWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, true, OP_NOT_OPENED, pInfo,
|
||||||
|
|
|
@ -175,5 +175,51 @@ if $data33 != 3 then
|
||||||
goto loop3
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step3
|
||||||
|
print =============== create database
|
||||||
|
sql create database test3 vgroups 1;
|
||||||
|
sql use test3;
|
||||||
|
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
sql insert into t1 values(1648791213000,0,1,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213001,2,2,3,1.1);
|
||||||
|
sql insert into t1 values(1648791223002,0,3,3,2.1);
|
||||||
|
sql insert into t1 values(1648791223003,1,4,3,3.1);
|
||||||
|
sql insert into t1 values(1648791223004,1,5,3,4.1);
|
||||||
|
sql insert into t1 values(1648791223005,2,6,3,4.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop4:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
print 1 sql select _wstart, count(*),max(b) from t1 count_window(3) having max(b) > 3;
|
||||||
|
sql select _wstart, count(*),max(b) from t1 count_window(3) having max(b) > 3;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $rows != 1 then
|
||||||
|
print ======rows=$rows
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
print 1 sql select _wstart, count(*),max(b) from t1 count_window(3) having max(b) > 6;
|
||||||
|
sql select _wstart, count(*),max(b) from t1 count_window(3) having max(b) > 6;
|
||||||
|
|
||||||
|
print $data00 $data01 $data02 $data03
|
||||||
|
print $data10 $data11 $data12 $data13
|
||||||
|
|
||||||
|
# row 0
|
||||||
|
if $rows != 0 then
|
||||||
|
print ======rows=$rows
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
print query_count0 end
|
print query_count0 end
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue