From 169a6703d171ef6046e34135edeef0d6fce88ddb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Jan 2023 17:51:50 +0800 Subject: [PATCH] enh(query): support event window. --- .../libs/executor/src/eventwindowoperator.c | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 61c8030bd9..1480ac0451 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -172,7 +172,9 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; int32_t order = TSDB_ORDER_ASC; - blockDataCleanup(pInfo->binfo.pRes); + SSDataBlock* pRes = pInfo->binfo.pRes; + + blockDataCleanup(pRes); SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -194,9 +196,12 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) { } eventWindowAggImpl(pOperator, pInfo, pBlock); + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + return pRes; + } } - return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; + return pRes->info.rows == 0 ? NULL : pRes; } static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult, @@ -236,6 +241,7 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pRes = pInfo->binfo.pRes; int64_t gid = pBlock->info.id.groupId; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); @@ -272,13 +278,18 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf if (rowIndex < pBlock->info.rows) { doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo); - // todo check if pblock buffer is enough doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset); - copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pInfo->binfo.pRes, + // check buffer size + if (pRes->info.rows + pInfo->pRow->numOfRows >= pRes->info.capacity) { + int32_t newSize = pRes->info.rows + pInfo->pRow->numOfRows; + blockDataEnsureCapacity(pRes, newSize); + } + + copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes, pSup->rowEntryInfoOffset, pTaskInfo); - pInfo->binfo.pRes->info.rows += pInfo->pRow->numOfRows; + pRes->info.rows += pInfo->pRow->numOfRows; pInfo->inWindow = false; rowIndex += 1;