From 3043c7f09f24172e8f3cab87333ee5732fcc0027 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 14 Aug 2023 15:10:44 +0800 Subject: [PATCH] refactor operator --- source/libs/executor/inc/executorInt.h | 2 - .../executor/src/streamtimewindowoperator.c | 106 +++++++++--------- 2 files changed, 51 insertions(+), 57 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index bd5d8e76ab..b225627e34 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -456,7 +456,6 @@ typedef struct SStreamIntervalOperatorInfo { SArray* pPullWins; // SPullWindowInfo int32_t pullIndex; SSDataBlock* pPullDataRes; - bool isFinal; SArray* pChildren; int32_t numOfChild; SStreamState* pState; // void @@ -507,7 +506,6 @@ typedef struct SStreamSessionAggOperatorInfo { void* pDelIterator; SArray* pChildren; // cache for children's result; final stream operator SPhysiNode* pPhyNode; // create new child - bool isFinal; bool ignoreExpiredData; bool ignoreExpiredDataSaved; SArray* pUpdated; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 3849104b27..57f9476922 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -26,7 +26,8 @@ #include "tlog.h" #include "ttime.h" -#define IS_FINAL_OP(op) ((op)->isFinal) +#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) +#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); #define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" #define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState" @@ -231,7 +232,7 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa dumyInfo.cur.pageId = -1; STimeWindow win = {0}; - if (IS_FINAL_OP(pInfo)) { + if (IS_FINAL_INTERVAL_OP(pOperator)) { win.skey = startTsCols[i]; win.ekey = endTsCols[i]; } else { @@ -746,14 +747,14 @@ static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int6 pTaskInfo->streamInfo.checkPointId = ckId; } -static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, +static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId, SSHashObj* pUpdatedMap) { - SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; + SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - SExprSupp* pSup = &pOperatorInfo->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; int32_t numOfOutput = pSup->numOfExprs; int32_t step = 1; TSKEY* tsCols = NULL; @@ -767,14 +768,14 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p int32_t startPos = 0; TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); STimeWindow nextWin = {0}; - if (IS_FINAL_OP(pInfo)) { + if (IS_FINAL_INTERVAL_OP(pOperator)) { nextWin = getFinalTimeWindow(ts, &pInfo->interval); } else { nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); } while (1) { bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); - if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_OP(pInfo)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { + if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_INTERVAL_OP(pOperator)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { break; @@ -782,7 +783,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p continue; } - if (IS_FINAL_OP(pInfo) && pInfo->numOfChild > 0) { + if (IS_FINAL_INTERVAL_OP(pOperator) && pInfo->numOfChild > 0) { bool ignore = true; SWinKey winRes = { .ts = nextWin.skey, @@ -825,7 +826,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p if (code != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } - if (IS_FINAL_OP(pInfo)) { + if (IS_FINAL_INTERVAL_OP(pOperator)) { forwardRows = 1; } else { forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, @@ -866,7 +867,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } } - if (IS_FINAL_OP(pInfo)) { + if (IS_FINAL_INTERVAL_OP(pOperator)) { startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos); } else { startPos = @@ -910,7 +911,7 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) { static char* getStreamOpName(uint16_t opType) { switch (opType) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: - return "single interval"; + return "interval single"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: return "interval final"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: @@ -921,15 +922,17 @@ static char* getStreamOpName(uint16_t opType) { } -static SSDataBlock* getResult(SOperatorInfo* pOperator){ +static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator){ SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; uint16_t opType = pOperator->operatorType; - doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); - if (pInfo->pPullDataRes->info.rows != 0) { - // process the rest of the data - printDataBlock(pInfo->pPullDataRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); - return pInfo->pPullDataRes; + if (IS_FINAL_INTERVAL_OP(pOperator)) { + doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); + if (pInfo->pPullDataRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pPullDataRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); + return pInfo->pPullDataRes; + } } doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); @@ -941,7 +944,7 @@ static SSDataBlock* getResult(SOperatorInfo* pOperator){ doBuildStreamIntervalResult(pOperator, pInfo->pState, pInfo->binfo.pRes, &pInfo->groupResInfo); if (pInfo->binfo.pRes->info.rows != 0) { - printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); return pInfo->binfo.pRes; } return NULL; @@ -955,12 +958,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; SExprSupp* pSup = &pOperator->exprSupp; - qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); + qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); if (pOperator->status == OP_EXEC_DONE) { return NULL; } else if (pOperator->status == OP_RES_TO_RETURN) { - SSDataBlock* resBlock = getResult(pOperator); + SSDataBlock* resBlock = buildIntervalResult(pOperator); if (resBlock != NULL) { return resBlock; } @@ -971,12 +974,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } setOperatorCompleted(pOperator); - if (!IS_FINAL_OP(pInfo)) { + if (!IS_FINAL_INTERVAL_OP(pOperator)) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer clearStreamIntervalOperator(pInfo); setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); - qDebug("===stream===clear semi operator"); + qDebug("stask:%s ===stream===%s clear", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType)); } else { if (pInfo->twAggSup.maxTs > 0 && pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { @@ -984,15 +987,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; } - qDebug("===stream===interval final close"); + qDebug("stask:%s ===stream===%s close", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType)); } return NULL; } else { - if (!IS_FINAL_OP(pInfo)) { + if (!IS_FINAL_INTERVAL_OP(pOperator)) { doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } } @@ -1024,12 +1027,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, - IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack); + IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi", pInfo->numOfDatapack); pInfo->numOfDatapack = 0; break; } pInfo->numOfDatapack++; - printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv", GET_TASKID(pTaskInfo)); + printDataBlock(pBlock, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final recv" : "interval semi recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; @@ -1037,7 +1040,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); - if (IS_FINAL_OP(pInfo)) { + if (IS_FINAL_INTERVAL_OP(pOperator)) { int32_t chId = getChildIndex(pBlock); addRetriveWindow(delWins, pInfo, chId); if (pBlock->info.type != STREAM_CLEAR) { @@ -1053,7 +1056,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pDelRes, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_CLEAR) { pInfo->pDelRes->info.type = STREAM_CLEAR; } else { @@ -1063,17 +1066,17 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } break; - } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) { + } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_INTERVAL_OP(pOperator)) { pInfo->recvGetAll = true; getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; - } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { + } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_INTERVAL_OP(pOperator)) { doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); if (taosArrayGetSize(pInfo->pUpdated) > 0) { break; } continue; - } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) { + } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_INTERVAL_OP(pOperator)) { processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, pInfo->numOfChild, pOperator); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { @@ -1094,7 +1097,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins); - if (IS_FINAL_OP(pInfo)) { + if (IS_FINAL_INTERVAL_OP(pOperator)) { closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap, pInfo->pUpdatedMap, pInfo->pDelWins, pOperator); } @@ -1114,7 +1117,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - return getResult(pOperator); + return buildIntervalResult(pOperator); } static int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) { @@ -1253,15 +1256,13 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { - pInfo->isFinal = true; pOperator->name = "StreamFinalIntervalOperator"; } else { // semi interval operator does not catch result - pInfo->isFinal = false; pOperator->name = "StreamSemiIntervalOperator"; } - if (!IS_FINAL_OP(pInfo) || numOfChild == 0) { + if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; } @@ -2004,12 +2005,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } else if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); + printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo)); return pBInfo->pRes; } @@ -2030,7 +2031,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } - printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv", GET_TASKID(pTaskInfo)); + printDataBlock(pBlock, IS_FINAL_SESSION_OP(pOperator) ? "final session recv" : "single session recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -2038,7 +2039,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { // gap must be 0 doDeleteTimeWindows(pAggSup, pBlock, pWins); removeSessionResults(pInfo->pStUpdated, pWins); - if (IS_FINAL_OP(pInfo)) { + if (IS_FINAL_SESSION_OP(pOperator)) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; @@ -2064,8 +2065,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo), true); - if (IS_FINAL_OP(pInfo)) { + doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_SESSION_OP(pOperator), true); + if (IS_FINAL_SESSION_OP(pOperator)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); // if chIndex + 1 - size > 0, add new child @@ -2108,13 +2109,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { - printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); + printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo)); return pInfo->pDelRes; } doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session", GET_TASKID(pTaskInfo)); + printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo)); return pBInfo->pRes; } @@ -2240,7 +2241,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->pDelIterator = NULL; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pChildren = NULL; - pInfo->isFinal = false; pInfo->pPhyNode = pPhyNode; pInfo->ignoreExpiredData = pSessionNode->window.igExpired; pInfo->ignoreExpiredDataSaved = false; @@ -2406,11 +2406,10 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream goto _error; } - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - - pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION); - char* name = (pInfo->isFinal) ? "StreamSessionFinalAggOperator" : "StreamSessionSemiAggOperator"; + pOperator->operatorType = pPhyNode->type; + char* name = getStreamOpName(pOperator->operatorType); if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); @@ -2421,7 +2420,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->operatorType = pPhyNode->type; if (numOfChild > 0) { pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); for (int32_t i = 0; i < numOfChild; i++) { @@ -2436,7 +2434,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream } } - if (!IS_FINAL_OP(pInfo) || numOfChild == 0) { + if (!IS_FINAL_SESSION_OP(pOperator) || numOfChild == 0) { pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; } @@ -3153,7 +3151,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredDataSaved = false; - pInfo->isFinal = false; SExprSupp* pSup = &pOperator->exprSupp; initBasicInfo(&pInfo->binfo, pResBlock); @@ -3194,7 +3191,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pPullWins = NULL; // SPullWindowInfo pInfo->pullIndex = 0; pInfo->pPullDataRes = NULL; - pInfo->isFinal = false; pInfo->numOfChild = 0; pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0;