diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 60efefb1e4..d957dd987b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -800,6 +800,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo); // clang-format on int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, @@ -853,6 +855,10 @@ int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo); +void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, + SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo); +void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) ; + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 408d75998d..61c8030bd9 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -20,7 +20,6 @@ #include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" -#include "tfill.h" #include "ttime.h" typedef struct SEventWindowOperatorInfo { @@ -29,16 +28,20 @@ typedef struct SEventWindowOperatorInfo { SExprSupp scalarSup; SGroupResInfo groupResInfo; SWindowRowsSup winSup; - SColumn stateCol; // start row index bool hasKey; SStateKeys stateKey; int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; + + SFilterInfo* pStartCondInfo; + SFilterInfo* pEndCondInfo; + bool inWindow; + SResultRow* pRow; } SEventWindowOperatorInfo; -static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator); -static void destroyEWindowOperatorInfo(void* param); -static void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); +static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator); +static void destroyEWindowOperatorInfo(void* param); +static void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock); static SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator); // todo : move to util @@ -67,7 +70,7 @@ static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, b ts[4] = pWin->ekey + delta; // window end key } -SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWinodwPhysiNode* pStateNode, +SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo) { SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -75,27 +78,29 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWi goto _error; } - int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; - SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStartCond)->pExpr; + SEventWinodwPhysiNode* pEventWindowNode = (SEventWinodwPhysiNode*)physiNode; - if (pStateNode->window.pExprs != NULL) { + int32_t tsSlotId = ((SColumnNode*)pEventWindowNode->window.pTspk)->slotId; + int32_t code = filterInitFromNode((SNode*)pEventWindowNode->pStartCond, &pInfo->pStartCondInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + code = filterInitFromNode((SNode*)pEventWindowNode->pEndCond, &pInfo->pEndCondInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (pEventWindowNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); - int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); + SExprInfo* pScalarExprInfo = createExprInfo(pEventWindowNode->window.pExprs, NULL, &numOfScalarExpr); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); if (code != TSDB_CODE_SUCCESS) { goto _error; } } - pInfo->stateCol = extractColumnFromColumnNode(pColNode); - pInfo->stateKey.type = pInfo->stateCol.type; - pInfo->stateKey.bytes = pInfo->stateCol.bytes; - pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); - if (pInfo->stateKey.pData == NULL) { - goto _error; - } - - int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + code = filterInitFromNode((SNode*)pEventWindowNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -103,7 +108,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWi size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + SExprInfo* pExprInfo = createExprInfo(pEventWindowNode->window.pFuncs, NULL, &num); initResultSizeInfo(&pOperator->resultInfo, 4096); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); @@ -111,20 +116,22 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SEventWi goto _error; } - SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pEventWindowNode->window.node.pOutputDataBlockDesc); + blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); + initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->twAggSup = - (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; + pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pEventWindowNode->window.watermark, + .calTrigger = pEventWindowNode->window.triggerType}; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->tsSlotId = tsSlotId; - setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, + setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(openEventWindowAggOptr, doEventWindowAgg, NULL, destroyEWindowOperatorInfo, + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, &downstream, 1); @@ -158,17 +165,14 @@ void destroyEWindowOperatorInfo(void* param) { taosMemoryFreeClear(param); } -static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator) { - if (OPTR_IS_OPENED(pOperator)) { - return TSDB_CODE_SUCCESS; - } - +static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) { SEventWindowOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExprSupp* pSup = &pOperator->exprSupp; int32_t order = TSDB_ORDER_ASC; - int64_t st = taosGetTimestampUs(); + + blockDataCleanup(pInfo->binfo.pRes); SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -189,133 +193,113 @@ static int32_t openEventWindowAggOptr(SOperatorInfo* pOperator) { } } - doEventWindowAggImpl(pOperator, pInfo, pBlock); + eventWindowAggImpl(pOperator, pInfo, pBlock); } - pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; - initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); - pOperator->status = OP_RES_TO_RETURN; + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; +} +static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult, + SExprSupp* pExprSup, SAggSupporter* pAggSup) { + if (*pResult == NULL) { + SResultRow* p = taosMemoryCalloc(1, pAggSup->resultRowSize); + pResultRowInfo->cur = (SResultRowPosition){.pageId = p->pageId, .offset = p->offset}; + *pResult = p; + } + + (*pResult)->win = *win; + setResultRowInitCtx(*pResult, pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } -void doEventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExprSupp* pSup = &pOperator->exprSupp; - - SColumnInfoData* pStateColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->stateCol.slotId); - int64_t gid = pBlock->info.id.groupId; - - bool masterScan = true; - int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - int16_t bytes = pStateColInfoData->info.bytes; - - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); - TSKEY* tsList = (TSKEY*)pColInfoData->pData; - +static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSup, int32_t startIndex, int32_t endIndex, + const SSDataBlock* pBlock, int64_t* tsList, SExecTaskInfo* pTaskInfo) { SWindowRowsSup* pRowSup = &pInfo->winSup; - pRowSup->numOfRows = 0; - struct SColumnDataAgg* pAgg = NULL; - for (int32_t j = 0; j < pBlock->info.rows; ++j) { - pAgg = (pBlock->pBlockAgg != NULL) ? pBlock->pBlockAgg[pInfo->stateCol.slotId] : NULL; - if (colDataIsNull(pStateColInfoData, pBlock->info.rows, j, pAgg)) { - continue; - } + int32_t numOfOutput = pSup->numOfExprs; + int32_t numOfRows = endIndex - startIndex + 1; - char* val = colDataGetData(pStateColInfoData, j); + doKeepTuple(pRowSup, tsList[endIndex], pBlock->info.id.groupId); - if (gid != pRowSup->groupId || !pInfo->hasKey) { - // todo extract method - if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { - varDataCopy(pInfo->stateKey.pData, val); - } else { - memcpy(pInfo->stateKey.pData, val, bytes); - } - - pInfo->hasKey = true; - - doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); - doKeepTuple(pRowSup, tsList[j], gid); - } else if (compareVal(val, &pInfo->stateKey)) { - doKeepTuple(pRowSup, tsList[j], gid); - if (j == 0 && pRowSup->startRowIndex != 0) { - pRowSup->startRowIndex = 0; - } - } else { // a new state window started - SResultRow* pResult = NULL; - - // keep the time window for the closed time window. - STimeWindow window = pRowSup->win; - - pRowSup->win.ekey = pRowSup->win.skey; - int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); - } - - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); - - // here we start a new session window - doKeepNewWindowStartInfo(pRowSup, tsList, j, gid); - doKeepTuple(pRowSup, tsList[j], gid); - - // todo extract method - if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) { - varDataCopy(pInfo->stateKey.pData, val); - } else { - memcpy(pInfo->stateKey.pData, val, bytes); - } - } - } - - SResultRow* pResult = NULL; - pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; - int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, - pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); + int32_t ret = + setSingleOutputTupleBufv1(&pInfo->binfo.resultRowInfo, &pRowSup->win, &pInfo->pRow, pSup, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows, pBlock->info.rows, numOfOutput); } -SSDataBlock* doEventWindowAgg(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } +void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInfo, SSDataBlock* pBlock) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; - SEventWindowOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SOptrBasicInfo* pBInfo = &pInfo->binfo; + int64_t gid = pBlock->info.id.groupId; - pTaskInfo->code = pOperator->fpSet._openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - setOperatorCompleted(pOperator); - return NULL; - } + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); + TSKEY* tsList = (TSKEY*)pColInfoData->pData; - blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - while (1) { - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + SColumnInfoData *ps = NULL, *pe = NULL; - bool hasRemain = hasRemainResults(&pInfo->groupResInfo); - if (!hasRemain) { - setOperatorCompleted(pOperator); - break; - } + SWindowRowsSup* pRowSup = &pInfo->winSup; + pRowSup->numOfRows = 0; - if (pBInfo->pRes->info.rows > 0) { - break; + SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; + int32_t code = filterSetDataFromSlotId(pInfo->pStartCondInfo, ¶m1); + + int32_t status1 = 0; + bool keep1 = filterExecute(pInfo->pStartCondInfo, pBlock, &ps, NULL, param1.numOfCols, &status1); + + SFilterColumnParam param2 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; + code = filterSetDataFromSlotId(pInfo->pEndCondInfo, ¶m2); + + int32_t status2 = 0; + bool keep2 = filterExecute(pInfo->pEndCondInfo, pBlock, &pe, NULL, param2.numOfCols, &status2); + + int32_t rowIndex = 0; + int32_t startIndex = pInfo->inWindow ? 0 : -1; + + while (rowIndex < pBlock->info.rows) { + if (pInfo->inWindow) { // let's find the first end value + for (rowIndex = startIndex; rowIndex < pBlock->info.rows; ++rowIndex) { + if (((bool*)pe->pData)[rowIndex]) { + break; + } + } + + if (rowIndex < pBlock->info.rows) { + doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo); + + // todo check if pblock buffer is enough + doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset); + + copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pInfo->binfo.pRes, + pSup->rowEntryInfoOffset, pTaskInfo); + + pInfo->binfo.pRes->info.rows += pInfo->pRow->numOfRows; + + pInfo->inWindow = false; + rowIndex += 1; + } else { + doEventWindowAggImpl(pInfo, pSup, startIndex, pBlock->info.rows - 1, pBlock, tsList, pTaskInfo); + } + } else { // find the first start value that is fulfill for the start condition + for (; rowIndex < pBlock->info.rows; ++rowIndex) { + if (((bool*)ps->pData)[rowIndex]) { + doKeepNewWindowStartInfo(pRowSup, tsList, rowIndex, gid); + pInfo->inWindow = true; + startIndex = rowIndex; + break; + } + } + + if (pInfo->inWindow) { + continue; + } else { + break; + } } } - - pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; - return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fecdcd8fa3..5ee209a07a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -936,8 +936,7 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u pAggInfo->groupId = groupId; } -static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, - const int32_t* rowEntryOffset) { +void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) { bool returnNotNull = false; for (int32_t j = 0; j < numOfExprs; ++j) { SResultRowEntryInfo* pResInfo = getResultEntryInfo(pRow, j, rowEntryOffset); @@ -960,8 +959,8 @@ static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t nu } } -static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, - SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) { +void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, + SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) { for (int32_t j = 0; j < numOfExprs; ++j) { int32_t slotId = pExprInfo[j].base.resSchema.slotId; @@ -1022,7 +1021,7 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos T_LONG_JMP(pTaskInfo->env, code); } - doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); + copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; @@ -1070,7 +1069,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS } pGroupResInfo->index += 1; - doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); + copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; @@ -2087,8 +2086,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); - } else { - ASSERT(0); } if (pOperator != NULL) { @@ -2179,8 +2176,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC == type) { pOptr = createTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo); - } else { - ASSERT(0); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT == type) { + pOptr = createEventwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo); } taosMemoryFree(ops);