diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 3eb33bf4a9..ce6bbf999d 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -455,7 +455,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup, pConsumer->cgroup); terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; - return -1; + goto FAIL; } atomic_store_32(&pConsumer->hbStatus, 0); @@ -480,7 +480,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { if (status != MQ_CONSUMER_STATUS_READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; - return -1; + goto FAIL; } int32_t serverEpoch = atomic_load_32(&pConsumer->epoch); @@ -562,7 +562,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { void *buf = rpcMallocCont(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + goto FAIL; } SMqRspHead* pHead = buf; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 01241c9339..c9afbaf524 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -699,6 +699,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){ mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndReleaseConsumer(pMnode, pConsumer); continue; }