From 0c64e67fdaa28d98c98e824baacd04032d01d484 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Jul 2023 10:19:13 +0800 Subject: [PATCH] fix(stream): fix set upstream info for history task. --- source/dnode/mnode/impl/src/mndScheduler.c | 25 +++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a5b2416ea9..1f9f686f75 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -25,7 +25,7 @@ #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; -static int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream); +static int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream); static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); @@ -270,13 +270,13 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); - setEpToDownstreamTask(pTask, pSinkTask); + setTaskUpstreamEpInfo(pTask, pSinkTask); } return TSDB_CODE_SUCCESS; } -static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) { +static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); if (pEpInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -301,7 +301,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; } -int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { +int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream) { SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); if (pEpInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -424,7 +424,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui return -1; } - return setEpToDownstreamTask(pTask, pDownstreamTask); + return setTaskUpstreamEpInfo(pTask, pDownstreamTask); } static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, @@ -592,6 +592,14 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr return TSDB_CODE_SUCCESS; } +static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) { + SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL); + for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { + SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); + setTaskUpstreamEpInfo(pUpstreamTask, pSinkTask); + } +} + static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) { SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); @@ -643,11 +651,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* return code; } - SArray* pSinkTaskList = taosArrayGetP(pStream->tasks, 0); - for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { - SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); - setEpToDownstreamTask(pAggTask, pSinkTask); - } + setSinkTaskUpstreamInfo(pStream->tasks, pAggTask); + setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask); // source level return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey);