From d1eb14dccc711e2bb32117fdd2cde0d84f5ac764 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 26 Aug 2022 18:47:09 +0800 Subject: [PATCH] fix: interval operator _wstart error and interval operator close window for interpolation on different group id --- source/libs/executor/src/timewindowoperator.c | 59 ++++++++++++------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index cadaf4a9d5..7a66275834 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -33,11 +33,16 @@ typedef struct SPullWindowInfo { uint64_t groupId; } SPullWindowInfo; +typedef struct SOpenWindowInfo { + SResultRowPosition pos; + uint64_t groupId; +} SOpenWindowInfo; + static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator); static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo); -static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult); +static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId); static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult); ///* @@ -598,14 +603,14 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num int32_t startPos = 0; int32_t numOfOutput = pSup->numOfExprs; - uint64_t groupId = pBlock->info.groupId; SResultRow* pResult = NULL; while (1) { SListNode* pn = tdListGetHead(pResultRowInfo->openWindow); - - SResultRowPosition* p1 = (SResultRowPosition*)pn->data; + SOpenWindowInfo* pOpenWin = (SOpenWindowInfo *)pn->data; + uint64_t groupId = pOpenWin->groupId; + SResultRowPosition* p1 = &pOpenWin->pos; if (p->pageId == p1->pageId && p->offset == p1->offset) { break; } @@ -631,12 +636,15 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num SGroupKeys* pTsKey = taosArrayGet(pInfo->pPrevValues, 0); int64_t prevTs = *(int64_t*)pTsKey->pData; - doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey, - RESULT_ROW_END_INTERP, pSup); + if (groupId == pBlock->info.groupId) { + doTimeWindowInterpolation(pInfo->pPrevValues, pBlock->pDataBlock, prevTs, -1, tsCols[startPos], startPos, w.ekey, + RESULT_ROW_END_INTERP, pSup); + } setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs); @@ -965,7 +973,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul // prev time window not interpolation yet. if (pInfo->timeWindowInterpo) { - SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult); + SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId); doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); // restore current time window @@ -1017,9 +1025,13 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ekey = ascScan ? nextWin.ekey : nextWin.skey; forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); - - // window start(end) key interpolation - doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); + if ((ascScan && ekey <= pBlock->info.window.ekey) || + (!ascScan && ekey >= pBlock->info.window.skey)) { + // window start(end) key interpolation + doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); + } else { + addToOpenWindowList(pResultRowInfo, pResult, tableGroupId); + } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, @@ -1040,20 +1052,23 @@ void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInf } } -SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult) { - SResultRowPosition pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; +SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult, uint64_t groupId) { + SOpenWindowInfo openWin = {0}; + openWin.pos.pageId = pResult->pageId; + openWin.pos.offset = pResult->offset; + openWin.groupId = groupId; SListNode* pn = tdListGetTail(pResultRowInfo->openWindow); if (pn == NULL) { - tdListAppend(pResultRowInfo->openWindow, &pos); - return pos; + tdListAppend(pResultRowInfo->openWindow, &openWin); + return openWin.pos; } - SResultRowPosition* px = (SResultRowPosition*)pn->data; - if (px->pageId != pos.pageId || px->offset != pos.offset) { - tdListAppend(pResultRowInfo->openWindow, &pos); + SOpenWindowInfo * px = (SOpenWindowInfo *)pn->data; + if (px->pos.pageId != openWin.pos.pageId || px->pos.offset != openWin.pos.offset || px->groupId != openWin.groupId) { + tdListAppend(pResultRowInfo->openWindow, &openWin); } - return pos; + return openWin.pos; } int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) { @@ -1884,7 +1899,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, pInfo); if (pInfo->timeWindowInterpo) { - pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); if (pInfo->binfo.resultRowInfo.openWindow == NULL) { goto _error; } @@ -5157,7 +5172,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, iaInfo); if (iaInfo->timeWindowInterpo) { - iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); } initResultRowInfo(&iaInfo->binfo.resultRowInfo); @@ -5294,7 +5309,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* // prev time window not interpolation yet. if (iaInfo->timeWindowInterpo) { - SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult); + SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult, tableGroupId); doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); // restore current time window @@ -5462,7 +5477,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, numOfCols, iaInfo); if (iaInfo->timeWindowInterpo) { - iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SOpenWindowInfo)); if (iaInfo->binfo.resultRowInfo.openWindow == NULL) { goto _error; }