diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1230a352d9..ce4efe905f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1154,6 +1154,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } else { + tDeleteStreamDispatchReq(&req); return -1; } } @@ -1196,6 +1197,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { tDeleteStreamRetrieveReq(&req); return 0; } else { + tDeleteStreamRetrieveReq(&req); return -1; } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 51cc315780..c3a7b70904 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -176,10 +176,12 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { // add to the ready tasks hash map, not the restored tasks hash map int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { + tFreeStreamTask(pTask); return -1; } if (streamMetaSaveTask(pMeta, pTask) < 0) { + tFreeStreamTask(pTask); return -1; }