diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h
index ff3f35bfed..add893c8c7 100644
--- a/source/libs/stream/inc/streamInt.h
+++ b/source/libs/stream/inc/streamInt.h
@@ -31,6 +31,12 @@ typedef struct {
void* timer;
} SStreamGlobalEnv;
+typedef struct {
+ SEpSet epset;
+ int32_t taskId;
+ SRpcMsg msg;
+} SStreamContinueExecInfo;
+
extern SStreamGlobalEnv streamEnv;
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index 88af841f05..ca5d5994b7 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -648,12 +648,6 @@ int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistory
return 0;
}
-typedef struct {
- SEpSet epset;
- int32_t taskId;
- SRpcMsg msg;
-} SStreamContinueExecInfo;
-
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
int32_t len = 0;
int32_t code = 0;
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index d54d5fa8b8..863c4ce025 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -13,6 +13,8 @@
* along with this program. If not, see .
*/
+#include
+#include
#include "executor.h"
#include "tstream.h"
#include "wal.h"
@@ -203,6 +205,11 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
return 0;
}
+static void freeItem(void* p) {
+ SStreamContinueExecInfo* pInfo = p;
+ rpcFreeCont(pInfo->msg.pCont);
+}
+
void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s", pTask->id.idStr);
@@ -252,7 +259,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
}
if (pTask->pRspMsgList != NULL) {
- pTask->pRspMsgList = taosArrayDestroy(pTask->pRspMsgList);
+ taosArrayDestroyEx(pTask->pRspMsgList, freeItem);
+ pTask->pRspMsgList = NULL;
}
taosThreadMutexDestroy(&pTask->lock);