diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2b911befcc..59a78efe53 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -635,6 +635,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->freeFp = (_free_reader_fn_t)tqCloseReader; SArray* pList = qGetQueriedTableListInfo(pTask->exec.pExecutor); tqReaderAddTbUidList(pTask->exec.pTqReader, pList); + taosArrayDestroy(pList); } streamSetupTrigger(pTask); diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index ceddf4f215..790547bb61 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -9,7 +9,7 @@ target_include_directories( target_link_libraries( stream PUBLIC tdb - PRIVATE os util transport qcom executor + PRIVATE os util transport qcom executor wal ) if(${BUILD_TEST}) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7d2d7a666f..4783997276 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -15,6 +15,7 @@ #include "executor.h" #include "tstream.h" +#include "wal.h" SStreamTask* tNewStreamTask(int64_t streamId) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); @@ -191,6 +192,10 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->exec.pTqReader = NULL; } + if (pTask->exec.pWalReader != NULL) { + walCloseReader(pTask->exec.pWalReader); + } + taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree); if (pTask->outputType == TASK_OUTPUT__TABLE) { tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);