From 90e938e48f8d4be240f3dc7b7f59c2b168bed20f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 4 Sep 2023 16:53:12 +0800 Subject: [PATCH] fix:set vg status idle if reveive poll callback --- source/client/src/clientTmq.c | 25 ++++++++--------------- source/dnode/mnode/impl/src/mndConsumer.c | 1 - 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 8eca73da3d..87511865e9 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1333,7 +1333,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if (pRspWrapper == NULL) { tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since out of memory, reqId:0x%" PRIx64, tmq->consumerId, vgId, requestId); - goto FAILED; + goto END; } pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; @@ -1343,7 +1343,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { vgId, tstrerror(code), requestId); } - goto FAILED; + goto END; } int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; @@ -1353,7 +1353,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId); - goto FAILED; + code = -1; + goto END; } ASSERT(msgEpoch == clientEpoch); @@ -1364,7 +1365,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); - goto FAILED; + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; } pRspWrapper->tmqRspType = rspType; @@ -1407,14 +1409,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId, rspType, vgId, total, requestId); - tsem_post(&tmq->rspSem); - taosReleaseRef(tmqMgmt.rsetId, refId); - taosMemoryFree(pParam); - taosMemoryFreeClear(pMsg->pData); - - return 0; - -FAILED: +END: taosWLockLatch(&tmq->lock); SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId); if(pVg){ @@ -1428,7 +1423,7 @@ FAILED: taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(pMsg->pEpSet); - return -1; + return code; } typedef struct SVgroupSaveInfo { @@ -1844,13 +1839,9 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); } - // update the status - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - // update the valid wal version range pVg->offsetInfo.walVerBegin = sver; pVg->offsetInfo.walVerEnd = ever + 1; -// pVg->receivedInfoFromVnode = true; } static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index f25bd2cffb..7f96255b1e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -401,7 +401,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); if(pSub == NULL){ - ASSERT(0); continue; } taosWLockLatch(&pSub->lock);