Merge pull request #22767 from taosdata/mark/tmq

fix:send delete subscribe info to vnode if drop consumer
This commit is contained in:
wade zhang 2023-09-10 22:17:26 +08:00 committed by GitHub
commit ddc8c93a9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 33 additions and 19 deletions

View File

@ -771,6 +771,29 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
return 0; return 0;
} }
static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){
// iter all vnode to delete handle
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
pReq->head.vgId = htonl(pVgEp->vgId);
pReq->vgId = pVgEp->vgId;
pReq->consumerId = -1;
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
STransAction action = {0};
action.epSet = pVgEp->epSet;
action.pCont = pReq;
action.contLen = sizeof(SMqVDeleteReq);
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
}
return 0;
}
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMDropCgroupReq dropReq = {0}; SMDropCgroupReq dropReq = {0};
@ -831,6 +854,11 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
code = sendDeleteSubToVnode(pSub, pTrans);
if (code != 0) {
goto end;
}
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
code = -1; code = -1;
@ -1113,25 +1141,11 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) { if (sendDeleteSubToVnode(pSub, pTrans) != 0) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); sdbRelease(pSdb, pSub);
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); sdbCancelFetch(pSdb, pIter);
pReq->head.vgId = htonl(pVgEp->vgId); return -1;
pReq->vgId = pVgEp->vgId;
pReq->consumerId = -1;
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
STransAction action = {0};
action.epSet = pVgEp->epSet;
action.pCont = pReq;
action.contLen = sizeof(SMqVDeleteReq);
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
sdbRelease(pSdb, pSub);
sdbCancelFetch(pSdb, pIter);
return -1;
}
} }
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) { if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {