fix:wait pHandle idle if vnode receives subscribe msg
This commit is contained in:
parent
53c4f4a147
commit
4e46ce4c03
|
@ -269,19 +269,21 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
|
||||
taosWLockLatch(&pTq->lock);
|
||||
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
if (pHandle == NULL) {
|
||||
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
|
||||
code = 0;
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
goto end;
|
||||
}
|
||||
|
||||
// 2. check consumer-vg assignment status
|
||||
taosRLockLatch(&pTq->lock);
|
||||
if (pHandle->consumerId != req.consumerId) {
|
||||
tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||
req.consumerId, vgId, req.subKey, pHandle->consumerId);
|
||||
taosRUnLockLatch(&pTq->lock);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||
goto end;
|
||||
}
|
||||
|
@ -289,7 +291,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
// if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to
|
||||
// TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek.
|
||||
tqUnregisterPushHandle(pTq, pHandle);
|
||||
taosRUnLockLatch(&pTq->lock);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
|
||||
end:
|
||||
rsp.code = code;
|
||||
|
@ -496,15 +498,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
// 1. find handle
|
||||
taosRLockLatch(&pTq->lock);
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
if (pHandle == NULL) {
|
||||
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
taosRUnLockLatch(&pTq->lock);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// 2. check re-balance status
|
||||
taosRLockLatch(&pTq->lock);
|
||||
if (pHandle->consumerId != consumerId) {
|
||||
tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||
consumerId, vgId, req.subKey, pHandle->consumerId);
|
||||
|
@ -580,7 +583,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
bool exec = tqIsHandleExec(pHandle);
|
||||
|
||||
if(exec){
|
||||
tqInfo("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
|
||||
tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
|
||||
pHandle->subKey, pHandle);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
taosMsleep(10);
|
||||
|
@ -689,19 +692,29 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
}
|
||||
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
||||
} else {
|
||||
taosWLockLatch(&pTq->lock);
|
||||
|
||||
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
||||
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
|
||||
} else {
|
||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||
req.newConsumerId);
|
||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||
atomic_store_32(&pHandle->epoch, 0);
|
||||
tqUnregisterPushHandle(pTq, pHandle);
|
||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||
while(1){
|
||||
taosWLockLatch(&pTq->lock);
|
||||
bool exec = tqIsHandleExec(pHandle);
|
||||
if(exec){
|
||||
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId,
|
||||
pHandle->subKey, pHandle);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
taosMsleep(10);
|
||||
continue;
|
||||
}
|
||||
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
||||
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
|
||||
} else {
|
||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||
req.newConsumerId);
|
||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||
atomic_store_32(&pHandle->epoch, 0);
|
||||
tqUnregisterPushHandle(pTq, pHandle);
|
||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||
}
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
break;
|
||||
}
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
}
|
||||
|
||||
end:
|
||||
|
|
Loading…
Reference in New Issue