refactor(tmq): add more strict drop condition check
This commit is contained in:
parent
df4fd528c0
commit
603cc02671
|
@ -418,6 +418,7 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
|
||||||
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
|
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
|
||||||
buf = taosDecodeVariantI32(buf, &pSW->nCols);
|
buf = taosDecodeVariantI32(buf, &pSW->nCols);
|
||||||
buf = taosDecodeVariantI32(buf, &pSW->version);
|
buf = taosDecodeVariantI32(buf, &pSW->version);
|
||||||
|
if (pSW->nCols > 0) {
|
||||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
if (pSW->pSchema == NULL) {
|
if (pSW->pSchema == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -426,6 +427,9 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
|
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
pSW->pSchema = NULL;
|
||||||
|
}
|
||||||
return (void*)buf;
|
return (void*)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2990,7 +2994,8 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
|
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
|
||||||
if (pSubTopicEp->schema.nCols) taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
|
taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
|
||||||
|
pSubTopicEp->schema.nCols = 0;
|
||||||
taosArrayDestroy(pSubTopicEp->vgs);
|
taosArrayDestroy(pSubTopicEp->vgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -637,6 +637,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
|
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
|
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
|
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
|
||||||
|
@ -649,6 +650,33 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sz = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
char *name = taosArrayGetP(pConsumer->rebNewTopics, i);
|
||||||
|
if (strcmp(name, pTopic->name) == 0) {
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
|
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
|
||||||
|
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i);
|
||||||
|
if (strcmp(name, pTopic->name) == 0) {
|
||||||
|
mndReleaseConsumer(pMnode, pConsumer);
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||||
|
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)",
|
||||||
|
dropReq.name, pConsumer->consumerId, pConsumer->cgroup);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
sdbRelease(pSdb, pConsumer);
|
sdbRelease(pSdb, pConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -675,15 +703,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// TODO check if rebalancing
|
// TODO check if rebalancing
|
||||||
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
|
||||||
/*ASSERT(0);*/
|
/*ASSERT(0);*/
|
||||||
|
|
|
@ -582,10 +582,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%" PRId64
|
tqDebug("tmq poll: consumer %" PRId64
|
||||||
", version:%" PRId64 "",
|
", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
||||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.version);
|
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
||||||
|
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue