diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b99f54642e..74b1dcec00 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -269,19 +269,21 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { } tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey); + taosWLockLatch(&pTq->lock); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey); code = 0; + taosWUnLockLatch(&pTq->lock); goto end; } // 2. check consumer-vg assignment status - taosRLockLatch(&pTq->lock); if (pHandle->consumerId != req.consumerId) { tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, req.consumerId, vgId, req.subKey, pHandle->consumerId); - taosRUnLockLatch(&pTq->lock); + taosWUnLockLatch(&pTq->lock); code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; goto end; } @@ -289,7 +291,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { // if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to // TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek. tqUnregisterPushHandle(pTq, pHandle); - taosRUnLockLatch(&pTq->lock); + taosWUnLockLatch(&pTq->lock); end: rsp.code = code; @@ -496,15 +498,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); // 1. find handle + taosRLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey); terrno = TSDB_CODE_INVALID_MSG; + taosRUnLockLatch(&pTq->lock); return -1; } // 2. check re-balance status - taosRLockLatch(&pTq->lock); if (pHandle->consumerId != consumerId) { tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, vgId, req.subKey, pHandle->consumerId); @@ -580,7 +583,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg bool exec = tqIsHandleExec(pHandle); if(exec){ - tqInfo("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, + tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); taosWUnLockLatch(&pTq->lock); taosMsleep(10); @@ -689,19 +692,29 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } ret = tqMetaSaveHandle(pTq, req.subKey, &handle); } else { - taosWLockLatch(&pTq->lock); - - if (pHandle->consumerId == req.newConsumerId) { // do nothing - tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId); - } else { - tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, - req.newConsumerId); - atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_store_32(&pHandle->epoch, 0); - tqUnregisterPushHandle(pTq, pHandle); - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + while(1){ + taosWLockLatch(&pTq->lock); + bool exec = tqIsHandleExec(pHandle); + if(exec){ + tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId, + pHandle->subKey, pHandle); + taosWUnLockLatch(&pTq->lock); + taosMsleep(10); + continue; + } + if (pHandle->consumerId == req.newConsumerId) { // do nothing + tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId); + } else { + tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, + req.newConsumerId); + atomic_store_64(&pHandle->consumerId, req.newConsumerId); + atomic_store_32(&pHandle->epoch, 0); + tqUnregisterPushHandle(pTq, pHandle); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + } + taosWUnLockLatch(&pTq->lock); + break; } - taosWUnLockLatch(&pTq->lock); } end: