fix
This commit is contained in:
parent
d75ab9b3ff
commit
f7a63661ac
|
@ -376,7 +376,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
taosRLockLatch(&pConsumerOld->lock);
|
/*taosRLockLatch(&pConsumerOld->lock);*/
|
||||||
int32_t status = atomic_load_32(&pConsumerOld->status);
|
int32_t status = atomic_load_32(&pConsumerOld->status);
|
||||||
if (status != MQ_CONSUMER_STATUS__READY) {
|
if (status != MQ_CONSUMER_STATUS__READY) {
|
||||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||||
|
@ -440,7 +440,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
|
||||||
|
|
||||||
SUBSCRIBE_OVER:
|
SUBSCRIBE_OVER:
|
||||||
if (pConsumerOld) {
|
if (pConsumerOld) {
|
||||||
taosRUnLockLatch(&pConsumerOld->lock);
|
/*taosRUnLockLatch(&pConsumerOld->lock);*/
|
||||||
mndReleaseConsumer(pMnode, pConsumerOld);
|
mndReleaseConsumer(pMnode, pConsumerOld);
|
||||||
}
|
}
|
||||||
if (pConsumerNew) {
|
if (pConsumerNew) {
|
||||||
|
@ -628,9 +628,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
|
|
||||||
// remove from removed topic
|
// remove from removed topic
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) {
|
||||||
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
|
char *topic = taosArrayGetP(pOldConsumer->rebRemovedTopics, i);
|
||||||
if (strcmp(removedTopic, topic) == 0) {
|
if (strcmp(removedTopic, topic) == 0) {
|
||||||
taosArrayRemove(pOldConsumer->rebNewTopics, i);
|
taosArrayRemove(pOldConsumer->rebRemovedTopics, i);
|
||||||
taosMemoryFree(topic);
|
taosMemoryFree(topic);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -437,7 +437,7 @@ void* threadFunc(void* param) {
|
||||||
pInfo->consumeMsgCnt = parallel_consume(tmq, 1);
|
pInfo->consumeMsgCnt = parallel_consume(tmq, 1);
|
||||||
//}
|
//}
|
||||||
|
|
||||||
#if 0
|
#if 1
|
||||||
err = tmq_unsubscribe(tmq);
|
err = tmq_unsubscribe(tmq);
|
||||||
if (err) {
|
if (err) {
|
||||||
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||||
|
@ -487,7 +487,7 @@ int main(int32_t argc, char* argv[]) {
|
||||||
totalMsgs = parallel_consume(tmq, 0);
|
totalMsgs = parallel_consume(tmq, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 1
|
||||||
err = tmq_unsubscribe(tmq);
|
err = tmq_unsubscribe(tmq);
|
||||||
if (err) {
|
if (err) {
|
||||||
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||||
|
|
Loading…
Reference in New Issue