diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 2ce7f0ae5f..7f5df5d356 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -376,7 +376,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER; } else { - taosRLockLatch(&pConsumerOld->lock); + /*taosRLockLatch(&pConsumerOld->lock);*/ int32_t status = atomic_load_32(&pConsumerOld->status); if (status != MQ_CONSUMER_STATUS__READY) { terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; @@ -440,7 +440,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { SUBSCRIBE_OVER: if (pConsumerOld) { - taosRUnLockLatch(&pConsumerOld->lock); + /*taosRUnLockLatch(&pConsumerOld->lock);*/ mndReleaseConsumer(pMnode, pConsumerOld); } if (pConsumerNew) { @@ -628,9 +628,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // remove from removed topic for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) { - char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i); + char *topic = taosArrayGetP(pOldConsumer->rebRemovedTopics, i); if (strcmp(removedTopic, topic) == 0) { - taosArrayRemove(pOldConsumer->rebNewTopics, i); + taosArrayRemove(pOldConsumer->rebRemovedTopics, i); taosMemoryFree(topic); break; } diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 9b627e187c..1b8c0d527d 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -437,7 +437,7 @@ void* threadFunc(void* param) { pInfo->consumeMsgCnt = parallel_consume(tmq, 1); //} -#if 0 +#if 1 err = tmq_unsubscribe(tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); @@ -487,7 +487,7 @@ int main(int32_t argc, char* argv[]) { totalMsgs = parallel_consume(tmq, 0); } -#if 0 +#if 1 err = tmq_unsubscribe(tmq); if (err) { printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));