fix:remove lock for consume handler
This commit is contained in:
parent
1061eef144
commit
14fd2e7904
|
@ -325,25 +325,25 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. check re-balance status
|
// 2. check re-balance status
|
||||||
taosRLockLatch(&pTq->lock);
|
// taosRLockLatch(&pTq->lock);
|
||||||
if (pHandle->consumerId != consumerId) {
|
if (pHandle->consumerId != consumerId) {
|
||||||
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||||
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
||||||
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||||
taosRUnLockLatch(&pTq->lock);
|
// taosRUnLockLatch(&pTq->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pTq->lock);
|
// taosRUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
// 3. update the epoch value
|
// 3. update the epoch value
|
||||||
taosWLockLatch(&pTq->lock);
|
// taosWLockLatch(&pTq->lock);
|
||||||
int32_t savedEpoch = pHandle->epoch;
|
int32_t savedEpoch = pHandle->epoch;
|
||||||
if (savedEpoch < reqEpoch) {
|
if (savedEpoch < reqEpoch) {
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch,
|
||||||
reqEpoch);
|
reqEpoch);
|
||||||
pHandle->epoch = reqEpoch;
|
pHandle->epoch = reqEpoch;
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTq->lock);
|
// taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &reqOffset);
|
tFormatOffset(buf, 80, &reqOffset);
|
||||||
|
@ -358,12 +358,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
|
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
// taosWLockLatch(&pTq->lock);
|
||||||
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
|
// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (code != 0) {
|
// if (code != 0) {
|
||||||
tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
|
// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
|
||||||
}
|
// }
|
||||||
taosWUnLockLatch(&pTq->lock);
|
// taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
|
|
|
@ -254,7 +254,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||||
|
|
||||||
// lock
|
// lock
|
||||||
taosWLockLatch(&pTq->lock);
|
// taosWLockLatch(&pTq->lock);
|
||||||
|
|
||||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||||
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||||
|
@ -263,12 +263,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||||
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
// if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
// dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
||||||
//code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
// //code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
// taosWUnLockLatch(&pTq->lock);
|
||||||
return code;
|
// return code;
|
||||||
}
|
// }
|
||||||
|
|
||||||
|
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
|
@ -281,7 +281,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
tFormatOffset(buf, 80, &dataRsp.rspOffset);
|
tFormatOffset(buf, 80, &dataRsp.rspOffset);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d",
|
||||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
// taosWUnLockLatch(&pTq->lock);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue