diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 8bbaadd203..0068b582cf 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -907,21 +907,34 @@ END: return code; } -static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){ +static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){ // iter all vnode to delete handle int32_t sz = taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); + if(pReq == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } pReq->head.vgId = htonl(pVgEp->vgId); pReq->vgId = pVgEp->vgId; pReq->consumerId = -1; memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); + + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId); + if (pVgObj == NULL) { + taosMemoryFree(pReq); + terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST; + return -1; + } STransAction action = {0}; - action.epSet = pVgEp->epSet; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj);; action.pCont = pReq; action.contLen = sizeof(SMqVDeleteReq); action.msgType = TDMT_VND_TMQ_DELETE_SUB; + + mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1002,7 +1015,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto end; } - code = sendDeleteSubToVnode(pSub, pTrans); + code = sendDeleteSubToVnode(pMnode, pSub, pTrans); if (code != 0) { goto end; } @@ -1263,7 +1276,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) goto END; } - code = sendDeleteSubToVnode(pSub, pTrans); + code = sendDeleteSubToVnode(pMnode, pSub, pTrans); if (code != 0) { goto END; }