diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 37527325db..1a9f808688 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -901,6 +901,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * // status const char *pStatusName = mndConsumerStatusName(pConsumer->status); status = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); + MND_TMQ_NULL_CHECK(status); STR_TO_VARSTR(status, pStatusName); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -940,6 +941,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal); parasStr = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes); + MND_TMQ_NULL_CHECK(parasStr); (void)sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf); varDataSetLen(parasStr, strlen(varDataVal(parasStr))); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index b2a866979c..8bc3c9064e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -396,7 +396,6 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { END: taosMemoryFree(pVgEp); - sdbRelease(pMnode->pSdb, pVgroup); taosArrayDestroyP(newVgs, (FDelete)tDeleteSMqVgEp); return code; } @@ -773,7 +772,7 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) { if (status == MQ_CONSUMER_STATUS_READY) { if (taosArrayGetSize(pConsumer->currentTopics) == 0) { // unsubscribe or close - mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); + MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info)); } else if (hbStatus * tsMqRebalanceInterval * 1000 >= pConsumer->sessionTimeoutMs || pollStatus * tsMqRebalanceInterval * 1000 >= pConsumer->maxPollIntervalMs) { taosRLockLatch(&pConsumer->lock); @@ -788,7 +787,7 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj *rebSubHash) { MND_TMQ_RETURN_CHECK(buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId)); taosRUnLockLatch(&pConsumer->lock); } else { - mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info); + MND_TMQ_RETURN_CHECK(mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info)); } mndReleaseConsumer(pMnode, pConsumer);