fix:set vg status idle if reveive poll callback

This commit is contained in:
wangmm0220 2023-09-04 16:53:12 +08:00
parent fd85d5885a
commit 90e938e48f
2 changed files with 8 additions and 18 deletions

View File

@ -1333,7 +1333,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pRspWrapper == NULL) {
tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since out of memory, reqId:0x%" PRIx64,
tmq->consumerId, vgId, requestId);
goto FAILED;
goto END;
}
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
@ -1343,7 +1343,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
vgId, tstrerror(code), requestId);
}
goto FAILED;
goto END;
}
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
@ -1353,7 +1353,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tscWarn("consumer:0x%" PRIx64
" msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
goto FAILED;
code = -1;
goto END;
}
ASSERT(msgEpoch == clientEpoch);
@ -1364,7 +1365,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
goto FAILED;
code = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
pRspWrapper->tmqRspType = rspType;
@ -1407,14 +1409,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
tmq->consumerId, rspType, vgId, total, requestId);
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
taosMemoryFree(pParam);
taosMemoryFreeClear(pMsg->pData);
return 0;
FAILED:
END:
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId);
if(pVg){
@ -1428,7 +1423,7 @@ FAILED:
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
return -1;
return code;
}
typedef struct SVgroupSaveInfo {
@ -1844,13 +1839,9 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
}
// update the status
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// update the valid wal version range
pVg->offsetInfo.walVerBegin = sver;
pVg->offsetInfo.walVerEnd = ever + 1;
// pVg->receivedInfoFromVnode = true;
}
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {

View File

@ -401,7 +401,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if(pSub == NULL){
ASSERT(0);
continue;
}
taosWLockLatch(&pSub->lock);