fix:set trigger parms = 0 in source task if levelPlan != 1

This commit is contained in:
wangmm0220 2023-12-18 10:57:36 +08:00
parent 80ee5c1f13
commit 64f98744e2
3 changed files with 47 additions and 15 deletions

View File

@ -260,13 +260,13 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet*
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, static SStreamTask* buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, int64_t firstWindowSkey,
int64_t firstWindowSkey, bool isFillhistory) { bool isFillhistory, bool useTriggerParam) {
uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid;
SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks);
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE,
isFillhistory, pStream->conf.triggerParam, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0,
*pTaskList, pStream->conf.fillHistory); *pTaskList, pStream->conf.fillHistory);
if (pTask == NULL) { if (pTask == NULL) {
return NULL; return NULL;
@ -311,11 +311,10 @@ static void setHTasksId(SStreamObj* pStream) {
} }
} }
static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset,
SEpSet* pEpset, int64_t nextWindowSkey, int64_t nextWindowSkey, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam ){
SVgObj* pVgroup, bool isFillhistory ){
// new stream task // new stream task
SStreamTask* pTask = buildSourceTask(pStream, pEpset, nextWindowSkey, isFillhistory); SStreamTask* pTask = buildSourceTask(pStream, pEpset, nextWindowSkey, isFillhistory, useTriggerParam);
if(pTask == NULL){ if(pTask == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
@ -362,7 +361,7 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index){
} }
static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream,
SEpSet* pEpset, int64_t nextWindowSkey) { SEpSet* pEpset, int64_t nextWindowSkey, bool useTriggerParam) {
addNewTaskList(pStream); addNewTaskList(pStream);
void* pIter = NULL; void* pIter = NULL;
@ -379,14 +378,14 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
continue; continue;
} }
int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVgroup, false); int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVgroup, false, useTriggerParam);
if(code != 0){ if(code != 0){
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
return code; return code;
} }
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVgroup, true); code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVgroup, true, useTriggerParam);
if(code != 0){ if(code != 0){
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
return code; return code;
@ -580,7 +579,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
if (plan == NULL) { if (plan == NULL) {
return terrno; return terrno;
} }
int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey); int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, numOfPlanLevel == 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }

File diff suppressed because one or more lines are too long

View File

@ -785,7 +785,35 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
// return code; // return code;
//} //}
static bool isStreamMultiAgg(SLogicNode* pNode) {
if(LIST_LENGTH(pNode->pChildren) <= 0) return false;
SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
qDebug("vgroups:%d", ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups);
return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > tsStreamAggCnt;
}
return false;
}
static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL;
int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI;
((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL;
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
++(pCxt->groupId);
return code;
}
static int32_t stbSplSplitIntervalForStreamMultiAgg(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartWindow = NULL; SLogicNode* pPartWindow = NULL;
SLogicNode* pMidWindow = NULL; SLogicNode* pMidWindow = NULL;
int32_t code = stbSplCreatePartMidWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow, &pMidWindow); int32_t code = stbSplCreatePartMidWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow, &pMidWindow);
@ -817,7 +845,11 @@ static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInf
static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (pCxt->pPlanCxt->streamQuery) { if (pCxt->pPlanCxt->streamQuery) {
return stbSplSplitIntervalForStream(pCxt, pInfo); // if(isStreamMultiAgg(pInfo->pSplitNode)){
return stbSplSplitIntervalForStreamMultiAgg(pCxt, pInfo);
// }else{
// return stbSplSplitIntervalForStream(pCxt, pInfo);
// }
} else { } else {
return stbSplSplitIntervalForBatch(pCxt, pInfo); return stbSplSplitIntervalForBatch(pCxt, pInfo);
} }