diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 247293dbb6..2ab0dc828e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -538,7 +538,7 @@ typedef struct SStreamMeta { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc); void streamMetaClose(SStreamMeta* streamMeta); -int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); +// int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 1c8ec0a302..e88d53df24 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -239,6 +239,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(tasks, pTask); pTask->nodeId = pVgroup->vgId; @@ -270,6 +271,7 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(tasks, pTask); ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId); @@ -356,6 +358,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { qDestroyQueryPlan(pPlan); return -1; } + pInnerTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d489cc044b..84abab01e3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -127,6 +127,7 @@ FAIL: return -1; } +#if 0 int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { void* buf = NULL; if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { @@ -156,6 +157,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } +#endif SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));