opti:tmq logic
This commit is contained in:
parent
6f4809fe21
commit
5a5bf29acb
|
@ -940,6 +940,7 @@ static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgro
|
||||||
int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer);
|
int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sdbRelease(pMnode->pSdb, pConsumer);
|
sdbRelease(pMnode->pSdb, pConsumer);
|
||||||
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -618,7 +618,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
||||||
tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
//reuse this function for topic
|
//reuse this function for topic
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "createTopic", dbname.dbname, topicName.dbname,
|
auditRecord(pReq, pMnode->clusterId, "createTopic", dbname.dbname, topicName.dbname,
|
||||||
createTopicReq.sql, strlen(createTopicReq.sql));
|
createTopicReq.sql, strlen(createTopicReq.sql));
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -670,7 +670,6 @@ static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topi
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SMqConsumerObj *pConsumer = NULL;
|
SMqConsumerObj *pConsumer = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
sdbRelease(pSdb, pConsumer);
|
|
||||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
break;
|
break;
|
||||||
|
@ -683,6 +682,7 @@ static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topi
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
sdbRelease(pSdb, pConsumer);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||||
|
@ -697,7 +697,7 @@ static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topi
|
||||||
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
sdbRelease(pSdb, pConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
|
|
Loading…
Reference in New Issue