diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index f4eb0dd87e..ba07e666a0 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -32,6 +32,7 @@ typedef struct SCountWindowResult { typedef struct SCountWindowSupp { SArray* pWinStates; int32_t stateIndex; + int32_t curStateIndex; } SCountWindowSupp; typedef struct SCountWindowOperatorInfo { @@ -45,6 +46,8 @@ typedef struct SCountWindowOperatorInfo { int32_t windowCount; int32_t windowSliding; SCountWindowSupp countSup; + SSDataBlock* pPreDataBlock; + int32_t preStateIndex; } SCountWindowOperatorInfo; void destroyCountWindowOperatorInfo(void* param) { @@ -65,6 +68,7 @@ static void clearWinStateBuff(SCountWindowResult* pBuff) { pBuff->winRows = 0; } static SCountWindowResult* getCountWinStateInfo(SCountWindowSupp* pCountSup) { SCountWindowResult* pBuffInfo = taosArrayGet(pCountSup->pWinStates, pCountSup->stateIndex); + pCountSup->curStateIndex = pCountSup->stateIndex; if (!pBuffInfo) { terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); @@ -113,6 +117,19 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno); TSKEY* tsCols = (TSKEY*)pColInfoData->pData; + int32_t numOfBuff = taosArrayGetSize(pInfo->countSup.pWinStates); + if (numOfBuff == 0) { + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + T_LONG_JMP(pTaskInfo->env, code); + } + pInfo->countSup.stateIndex = (pInfo->preStateIndex + 1) % numOfBuff; + + int32_t newSize = pRes->info.rows + pBlock->info.rows / pInfo->windowSliding + 1; + if (newSize > pRes->info.capacity) { + code = blockDataEnsureCapacity(pRes, newSize); + QUERY_CHECK_CODE(code, lino, _end); + } for (int32_t i = 0; i < pBlock->info.rows;) { SCountWindowResult* pBuffInfo = NULL; @@ -132,14 +149,6 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pInfo->pRow->win, 0); applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &pInfo->twAggSup.timeWindowData, i, num, pBlock->info.rows, pExprSup->numOfExprs); - if (pBuffInfo->winRows == pInfo->windowCount) { - doUpdateNumOfRows(pExprSup->pCtx, pInfo->pRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); - copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes, - pExprSup->rowEntryInfoOffset, pTaskInfo); - pRes->info.rows += pInfo->pRow->numOfRows; - clearWinStateBuff(pBuffInfo); - clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); - } if (pInfo->windowCount != pInfo->windowSliding) { if (prevRows <= pInfo->windowSliding) { if (pBuffInfo->winRows > pInfo->windowSliding) { @@ -151,9 +160,21 @@ void doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { step = 0; } } + if (pBuffInfo->winRows == pInfo->windowCount) { + doUpdateNumOfRows(pExprSup->pCtx, pInfo->pRow, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); + copyResultrowToDataBlock(pExprSup->pExprInfo, pExprSup->numOfExprs, pInfo->pRow, pExprSup->pCtx, pRes, + pExprSup->rowEntryInfoOffset, pTaskInfo); + pRes->info.rows += pInfo->pRow->numOfRows; + clearWinStateBuff(pBuffInfo); + pInfo->preStateIndex = pInfo->countSup.curStateIndex; + clearResultRowInitFlag(pExprSup->pCtx, pExprSup->numOfExprs); + } i += step; } + code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + QUERY_CHECK_CODE(code, lino, _end); + _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); @@ -163,11 +184,18 @@ _end: } static void buildCountResult(SExprSupp* pExprSup, SCountWindowSupp* pCountSup, SExecTaskInfo* pTaskInfo, - SFilterInfo* pFilterInfo, SSDataBlock* pBlock) { + SFilterInfo* pFilterInfo, int32_t preStateIndex, SSDataBlock* pBlock) { SResultRow* pResultRow = NULL; int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; - for (int32_t i = 0; i < taosArrayGetSize(pCountSup->pWinStates); i++) { + int32_t numOfBuff = taosArrayGetSize(pCountSup->pWinStates); + int32_t newSize = pBlock->info.rows + numOfBuff; + if (newSize > pBlock->info.capacity) { + code = blockDataEnsureCapacity(pBlock, newSize); + QUERY_CHECK_CODE(code, lino, _end); + } + pCountSup->stateIndex = (preStateIndex + 1) % numOfBuff; + for (int32_t i = 0; i < numOfBuff; i++) { SCountWindowResult* pBuff = NULL; code = setCountWindowOutputBuff(pExprSup, pCountSup, &pResultRow, &pBuff); QUERY_CHECK_CODE(code, lino, _end); @@ -204,7 +232,14 @@ static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** blockDataCleanup(pRes); while (1) { - SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); + SSDataBlock* pBlock = NULL; + if (pInfo->pPreDataBlock == NULL) { + pBlock = getNextBlockFromDownstream(pOperator, 0); + } else { + pBlock = pInfo->pPreDataBlock; + pInfo->pPreDataBlock = NULL; + } + if (pBlock == NULL) { break; } @@ -226,18 +261,26 @@ static int32_t countWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** if (pInfo->groupId == 0) { pInfo->groupId = pBlock->info.id.groupId; } else if (pInfo->groupId != pBlock->info.id.groupId) { - buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pRes); + pInfo->pPreDataBlock = pBlock; + pRes->info.id.groupId = pInfo->groupId; + buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pInfo->preStateIndex, pRes); pInfo->groupId = pBlock->info.id.groupId; + if (pRes->info.rows > 0) { + (*ppRes) = pRes; + return code; + } } doCountWindowAggImpl(pOperator, pBlock); if (pRes->info.rows >= pOperator->resultInfo.threshold) { + pRes->info.id.groupId = pInfo->groupId; (*ppRes) = pRes; return code; } } - buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pRes); + pRes->info.id.groupId = pInfo->groupId; + buildCountResult(pExprSup, &pInfo->countSup, pTaskInfo, pOperator->exprSupp.pFilterInfo, pInfo->preStateIndex, pRes); _end: if (code != TSDB_CODE_SUCCESS) { @@ -320,6 +363,8 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy } pInfo->countSup.stateIndex = 0; + pInfo->pPreDataBlock = NULL; + pInfo->preStateIndex = 0; code = filterInitFromNode((SNode*)pCountWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index b80ea74006..7218291f8c 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -36,6 +36,7 @@ typedef struct SEventWindowOperatorInfo { SFilterInfo* pEndCondInfo; bool inWindow; SResultRow* pRow; + SSDataBlock* pPreDataBlock; } SEventWindowOperatorInfo; static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator); @@ -126,6 +127,7 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy QUERY_CHECK_CODE(code, lino, _error); pInfo->tsSlotId = tsSlotId; + pInfo->pPreDataBlock = NULL; setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -199,7 +201,14 @@ static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); + SSDataBlock* pBlock = NULL; + if (pInfo->pPreDataBlock == NULL) { + pBlock = getNextBlockFromDownstream(pOperator, 0); + } else { + pBlock = pInfo->pPreDataBlock; + pInfo->pPreDataBlock = NULL; + } + if (pBlock == NULL) { break; } @@ -224,7 +233,8 @@ static int32_t eventWindowAggregateNext(SOperatorInfo* pOperator, SSDataBlock** code = doFilter(pRes, pSup->pFilterInfo, NULL); QUERY_CHECK_CODE(code, lino, _end); - if (pRes->info.rows >= pOperator->resultInfo.threshold) { + if (pRes->info.rows >= pOperator->resultInfo.threshold || + (pRes->info.id.groupId != pInfo->groupId && pRes->info.rows > 0)) { (*ppRes) = pRes; return code; } @@ -303,7 +313,10 @@ int32_t eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* p // this is a new group, reset the info pInfo->inWindow = false; pInfo->groupId = gid; + pInfo->pPreDataBlock = pBlock; + goto _return; } + pRes->info.id.groupId = pInfo->groupId; SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; diff --git a/tests/script/tsim/query/event.sim b/tests/script/tsim/query/event.sim index 16fe0f4a13..740cdb11dd 100644 --- a/tests/script/tsim/query/event.sim +++ b/tests/script/tsim/query/event.sim @@ -236,4 +236,54 @@ sql_error select count(*) from sta event_window start with f1 >0 end with f2 > 0 sql_error select count(*) from sta event_window start with f1 >0 end with f2 > 0 group by tbname; sql_error select count(*) from sta event_window start with f1 >0 end with f2 > 0 fill(NULL); +print step2 +print =============== create database +sql create database test2 vgroups 4; +sql use test2; +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223001,2,2,3,1.1); +sql insert into t1 values(1648791233002,3,2,3,2.1); +sql insert into t1 values(1648791243003,4,2,3,3.1); +sql insert into t1 values(1648791253004,5,2,3,4.1); + +sql insert into t2 values(1648791213000,1,2,3,1.0); +sql insert into t2 values(1648791223001,2,2,3,1.1); +sql insert into t2 values(1648791233002,3,2,3,2.1); +sql insert into t2 values(1648791243003,4,2,3,3.1); +sql insert into t2 values(1648791253004,5,2,3,4.1); + +sql insert into t3 values(1648791213000,1,2,3,1.0); +sql insert into t3 values(1648791223001,2,2,3,1.1); +sql insert into t3 values(1648791233002,3,2,3,2.1); +sql insert into t3 values(1648791243003,4,2,3,3.1); +sql insert into t3 values(1648791253004,5,2,3,4.1); + +$loop_count = 0 +loop4: + +sleep 300 + +sql select _wstart, count(*) c1, tbname from st partition by tbname event_window start with a > 0 end with b = 2 slimit 2 limit 2; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 4 then + print ======rows=$rows + goto loop4 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/query/query_count1.sim b/tests/script/tsim/query/query_count1.sim index 043b604263..9711a09d97 100644 --- a/tests/script/tsim/query/query_count1.sim +++ b/tests/script/tsim/query/query_count1.sim @@ -101,5 +101,57 @@ sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_windo sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2); sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2147483647); + +print step3 +print =============== create database +sql create database test2 vgroups 4; +sql use test2; +sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791223001,2,2,3,1.1); +sql insert into t1 values(1648791233002,3,2,3,2.1); +sql insert into t1 values(1648791243003,4,2,3,3.1); +sql insert into t1 values(1648791253004,5,2,3,4.1); + +sql insert into t2 values(1648791213000,1,2,3,1.0); +sql insert into t2 values(1648791223001,2,2,3,1.1); +sql insert into t2 values(1648791233002,3,2,3,2.1); +sql insert into t2 values(1648791243003,4,2,3,3.1); +sql insert into t2 values(1648791253004,5,2,3,4.1); + +sql insert into t3 values(1648791213000,1,2,3,1.0); +sql insert into t3 values(1648791223001,2,2,3,1.1); +sql insert into t3 values(1648791233002,3,2,3,2.1); +sql insert into t3 values(1648791243003,4,2,3,3.1); +sql insert into t3 values(1648791253004,5,2,3,4.1); + +$loop_count = 0 +loop4: + +sleep 300 + +sql select _wstart, count(*) c1, tbname from st partition by tbname count_window(2) slimit 2 limit 2; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $rows != 4 then + print ======rows=$rows + goto loop4 +endi + + print query_count0 end system sh/exec.sh -n dnode1 -s stop -x SIGINT