diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index cea664f64d..1b77a0ad6a 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -362,10 +362,9 @@ static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoc pHead->walever = 0; void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - code = tEncodeSMqAskEpRsp(&abuf, rsp); - if (code != 0) { + if (tEncodeSMqAskEpRsp(&abuf, rsp) < 0) { rpcFreeCont(buf); - return code; + return TSDB_CODE_TSC_INTERNAL_ERROR; } // send rsp diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 3aa0b5a01d..e90962ca5e 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -594,7 +594,9 @@ int32_t tCloneSubscribeObj(const SMqSubscribeObj *pSub, SMqSubscribeObj **ppSub) pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL); (void)memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN); pSubNew->qmsg = taosStrdup(pSub->qmsg); - + if (ppSub) { + *ppSub = pSubNew; + } END: return code; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ec915eb9d8..a08e94e24f 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -397,9 +397,11 @@ END: } static int32_t processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { - int32_t code = 0; SMqSubscribeObj *pSub = NULL; - MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub)); // put all offset rows + int32_t code = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key, &pSub); // put all offset rows + if( code != 0){ + return 0; + } taosRLockLatch(&pSub->lock); if (pOutput->pSub->offsetRows == NULL) { pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows)); @@ -858,12 +860,11 @@ END: } static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) { - int32_t code = 0; const char *key = rebInput->pRebInfo->key; SMqSubscribeObj *pSub = NULL; - MND_TMQ_RETURN_CHECK(mndAcquireSubscribeByKey(pMnode, key, &pSub)); + int32_t code = mndAcquireSubscribeByKey(pMnode, key, &pSub); - if (pSub == NULL) { + if (code != 0) { // split sub key and extract topic char topic[TSDB_TOPIC_FNAME_LEN] = {0}; char cgroup[TSDB_CGROUP_LEN] = {0}; @@ -893,7 +894,7 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu taosRUnLockLatch(&pSub->lock); goto END; } - code =checkConsumer(pMnode, rebOutput->pSub); + code = checkConsumer(pMnode, rebOutput->pSub); if(code != 0){ taosRUnLockLatch(&pSub->lock); goto END;