diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 4aa4a4ddf2..c1494fd0d0 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -402,6 +402,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); if(pSub == NULL){ +#ifdef TMQ_DEBUG + ASSERT(0); +#endif continue; } taosWLockLatch(&pSub->lock); @@ -499,7 +502,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); // txn guarantees pSub is created if(pSub == NULL) { +#ifdef TMQ_DEBUG ASSERT(0); +#endif continue; } taosRLockLatch(&pSub->lock); @@ -510,7 +515,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 2.1 fetch topic schema SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); if(pTopic == NULL) { +#ifdef TMQ_DEBUG ASSERT(0); +#endif taosRUnLockLatch(&pSub->lock); mndReleaseSubscribe(pMnode, pSub); continue; @@ -657,7 +664,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } // check topic existence - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { goto _over; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c756341164..ae58eeee35 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -847,7 +847,11 @@ end: mndReleaseSubscribe(pMnode, pSub); mndTransDrop(pTrans); - return code; + if (code != 0) { + mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic); + return code; + } + return TSDB_CODE_ACTION_IN_PROGRESS; } void mndCleanupSubscribe(SMnode *pMnode) {} diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 58544090e2..3aeb679eb7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -670,7 +670,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle* pHandle = NULL; while (1) { pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0) { + if (pHandle) { + break; + } + taosRLockLatch(&pTq->lock); + ret = tqMetaGetHandle(pTq, req.subKey); + taosRUnLockLatch(&pTq->lock); + + if (ret < 0) { break; } } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 39627a5f7b..4d470ee5b6 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1087,6 +1087,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t vgId = TD_VID(pTq->pVnode); // update the table list for each consumer handle + taosWLockLatch(&pTq->lock); while (1) { pIter = taosHashIterate(pTq->pHandle, pIter); if (pIter == NULL) { @@ -1116,6 +1117,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); taosArrayDestroy(list); taosHashCancelIterate(pTq->pHandle, pIter); + taosWUnLockLatch(&pTq->lock); return ret; } tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL); @@ -1125,7 +1127,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } } - + taosWUnLockLatch(&pTq->lock); // update the table list handle for each stream scanner/wal reader taosWLockLatch(&pTq->pStreamMeta->lock); while (1) {