refactor(tmq): add more strict drop condition check
This commit is contained in:
parent
df4fd528c0
commit
2386e0b513
|
@ -637,6 +637,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
if (pIter == NULL) break;
|
||||
|
||||
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
|
||||
|
||||
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
|
||||
|
@ -649,6 +650,33 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *name = taosArrayGetP(pConsumer->rebNewTopics, i);
|
||||
if (strcmp(name, pTopic->name) == 0) {
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i);
|
||||
if (strcmp(name, pTopic->name) == 0) {
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
|
||||
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
}
|
||||
|
||||
|
@ -675,15 +703,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
|
||||
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||
|
||||
#if 0
|
||||
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||
ASSERT(0);
|
||||
mndTransDrop(pTrans);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
// TODO check if rebalancing
|
||||
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||
/*ASSERT(0);*/
|
||||
|
|
|
@ -582,10 +582,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
code = -1;
|
||||
}
|
||||
|
||||
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%" PRId64
|
||||
", version:%" PRId64 "",
|
||||
tqDebug("tmq poll: consumer %" PRId64
|
||||
", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.version);
|
||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
||||
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
return code;
|
||||
|
|
Loading…
Reference in New Issue