From e7b1a7dc783b15ea4e6a8cbb8c141f9baf159e15 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Jan 2023 13:15:09 +0800 Subject: [PATCH] enh(query): support event window. --- .../libs/executor/src/eventwindowoperator.c | 28 ++++++++++++++++++- source/libs/executor/src/timewindowoperator.c | 4 +-- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index e38070953a..408d75998d 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -41,6 +41,32 @@ static void destroyEWindowOperatorInfo(void* param); static void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator); +// todo : move to util +static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, + uint64_t groupId) { + pRowSup->startRowIndex = rowIndex; + pRowSup->numOfRows = 0; + pRowSup->win.skey = tsList[rowIndex]; + pRowSup->groupId = groupId; +} + +static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) { + pRowSup->win.ekey = ts; + pRowSup->prevTs = ts; + pRowSup->numOfRows += 1; + pRowSup->groupId = groupId; +} + +static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) { + int64_t* ts = (int64_t*)pColData->pData; + int32_t delta = includeEndpoint ? 1 : 0; + + int64_t duration = pWin->ekey - pWin->skey + delta; + ts[2] = duration; // set the duration + ts[3] = pWin->skey; // window start key + ts[4] = pWin->ekey + delta; // window end key +} + SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo) { SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo)); @@ -264,7 +290,7 @@ SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator) { return NULL; } - SStateWindowOperatorInfo* pInfo = pOperator->info; + SEventWindowOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SOptrBasicInfo* pBInfo = &pInfo->binfo; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6c17df9fb1..32e0c517b4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4174,8 +4174,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - pBlock->info.rows, pSup->numOfExprs); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, + currPos - startPos, pBlock->info.rows, pSup->numOfExprs); finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow));