fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2023-04-13 23:49:14 +08:00
parent 70b45a4cf0
commit 4d83118ff7
3 changed files with 7 additions and 1 deletions

View File

@ -635,6 +635,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->freeFp = (_free_reader_fn_t)tqCloseReader;
SArray* pList = qGetQueriedTableListInfo(pTask->exec.pExecutor);
tqReaderAddTbUidList(pTask->exec.pTqReader, pList);
taosArrayDestroy(pList);
}
streamSetupTrigger(pTask);

View File

@ -9,7 +9,7 @@ target_include_directories(
target_link_libraries(
stream
PUBLIC tdb
PRIVATE os util transport qcom executor
PRIVATE os util transport qcom executor wal
)
if(${BUILD_TEST})

View File

@ -15,6 +15,7 @@
#include "executor.h"
#include "tstream.h"
#include "wal.h"
SStreamTask* tNewStreamTask(int64_t streamId) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
@ -191,6 +192,10 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->exec.pTqReader = NULL;
}
if (pTask->exec.pWalReader != NULL) {
walCloseReader(pTask->exec.pWalReader);
}
taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree);
if (pTask->outputType == TASK_OUTPUT__TABLE) {
tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper);