fix(tmq): fix dead lock.

This commit is contained in:
Haojun Liao 2023-03-14 11:50:02 +08:00
parent 21ef5e5577
commit 9cdd52de84
1 changed files with 8 additions and 7 deletions

View File

@ -538,11 +538,6 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
.handle = NULL, .handle = NULL,
}; };
SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 ", ep:%s:%d, ordinal:%d/%d",
tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn,
pEp->port, index + 1, totalVgroups);
pMsgSendInfo->requestId = generateRequestId(); pMsgSendInfo->requestId = generateRequestId();
pMsgSendInfo->requestObjRefId = 0; pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam; pMsgSendInfo->param = pParam;
@ -553,6 +548,12 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
atomic_add_fetch_32(&pParamSet->totalRspNum, 1); atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%" PRId64 " prev:%" PRId64
", ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn,
pEp->port, index + 1, totalVgroups, pMsgSendInfo->requestId);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
return 0; return 0;
@ -627,11 +628,11 @@ HANDLE_RSP:
} }
if (!async) { if (!async) {
taosThreadMutexUnlock(&tmq->lock);
tsem_wait(&pParamSet->rspSem); tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr; code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem); tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet); taosMemoryFree(pParamSet);
taosThreadMutexUnlock(&tmq->lock);
return code; return code;
} else { } else {
code = 0; code = 0;
@ -736,7 +737,7 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) { void* userParam) {
if (msg) { // user invoked commit? if (msg) { // user invoked commit
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam); return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
} else { // this for auto commit } else { // this for auto commit
return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam); return tmqCommitConsumerImpl(tmq, automatic, async, userCb, userParam);