From f7a63661ac5f592093b4ab3b45aa0e7a14db8a80 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 20 Apr 2022 22:08:04 +0800 Subject: [PATCH] fix --- source/dnode/mnode/impl/src/mndConsumer.c | 8 ++++---- tests/test/c/tmqSim.c | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 2ce7f0ae5f..7f5df5d356 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -376,7 +376,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER; } else { - taosRLockLatch(&pConsumerOld->lock); + /*taosRLockLatch(&pConsumerOld->lock);*/ int32_t status = atomic_load_32(&pConsumerOld->status); if (status != MQ_CONSUMER_STATUS__READY) { terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; @@ -440,7 +440,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { SUBSCRIBE_OVER: if (pConsumerOld) { - taosRUnLockLatch(&pConsumerOld->lock); + /*taosRUnLockLatch(&pConsumerOld->lock);*/ mndReleaseConsumer(pMnode, pConsumerOld); } if (pConsumerNew) { @@ -628,9 +628,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // remove from removed topic for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) { - char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i); + char *topic = taosArrayGetP(pOldConsumer->rebRemovedTopics, i); if (strcmp(removedTopic, topic) == 0) { - taosArrayRemove(pOldConsumer->rebNewTopics, i); + taosArrayRemove(pOldConsumer->rebRemovedTopics, i); taosMemoryFree(topic); break; } diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 9b627e187c..1b8c0d527d 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -437,7 +437,7 @@ void* threadFunc(void* param) { pInfo->consumeMsgCnt = parallel_consume(tmq, 1); //} -#if 0 +#if 1 err = tmq_unsubscribe(tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); @@ -487,7 +487,7 @@ int main(int32_t argc, char* argv[]) { totalMsgs = parallel_consume(tmq, 0); } -#if 0 +#if 1 err = tmq_unsubscribe(tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));