From fc6ca69afad969d883e707ea1315851e6d64d692 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 26 Apr 2023 11:04:13 +0800 Subject: [PATCH] refactor timeslice operator --- source/libs/executor/src/timesliceoperator.c | 65 +++++++++++++------- 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index e1782107f4..7024b092b0 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -551,6 +551,11 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS } } +static void revertTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) { + pSliceInfo->current = pSliceInfo->win.skey; + pSliceInfo->prevTsSet = false; +} + static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -569,34 +574,52 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { blockDataCleanup(pResBlock); while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - if (pBlock == NULL) { - break; + if (pSliceInfo->pNextGroupRes != NULL) { + doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo); + pSliceInfo->pNextGroupRes = NULL; } - if (pSliceInfo->scalarSup.pExprInfo != NULL) { - SExprSupp* pExprSup = &pSliceInfo->scalarSup; - projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + while (1) { + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + break; + } + + if (pSliceInfo->groupId == 0 && pBlock->info.id.groupId != 0) { + pSliceInfo->groupId = pBlock->info.id.groupId; + } else { + if (pSliceInfo->groupId != pBlock->info.id.groupId) { + pSliceInfo->groupId = pBlock->info.id.groupId; + pSliceInfo->pNextGroupRes = pBlock; + break; + } + } + + if (pSliceInfo->scalarSup.pExprInfo != NULL) { + SExprSupp* pExprSup = &pSliceInfo->scalarSup; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL); + } + + int32_t code = initKeeperInfo(pSliceInfo, pBlock); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); + doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo); } - int32_t code = initKeeperInfo(pSliceInfo, pBlock); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); + // check if need to interpolate after last datablock + // except for fill(next), fill(linear) + while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && + pSliceInfo->fillType != TSDB_FILL_LINEAR) { + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false); + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); } - - // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); - doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo); } - // check if need to interpolate after last datablock - // except for fill(next), fill(linear) - while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && - pSliceInfo->fillType != TSDB_FILL_LINEAR) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);