From 2aec989bd7ff711fc4463c2d5033e6c31295d97c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 17 May 2022 05:13:27 +0800 Subject: [PATCH] enh(tmq): add drop msg to vnode --- source/dnode/mnode/impl/src/mndSubscribe.c | 1 + source/dnode/vnode/src/tq/tq.c | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index f0a596b9bb..3e932e8a67 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -73,6 +73,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndSubActionDelete}; mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp); + mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2e83625bc0..bc9893b8a0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -863,7 +863,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; - return taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey)); + + int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey)); + ASSERT(code == 0); + return 0; } // TODO: persist meta into tdb