diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 80f77bc9a5..2cf5330f09 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -421,11 +421,13 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream return TSDB_CODE_SUCCESS; } -static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory) { +static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam) { uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); - SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, isFillhistory, pStream->conf.triggerParam, *pTaskList, pStream->conf.fillHistory); + SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, isFillhistory, + useTriggerParam ? pStream->conf.triggerParam : 0, + *pTaskList, pStream->conf.fillHistory); if (pAggTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -435,9 +437,10 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil return pAggTask; } -static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup, SSnodeObj* pSnode, bool isFillhistory){ +static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, + SVgObj* pVgroup, SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam){ int32_t code = 0; - SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory); + SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam); if (pTask == NULL) { return terrno; } @@ -452,7 +455,7 @@ static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, return code; } -static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset){ +static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam){ SVgObj* pVgroup = NULL; SSnodeObj* pSnode = NULL; int32_t code = 0; @@ -465,13 +468,13 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S pVgroup = mndSchedFetchOneVg(pMnode, pStream); } - code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false); + code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam); if(code != 0){ goto END; } if (pStream->conf.fillHistory) { - code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true); + code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam); if(code != 0){ goto END; } @@ -623,7 +626,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* addNewTaskList(pStream); for(int j = 0; j < cnt; j++){ - code = addAggTask(pStream, pMnode, plan, pEpset); + code = addAggTask(pStream, pMnode, plan, pEpset, false); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -645,7 +648,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* SArray** list = taosArrayGetLast(pStream->tasks); size_t size = taosArrayGetSize(*list); addNewTaskList(pStream); - code = addAggTask(pStream, pMnode, plan, pEpset); + code = addAggTask(pStream, pMnode, plan, pEpset, true); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 50450f0cf2..1084536cc1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -413,7 +413,6 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pCreate->sql = NULL; pCreate->ast = NULL; - qDebugL("ast:%s", pObj->ast); // deserialize ast if (nodesStringToNode(pObj->ast, &pAst) < 0) { goto FAIL; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index fde3fec7d4..0766b92df3 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1441,6 +1441,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + int32_t code = 0; if (pInfo == NULL || pOperator == NULL) { goto _error; } @@ -1470,7 +1471,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); - int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1489,7 +1490,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); - int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index cae500d7b9..dc0e7da6a1 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -574,53 +574,6 @@ static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLo return code; } -//static int32_t stbSplCreateSemiWindowNode(SWindowLogicNode* pMidWindow, SLogicNode** pSemiWindow) { -// SNodeList* pFunc = pMidWindow->pFuncs; -// pMidWindow->pFuncs = NULL; -// nodesDestroyList(pMidWindow->node.pTargets); -// pMidWindow->node.pTargets = NULL; -// SNodeList* pChildren = pMidWindow->node.pChildren; -// pMidWindow->node.pChildren = NULL; -// -// SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMidWindow); -// if (NULL == pPartWin) { -// return TSDB_CODE_OUT_OF_MEMORY; -// } -// -// pPartWin->node.pChildren = pChildren; -// splSetParent((SLogicNode*)pPartWin); -// -// int32_t index = 0; -// int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMidWindow->pFuncs); -// if (TSDB_CODE_SUCCESS == code) { -// code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMidWindow->pTspk)->node.resType.precision); -// } -// if (TSDB_CODE_SUCCESS == code) { -// code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets); -// } -// -// if (TSDB_CODE_SUCCESS == code) { -// code = createColumnByRewriteExprs(pMidWindow->pFuncs, &pMidWindow->node.pTargets); -// } -// -// if (TSDB_CODE_SUCCESS == code) { -// nodesDestroyNode(pMidWindow->pTspk); -// pMidWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index)); -// if (NULL == pMidWindow->pTspk) { -// code = TSDB_CODE_OUT_OF_MEMORY; -// } -// } -// -// nodesDestroyList(pFunc); -// if (TSDB_CODE_SUCCESS == code) { -// *pSemiWindow = (SLogicNode*)pPartWin; -// } else { -// nodesDestroyNode((SNode*)pPartWin); -// } -// -// return code; -//} - static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups;