fix:set triggerParams=0 in mid interval
This commit is contained in:
parent
9c2ab769ea
commit
dd0afef6f2
|
@ -421,11 +421,13 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory) {
|
static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhistory, bool useTriggerParam) {
|
||||||
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
|
||||||
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
|
||||||
|
|
||||||
SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, isFillhistory, pStream->conf.triggerParam, *pTaskList, pStream->conf.fillHistory);
|
SStreamTask* pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, isFillhistory,
|
||||||
|
useTriggerParam ? pStream->conf.triggerParam : 0,
|
||||||
|
*pTaskList, pStream->conf.fillHistory);
|
||||||
if (pAggTask == NULL) {
|
if (pAggTask == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -435,9 +437,10 @@ static SStreamTask* buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFil
|
||||||
return pAggTask;
|
return pAggTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, SVgObj* pVgroup, SSnodeObj* pSnode, bool isFillhistory){
|
static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset,
|
||||||
|
SVgObj* pVgroup, SSnodeObj* pSnode, bool isFillhistory, bool useTriggerParam){
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory);
|
SStreamTask* pTask = buildAggTask(pStream, pEpset, isFillhistory, useTriggerParam);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -452,7 +455,7 @@ static int32_t doAddAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset){
|
static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, SEpSet* pEpset, bool useTriggerParam){
|
||||||
SVgObj* pVgroup = NULL;
|
SVgObj* pVgroup = NULL;
|
||||||
SSnodeObj* pSnode = NULL;
|
SSnodeObj* pSnode = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -465,13 +468,13 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SSubplan* plan, S
|
||||||
pVgroup = mndSchedFetchOneVg(pMnode, pStream);
|
pVgroup = mndSchedFetchOneVg(pMnode, pStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false);
|
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, false, useTriggerParam);
|
||||||
if(code != 0){
|
if(code != 0){
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStream->conf.fillHistory) {
|
if (pStream->conf.fillHistory) {
|
||||||
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true);
|
code = doAddAggTask(pStream, pMnode, plan, pEpset, pVgroup, pSnode, true, useTriggerParam);
|
||||||
if(code != 0){
|
if(code != 0){
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
@ -623,7 +626,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
addNewTaskList(pStream);
|
addNewTaskList(pStream);
|
||||||
|
|
||||||
for(int j = 0; j < cnt; j++){
|
for(int j = 0; j < cnt; j++){
|
||||||
code = addAggTask(pStream, pMnode, plan, pEpset);
|
code = addAggTask(pStream, pMnode, plan, pEpset, false);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -645,7 +648,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
|
||||||
SArray** list = taosArrayGetLast(pStream->tasks);
|
SArray** list = taosArrayGetLast(pStream->tasks);
|
||||||
size_t size = taosArrayGetSize(*list);
|
size_t size = taosArrayGetSize(*list);
|
||||||
addNewTaskList(pStream);
|
addNewTaskList(pStream);
|
||||||
code = addAggTask(pStream, pMnode, plan, pEpset);
|
code = addAggTask(pStream, pMnode, plan, pEpset, true);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -413,7 +413,6 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
pCreate->sql = NULL;
|
pCreate->sql = NULL;
|
||||||
pCreate->ast = NULL;
|
pCreate->ast = NULL;
|
||||||
|
|
||||||
qDebugL("ast:%s", pObj->ast);
|
|
||||||
// deserialize ast
|
// deserialize ast
|
||||||
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
|
if (nodesStringToNode(pObj->ast, &pAst) < 0) {
|
||||||
goto FAIL;
|
goto FAIL;
|
||||||
|
|
|
@ -1441,6 +1441,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||||
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
|
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
|
int32_t code = 0;
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -1470,7 +1471,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
if (pIntervalPhyNode->window.pExprs != NULL) {
|
if (pIntervalPhyNode->window.pExprs != NULL) {
|
||||||
int32_t numOfScalar = 0;
|
int32_t numOfScalar = 0;
|
||||||
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
|
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
|
||||||
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -1489,7 +1490,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState);
|
||||||
|
|
||||||
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
|
||||||
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
|
||||||
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
|
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -574,53 +574,6 @@ static int32_t stbSplCreatePartMidWindowNode(SWindowLogicNode* pMergeWindow, SLo
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
//static int32_t stbSplCreateSemiWindowNode(SWindowLogicNode* pMidWindow, SLogicNode** pSemiWindow) {
|
|
||||||
// SNodeList* pFunc = pMidWindow->pFuncs;
|
|
||||||
// pMidWindow->pFuncs = NULL;
|
|
||||||
// nodesDestroyList(pMidWindow->node.pTargets);
|
|
||||||
// pMidWindow->node.pTargets = NULL;
|
|
||||||
// SNodeList* pChildren = pMidWindow->node.pChildren;
|
|
||||||
// pMidWindow->node.pChildren = NULL;
|
|
||||||
//
|
|
||||||
// SWindowLogicNode* pPartWin = (SWindowLogicNode*)nodesCloneNode((SNode*)pMidWindow);
|
|
||||||
// if (NULL == pPartWin) {
|
|
||||||
// return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// pPartWin->node.pChildren = pChildren;
|
|
||||||
// splSetParent((SLogicNode*)pPartWin);
|
|
||||||
//
|
|
||||||
// int32_t index = 0;
|
|
||||||
// int32_t code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMidWindow->pFuncs);
|
|
||||||
// if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
// code = stbSplAppendWStart(pPartWin->pFuncs, &index, ((SColumnNode*)pMidWindow->pTspk)->node.resType.precision);
|
|
||||||
// }
|
|
||||||
// if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
// code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
// code = createColumnByRewriteExprs(pMidWindow->pFuncs, &pMidWindow->node.pTargets);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
// nodesDestroyNode(pMidWindow->pTspk);
|
|
||||||
// pMidWindow->pTspk = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index));
|
|
||||||
// if (NULL == pMidWindow->pTspk) {
|
|
||||||
// code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// nodesDestroyList(pFunc);
|
|
||||||
// if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
// *pSemiWindow = (SLogicNode*)pPartWin;
|
|
||||||
// } else {
|
|
||||||
// nodesDestroyNode((SNode*)pPartWin);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return code;
|
|
||||||
//}
|
|
||||||
|
|
||||||
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
|
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||||
return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups;
|
return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups;
|
||||||
|
|
Loading…
Reference in New Issue