fix:move from main to 3.0

This commit is contained in:
wangmm0220 2023-09-19 11:51:52 +08:00
parent ef81299494
commit edb42aa2aa
4 changed files with 24 additions and 4 deletions

View File

@ -402,6 +402,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if(pSub == NULL){
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
continue;
}
taosWLockLatch(&pSub->lock);
@ -499,7 +502,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
// txn guarantees pSub is created
if(pSub == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
continue;
}
taosRLockLatch(&pSub->lock);
@ -510,7 +515,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 2.1 fetch topic schema
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if(pTopic == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
continue;
@ -657,7 +664,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
}
// check topic existence
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe");
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
if (pTrans == NULL) {
goto _over;
}

View File

@ -847,7 +847,11 @@ end:
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
if (code != 0) {
mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic);
return code;
}
return TSDB_CODE_ACTION_IN_PROGRESS;
}
void mndCleanupSubscribe(SMnode *pMnode) {}

View File

@ -670,7 +670,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle* pHandle = NULL;
while (1) {
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0) {
if (pHandle) {
break;
}
taosRLockLatch(&pTq->lock);
ret = tqMetaGetHandle(pTq, req.subKey);
taosRUnLockLatch(&pTq->lock);
if (ret < 0) {
break;
}
}

View File

@ -1087,6 +1087,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
int32_t vgId = TD_VID(pTq->pVnode);
// update the table list for each consumer handle
taosWLockLatch(&pTq->lock);
while (1) {
pIter = taosHashIterate(pTq->pHandle, pIter);
if (pIter == NULL) {
@ -1116,6 +1117,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
taosArrayDestroy(list);
taosHashCancelIterate(pTq->pHandle, pIter);
taosWUnLockLatch(&pTq->lock);
return ret;
}
tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
@ -1125,7 +1127,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
}
}
taosWUnLockLatch(&pTq->lock);
// update the table list handle for each stream scanner/wal reader
taosWLockLatch(&pTq->pStreamMeta->lock);
while (1) {