From b4c2085ddcc06faff5dec0a3ac77f8af398966a5 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 8 Jun 2023 16:32:35 +0800 Subject: [PATCH 1/8] feat: add pipeline processing for timeslice operator --- source/libs/executor/src/timesliceoperator.c | 112 +++++++++++++------ 1 file changed, 80 insertions(+), 32 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 3e4055876d..65e3e75bce 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -44,6 +44,7 @@ typedef struct STimeSliceOperatorInfo { uint64_t groupId; SGroupKeys* pPrevGroupKey; SSDataBlock* pNextGroupRes; + SSDataBlock* pRemainRes; // save block unfinished processing } STimeSliceOperatorInfo; static void destroyTimeSliceOperatorInfo(void* param); @@ -641,6 +642,25 @@ static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) { return TSDB_CODE_SUCCESS; } +static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t threshold) { + SSDataBlock* pResBlock = pSliceInfo->pRes; + + if (pResBlock->info.rows > threshold) { + pSliceInfo->pRemainRes = pBlock; + return true; + } + + return false; +} + +static bool checkWindowBoundReached(STimeSliceOperatorInfo* pSliceInfo) { + if (pSliceInfo->current > pSliceInfo->win.ekey) { + return true; + } + + return false; +} + static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, bool ignoreNull) { SSDataBlock* pResBlock = pSliceInfo->pRes; @@ -658,10 +678,12 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) { continue; } - - if (pSliceInfo->current > pSliceInfo->win.ekey) { + if (checkWindowBoundReached(pSliceInfo)) { break; } + if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + return; + } if (ts == pSliceInfo->current) { addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); @@ -671,9 +693,13 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - if (pSliceInfo->current > pSliceInfo->win.ekey) { + + if (checkWindowBoundReached(pSliceInfo)) { break; } + if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + return; + } } else if (ts < pSliceInfo->current) { // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate doKeepPrevRows(pSliceInfo, pBlock, i); @@ -694,9 +720,12 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS } } - if (pSliceInfo->current > pSliceInfo->win.ekey) { + if (checkWindowBoundReached(pSliceInfo)) { break; } + if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + return; + } } else { // ignore current row, and do nothing } @@ -727,11 +756,19 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS } doKeepPrevRows(pSliceInfo, pBlock, i); - if (pSliceInfo->current > pSliceInfo->win.ekey) { + if (checkWindowBoundReached(pSliceInfo)) { break; } + if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + return; + } } } + + // if reached here, meaning block processing finished naturally, + // or interpolation reach window upper bound + pSliceInfo->pRemainRes = NULL; + } static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) { @@ -778,34 +815,54 @@ static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { resetKeeperInfo(pSliceInfo); } +static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + STimeSliceOperatorInfo* pSliceInfo = pOperator->info; + SExprSupp* pSup = &pOperator->exprSupp; + bool ignoreNull = getIgoreNullRes(pSup); + int32_t order = TSDB_ORDER_ASC; + + int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + + if (pSliceInfo->scalarSup.pExprInfo != NULL) { + SExprSupp* pExprSup = &pSliceInfo->scalarSup; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } + + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); + doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull); + copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock); +} + static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STimeSliceOperatorInfo* pSliceInfo = pOperator->info; SSDataBlock* pResBlock = pSliceInfo->pRes; - SExprSupp* pSup = &pOperator->exprSupp; - bool ignoreNull = getIgoreNullRes(pSup); - int32_t order = TSDB_ORDER_ASC; - SInterval* pInterval = &pSliceInfo->interval; SOperatorInfo* downstream = pOperator->pDownstream[0]; - blockDataCleanup(pResBlock); while (1) { if (pSliceInfo->pNextGroupRes != NULL) { - setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true); - doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo, ignoreNull); - copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pSliceInfo->pNextGroupRes); + doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes); + if (checkThresholdReached(pSliceInfo, pSliceInfo->pRemainRes, pOperator->resultInfo.threshold)) { + doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); + goto _finished; + } pSliceInfo->pNextGroupRes = NULL; } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = pSliceInfo->pRemainRes ? pSliceInfo->pRemainRes : downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { setOperatorCompleted(pOperator); break; @@ -821,21 +878,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } } - if (pSliceInfo->scalarSup.pExprInfo != NULL) { - SExprSupp* pExprSup = &pSliceInfo->scalarSup; - projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + doHandleTimeslice(pOperator, pBlock); + if (checkThresholdReached(pSliceInfo, pSliceInfo->pRemainRes, pOperator->resultInfo.threshold)) { + doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); + goto _finished; } - - int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - - // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); - doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull); - copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock); } + // handling post work for a specific group // check if need to interpolate after last datablock // except for fill(next), fill(linear) @@ -848,11 +897,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // restore initial value for next group resetTimesliceInfo(pSliceInfo); - if (pResBlock->info.rows >= 4096) { - break; - } } +_finished: // restore the value setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); if (pResBlock->info.rows == 0) { @@ -908,6 +955,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->groupId = 0; pInfo->pPrevGroupKey = NULL; pInfo->pNextGroupRes = NULL; + pInfo->pRemainRes = NULL; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; From 7ee05df7c6d9f66efbc1e254213f698414f69b37 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 8 Jun 2023 17:46:25 +0800 Subject: [PATCH 2/8] fix lastBlock remain ts not saved --- source/libs/executor/src/timesliceoperator.c | 39 +++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 65e3e75bce..1aa6027a1d 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -45,6 +45,7 @@ typedef struct STimeSliceOperatorInfo { SGroupKeys* pPrevGroupKey; SSDataBlock* pNextGroupRes; SSDataBlock* pRemainRes; // save block unfinished processing + int64_t remainTs; // the remaining timestamp in the block to be processed } STimeSliceOperatorInfo; static void destroyTimeSliceOperatorInfo(void* param); @@ -642,11 +643,9 @@ static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) { return TSDB_CODE_SUCCESS; } -static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t threshold) { +static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, int32_t threshold) { SSDataBlock* pResBlock = pSliceInfo->pRes; - if (pResBlock->info.rows > threshold) { - pSliceInfo->pRemainRes = pBlock; return true; } @@ -661,6 +660,16 @@ static bool checkWindowBoundReached(STimeSliceOperatorInfo* pSliceInfo) { return false; } +static void saveBlockStatus(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t curIndex) { + SSDataBlock* pResBlock = pSliceInfo->pRes; + + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); + if (curIndex < pBlock->info.rows - 1) { + pSliceInfo->pRemainRes = pBlock; + pSliceInfo->remainTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1); + } +} + static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, bool ignoreNull) { SSDataBlock* pResBlock = pSliceInfo->pRes; @@ -670,6 +679,12 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS for (int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); + // check if need to resume from the position of last unfinished block + if (pSliceInfo->pRemainRes != NULL && ts < pSliceInfo->remainTs && + pSliceInfo->current <= pSliceInfo->remainTs) { + continue; + } + // check for duplicate timestamps if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP); @@ -681,7 +696,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkWindowBoundReached(pSliceInfo)) { break; } - if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { + saveBlockStatus(pSliceInfo, pBlock, i); return; } @@ -697,7 +713,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkWindowBoundReached(pSliceInfo)) { break; } - if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { + saveBlockStatus(pSliceInfo, pBlock, i); return; } } else if (ts < pSliceInfo->current) { @@ -723,7 +740,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkWindowBoundReached(pSliceInfo)) { break; } - if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { + saveBlockStatus(pSliceInfo, pBlock, i); return; } } else { @@ -759,7 +777,8 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkWindowBoundReached(pSliceInfo)) { break; } - if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { + saveBlockStatus(pSliceInfo, pBlock, i); return; } } @@ -854,7 +873,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { while (1) { if (pSliceInfo->pNextGroupRes != NULL) { doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes); - if (checkThresholdReached(pSliceInfo, pSliceInfo->pRemainRes, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); goto _finished; } @@ -879,7 +898,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } doHandleTimeslice(pOperator, pBlock); - if (checkThresholdReached(pSliceInfo, pSliceInfo->pRemainRes, pOperator->resultInfo.threshold)) { + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); goto _finished; } @@ -943,6 +962,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); initResultSizeInfo(&pOperator->resultInfo, 4096); + pOperator->resultInfo.threshold = 1; pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pLinearInfo = NULL; @@ -956,6 +976,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->pPrevGroupKey = NULL; pInfo->pNextGroupRes = NULL; pInfo->pRemainRes = NULL; + pInfo->remainTs = 0; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; From 21fccc2d48224439ccd9ab37f0e97e088075e323 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 8 Jun 2023 18:25:32 +0800 Subject: [PATCH 3/8] fix switchin group issue --- source/libs/executor/src/timesliceoperator.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 1aa6027a1d..8006235391 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -903,7 +903,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { goto _finished; } } - // handling post work for a specific group + // post work for a specific group // check if need to interpolate after last datablock // except for fill(next), fill(linear) @@ -916,6 +916,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // restore initial value for next group resetTimesliceInfo(pSliceInfo); + if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { + goto _finished; + } } _finished: From 145bb93967081a7364c3deff68c7dc95dae8ada4 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 8 Jun 2023 19:21:04 +0800 Subject: [PATCH 4/8] fix block only have one row --- source/libs/executor/src/timesliceoperator.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 8006235391..56aaf587c3 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -875,6 +875,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes); if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); + if (pSliceInfo->pRemainRes == NULL) { + pSliceInfo->pNextGroupRes = NULL; + } goto _finished; } pSliceInfo->pNextGroupRes = NULL; From bef62149047de1e249fdaba77b619daf5ef2fe4b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 8 Jun 2023 19:30:05 +0800 Subject: [PATCH 5/8] remove test code --- source/libs/executor/src/timesliceoperator.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 56aaf587c3..416925a311 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -968,7 +968,6 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); initResultSizeInfo(&pOperator->resultInfo, 4096); - pOperator->resultInfo.threshold = 1; pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues); pInfo->pLinearInfo = NULL; From e74c0ac987432bc7b5786df51bce768b70dcaedb Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 16 Jun 2023 14:11:19 +0800 Subject: [PATCH 6/8] save index instead of ts --- source/libs/executor/src/timesliceoperator.c | 29 ++++++++------------ 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 25cb94f7e1..415fefe75f 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -45,7 +45,7 @@ typedef struct STimeSliceOperatorInfo { SGroupKeys* pPrevGroupKey; SSDataBlock* pNextGroupRes; SSDataBlock* pRemainRes; // save block unfinished processing - int64_t remainTs; // the remaining timestamp in the block to be processed + int32_t remainIndex; // the remaining index in the block to be processed } STimeSliceOperatorInfo; static void destroyTimeSliceOperatorInfo(void* param); @@ -669,8 +669,13 @@ static void saveBlockStatus(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBl SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); if (curIndex < pBlock->info.rows - 1) { pSliceInfo->pRemainRes = pBlock; - pSliceInfo->remainTs = *(int64_t*)colDataGetData(pTsCol, curIndex + 1); + pSliceInfo->remainIndex = curIndex + 1; + return; } + + // all data in remaining block processed + pSliceInfo->pRemainRes = NULL; + } static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, @@ -679,14 +684,10 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS SInterval* pInterval = &pSliceInfo->interval; SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); - // check if need to resume from the position of last unfinished block - if (pSliceInfo->pRemainRes != NULL && ts < pSliceInfo->remainTs && - pSliceInfo->current <= pSliceInfo->remainTs) { - continue; - } + int32_t i = (pSliceInfo->pRemainRes == NULL) ? 0 : pSliceInfo->remainIndex; + for (; i < pBlock->info.rows; ++i) { + int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); // check for duplicate timestamps if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) { @@ -696,13 +697,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) { continue; } - if (checkWindowBoundReached(pSliceInfo)) { - break; - } - if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { - saveBlockStatus(pSliceInfo, pBlock, i); - return; - } if (ts == pSliceInfo->current) { addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); @@ -984,7 +978,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->pPrevGroupKey = NULL; pInfo->pNextGroupRes = NULL; pInfo->pRemainRes = NULL; - pInfo->remainTs = 0; + pInfo->remainIndex = 0; + pOperator->resultInfo.threshold = 1; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; From e4c9a7474e6eee6d65d56633eedd60da1d76734d Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 16 Jun 2023 14:58:23 +0800 Subject: [PATCH 7/8] remove test code --- source/libs/executor/src/timesliceoperator.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 415fefe75f..b0c08ee0b8 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -870,7 +870,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { while (1) { if (pSliceInfo->pNextGroupRes != NULL) { doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes); - if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { + if (checkWindowBoundReached(pSliceInfo) || checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); if (pSliceInfo->pRemainRes == NULL) { pSliceInfo->pNextGroupRes = NULL; @@ -898,7 +898,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } doHandleTimeslice(pOperator, pBlock); - if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { + if (checkWindowBoundReached(pSliceInfo) || checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); goto _finished; } @@ -979,7 +979,6 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->pNextGroupRes = NULL; pInfo->pRemainRes = NULL; pInfo->remainIndex = 0; - pOperator->resultInfo.threshold = 1; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; From 9cddf06053bc5c74a4e63d2f577c4d04a6c6736e Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 16 Jun 2023 16:11:56 +0800 Subject: [PATCH 8/8] handle 0 result case for groups --- source/libs/executor/src/timesliceoperator.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index b0c08ee0b8..5c01775a17 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -875,7 +875,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pSliceInfo->pRemainRes == NULL) { pSliceInfo->pNextGroupRes = NULL; } - goto _finished; + if (pResBlock->info.rows != 0) { + goto _finished; + } else { + // after fillter if result block has 0 rows, go back to + // process pNextGroupRes again for unfinished data + continue; + } } pSliceInfo->pNextGroupRes = NULL; } @@ -900,7 +906,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { doHandleTimeslice(pOperator, pBlock); if (checkWindowBoundReached(pSliceInfo) || checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL); - goto _finished; + if (pResBlock->info.rows != 0) { + goto _finished; + } } } // post work for a specific group @@ -916,8 +924,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // restore initial value for next group resetTimesliceInfo(pSliceInfo); - if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) { - goto _finished; + if (pResBlock->info.rows != 0) { + break; } }