From 5cdbebbcd83c7a3b2bf68bc28033ed8b2f998068 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 2 Jan 2025 11:01:00 +0800 Subject: [PATCH] fix:[TD-32471]set error code to terrno if tmq_consumer_poll return NULL --- source/client/src/clientTmq.c | 12 ++++++------ utils/test/c/tmq_td32526.c | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ad0486fdf5..670398d1f5 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2356,22 +2356,21 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){ } static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ + int32_t code = 0; SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp; if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform - int32_t code = askEp(tmq, NULL, false, true); + code = askEp(tmq, NULL, false, true); if (code != 0) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); - return code; } } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { - int32_t code = askEp(tmq, NULL, false, false); + code = askEp(tmq, NULL, false, false); if (code != 0) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); - return code; } - } else{ - return pRspWrapper->code; + } else if (code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){ + code = 0; } tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, tstrerror(pRspWrapper->code)); @@ -2383,6 +2382,7 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_SUCCESS; } static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ diff --git a/utils/test/c/tmq_td32526.c b/utils/test/c/tmq_td32526.c index 33cb586501..0150745f57 100644 --- a/utils/test/c/tmq_td32526.c +++ b/utils/test/c/tmq_td32526.c @@ -181,7 +181,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { printResult(tmqmessage); taos_free_result(tmqmessage); } else { - ASSERT(taos_errno(NULL) == TSDB_CODE_TIMEOUT_ERROR); + ASSERT(taos_errno(NULL) == TSDB_CODE_TMQ_POLL_TIMEOUT); break; } }