fix:[TD-29869]use latest vg info when drop topic

This commit is contained in:
wangmm0220 2024-04-30 15:38:09 +08:00
parent 09dcbdd352
commit 0758c4ece0
1 changed files with 17 additions and 4 deletions

View File

@ -907,21 +907,34 @@ END:
return code; return code;
} }
static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){
// iter all vnode to delete handle // iter all vnode to delete handle
int32_t sz = taosArrayGetSize(pSub->unassignedVgs); int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
if(pReq == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pVgEp->vgId); pReq->head.vgId = htonl(pVgEp->vgId);
pReq->vgId = pVgEp->vgId; pReq->vgId = pVgEp->vgId;
pReq->consumerId = -1; pReq->consumerId = -1;
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId);
if (pVgObj == NULL) {
taosMemoryFree(pReq);
terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
return -1;
}
STransAction action = {0}; STransAction action = {0};
action.epSet = pVgEp->epSet; action.epSet = mndGetVgroupEpset(pMnode, pVgObj);;
action.pCont = pReq; action.pCont = pReq;
action.contLen = sizeof(SMqVDeleteReq); action.contLen = sizeof(SMqVDeleteReq);
action.msgType = TDMT_VND_TMQ_DELETE_SUB; action.msgType = TDMT_VND_TMQ_DELETE_SUB;
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
@ -1002,7 +1015,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
goto end; goto end;
} }
code = sendDeleteSubToVnode(pSub, pTrans); code = sendDeleteSubToVnode(pMnode, pSub, pTrans);
if (code != 0) { if (code != 0) {
goto end; goto end;
} }
@ -1263,7 +1276,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
goto END; goto END;
} }
code = sendDeleteSubToVnode(pSub, pTrans); code = sendDeleteSubToVnode(pMnode, pSub, pTrans);
if (code != 0) { if (code != 0) {
goto END; goto END;
} }