From e776cde461a11151549c1e9f746f676d843daef0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Mar 2024 09:02:21 +0800 Subject: [PATCH] fix(stream):fix unrelease stream obj. --- source/dnode/mnode/impl/src/mndStream.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9a79f6ff5b..6067af199e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2188,10 +2188,8 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { int64_t checkpointId = mndStreamGenChkpId(pMnode); mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); - if (pStream != NULL) { - // TODO:handle error + if (pStream != NULL) { // TODO:handle error int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); - mndReleaseStream(pMnode, pStream); } else { // todo: wait for the create stream trans completed, and launch the checkpoint trans // SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); @@ -2205,6 +2203,10 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams); } + if (pStream != NULL) { + mndReleaseStream(pMnode, pStream); + } + taosThreadMutexUnlock(&execInfo.lock); {