From 27eabb3da93d3a8a7db197dfb87ee1e8ae77b983 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Jul 2023 12:05:49 +0800 Subject: [PATCH] fix(tmq): fix tmq syntax error. --- source/client/src/clientTmq.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 12314f2160..c20f81bb8a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2030,6 +2030,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->emptyBlockReceiveTs = taosGetTimestampMs(); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); + taosWUnLockLatch(&tmq->lock); + continue; } else { pVg->emptyBlockReceiveTs = 0; // reset the ts } @@ -2043,20 +2045,18 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); } - tmq->totalRows += numOfRows; + tmq->totalRows += numOfRows; - char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); - tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 - ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, - tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, - tmq->totalRows, pollRspWrapper->reqId); + char buf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); + tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 + ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, + tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, + tmq->totalRows, pollRspWrapper->reqId); - taosFreeQitem(pollRspWrapper); - taosWUnLockLatch(&tmq->lock); - return pRsp; - } + taosFreeQitem(pollRspWrapper); taosWUnLockLatch(&tmq->lock); + return pRsp; } else { tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);