From 3628888ecc69195c5ecafe955e5810b5008636ce Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 29 May 2023 18:02:45 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/dnode/mnode/impl/src/mndScheduler.c | 26 ++++------------------ source/libs/stream/src/streamTask.c | 11 ++++++++- 3 files changed, 15 insertions(+), 24 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7008c0ac40..1d4bbf073e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -351,7 +351,7 @@ typedef struct SStreamMeta { int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); -SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory); +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index b68565b146..df8c11a6f6 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -27,13 +27,6 @@ extern bool tsDeployOnSnode; static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); -static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) { - int32_t childId = taosArrayGetSize(pArray); - pTask->selfChildId = childId; - taosArrayPush(pArray, &pTask); - return 0; -} - int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { SNode* pAst = NULL; @@ -236,14 +229,12 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) { SArray* pTaskList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); - SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory); + SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory, 0, pTaskList); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - mndAddToTaskset(pTaskList, pTask); - pTask->nodeId = vgId; pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); mndSetSinkTaskInfo(pStream, pTask); @@ -257,14 +248,11 @@ static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStr static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SStreamObj* pStream, SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory, bool hasExtraSink) { - SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory); + SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->triggerParam, pTaskList); if (pTask == NULL) { return terrno; } - mndAddToTaskset(pTaskList, pTask); - pTask->triggerParam = pStream->triggerParam; // trigger - // sink or dispatch if (hasExtraSink) { mndAddDispatcherForInnerTask(pMnode, pStream, pTask); @@ -376,16 +364,13 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory); + pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory, pStream->triggerParam, taskInnerLevel); if (pInnerTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; qDestroyQueryPlan(pPlan); return -1; } - mndAddToTaskset(taskInnerLevel, pInnerTask); - pInnerTask->triggerParam = pStream->triggerParam; // trigger - // dispatch if (mndAddDispatcherForInnerTask(pMnode, pStream, pInnerTask) < 0) { qDestroyQueryPlan(pPlan); @@ -445,7 +430,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { continue; } - SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory); + SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, 0, taskSourceLevel); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; sdbRelease(pSdb, pVgroup); @@ -453,11 +438,8 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - mndAddToTaskset(taskSourceLevel, pTask); - // all the source tasks dispatch result to a single agg node. setFixedDownstreamEpInfo(pTask, pInnerTask); - pTask->triggerParam = 0; if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5152e43dbd..a0caffd41f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -17,7 +17,14 @@ #include "tstream.h" #include "wal.h" -SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory) { +static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) { + int32_t childId = taosArrayGetSize(pArray); + pTask->selfChildId = childId; + taosArrayPush(pArray, &pTask); + return 0; +} + +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -28,6 +35,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->id.streamId = streamId; pTask->taskLevel = taskLevel; pTask->fillHistory = fillHistory; + pTask->triggerParam = triggerParam; char buf[128] = {0}; sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId); @@ -37,6 +45,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + mndAddToTaskset(pTaskList, pTask); return pTask; }