enh(tmq): add drop msg to vnode

This commit is contained in:
Liu Jicong 2022-05-17 05:13:27 +08:00
parent 56f8443f43
commit 2aec989bd7
2 changed files with 5 additions and 1 deletions

View File

@ -73,6 +73,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndSubActionDelete}; .deleteFp = (SdbDeleteFp)mndSubActionDelete};
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq); mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);

View File

@ -863,7 +863,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
return taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey));
int32_t code = taosHashRemove(pTq->execs, pReq->subKey, strlen(pReq->subKey));
ASSERT(code == 0);
return 0;
} }
// TODO: persist meta into tdb // TODO: persist meta into tdb