fix(stream): fix set upstream info for history task.

This commit is contained in:
Haojun Liao 2023-07-18 10:19:13 +08:00
parent 92106312d2
commit 0c64e67fda
1 changed files with 15 additions and 10 deletions

View File

@ -25,7 +25,7 @@
#define SINK_NODE_LEVEL (0) #define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode; extern bool tsDeployOnSnode;
static int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream); static int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream);
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
SVgObj* pVgroup, int32_t fillHistory); SVgObj* pVgroup, int32_t fillHistory);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
@ -270,13 +270,13 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setEpToDownstreamTask(pTask, pSinkTask); setTaskUpstreamEpInfo(pTask, pSinkTask);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) { static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) { if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -301,7 +301,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
} }
int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
if (pEpInfo == NULL) { if (pEpInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -424,7 +424,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
return -1; return -1;
} }
return setEpToDownstreamTask(pTask, pDownstreamTask); return setTaskUpstreamEpInfo(pTask, pDownstreamTask);
} }
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
@ -592,6 +592,14 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) {
SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL);
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setTaskUpstreamEpInfo(pUpstreamTask, pSinkTask);
}
}
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) { static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
@ -643,11 +651,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
return code; return code;
} }
SArray* pSinkTaskList = taosArrayGetP(pStream->tasks, 0); setSinkTaskUpstreamInfo(pStream->tasks, pAggTask);
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask);
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setEpToDownstreamTask(pAggTask, pSinkTask);
}
// source level // source level
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey); return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey);