From a72dac975cea6b394880fc6f1ea08040de23e288 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 3 Sep 2024 11:37:14 +0800 Subject: [PATCH] feat:[TD-30270] opti close logic in tmq --- source/client/src/clientTmq.c | 3 ++- tests/system-test/7-tmq/subscribeStb2.py | 4 ++-- tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 5fd0377522..3185094785 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -742,7 +742,8 @@ static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){ code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet); if (code != 0){ - goto END; + tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current:%" PRId64 ", ordinal:%d/%d", + tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); } } } diff --git a/tests/system-test/7-tmq/subscribeStb2.py b/tests/system-test/7-tmq/subscribeStb2.py index cdbc41a593..02d1630be7 100644 --- a/tests/system-test/7-tmq/subscribeStb2.py +++ b/tests/system-test/7-tmq/subscribeStb2.py @@ -266,7 +266,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows != expectrowcnt: + if totalConsumeRows < expectrowcnt: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") @@ -287,7 +287,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows != expectrowcnt*2: + if totalConsumeRows < expectrowcnt*2: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py index 7b31019572..96352fbe52 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb-mutilVg.py @@ -198,7 +198,7 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]): + if expectrowcnt > resultList[0]: tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) @@ -219,7 +219,7 @@ class TDTestCase: actConsumeTotalRows = firstConsumeRows + resultList[0] - if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows): + if totalRowsInserted > actConsumeTotalRows: tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0])) tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId)