diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d8d231e952..2f613f528c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -400,7 +400,7 @@ typedef struct SIntervalAggOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info SAggSupporter aggSup; // aggregate supporter - + SExprSupp scalarSupp; // supporter for perform scalar function SGroupResInfo groupResInfo; // multiple results build supporter SInterval interval; // interval info int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. @@ -730,7 +730,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream); + STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream); SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f352049810..6fe37feb4b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4216,8 +4216,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; bool isStream = (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type); - pOptr = - createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream); + pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pIntervalPhyNode, + pTaskInfo, isStream); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) { SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7affac76d2..5bfb73271e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -448,7 +448,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { qDebug("%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo)); for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) { STimeWindow* pWin = &pTableScanInfo->cond.twindows[i]; - qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); + qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); } // do prepare for the next round table scan operation tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); @@ -467,7 +467,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo)); for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) { STimeWindow* pWin = &pTableScanInfo->cond.twindows[i]; - qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); + qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); } while (pTableScanInfo->scanTimes < total) { @@ -492,7 +492,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { GET_TASKID(pTaskInfo)); for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) { STimeWindow* pWin = &pTableScanInfo->cond.twindows[i]; - qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); + qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); } tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); pTableScanInfo->curTWinIdx = 0; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 48b0b1c071..2f4ae6c7c7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -956,6 +956,12 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { getTableScanInfo(pOperator, &pInfo->order, &scanFlag); + if (pInfo->scalarSupp.pExprInfo != NULL) { + SExprSupp* pExprSup =& pInfo->scalarSupp; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, + pExprSup->numOfExprs, NULL); + } + // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, scanFlag, true); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); @@ -1324,6 +1330,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { break; } + if (pInfo->scalarSupp.pExprInfo != NULL) { + SExprSupp* pExprSup = &pInfo->scalarSupp; + projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, + pExprSup->numOfExprs, NULL); + } + if (pBlock->info.type == STREAM_REPROCESS) { doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock, NULL); @@ -1446,18 +1458,27 @@ void increaseTs(SqlFunctionCtx* pCtx) { SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream) { + STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream) { SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->win = pTaskInfo->window; - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; + pInfo->win = pTaskInfo->window; + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; pInfo->execModel = pTaskInfo->execModel; - pInfo->twAggSup = *pTwAggSupp; + pInfo->twAggSup = *pTwAggSupp; + + if (pPhyNode->window.pExprs != NULL) { + int32_t numOfScalar = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pPhyNode->window.pExprs, NULL, &numOfScalar); + int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } pInfo->primaryTsIndex = primaryTsSlotId;