From 3ac57587b2657957d8d1d68220389fb36acb70a2 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 8 Jan 2025 15:54:34 +0800 Subject: [PATCH 1/2] fix:[TD-32471]set error code to terrno if tmq_consumer_poll return NULL --- include/util/taoserror.h | 1 - source/client/src/clientTmq.c | 17 ++++- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- source/util/src/terror.c | 1 - tests/parallel_test/cases.task | 1 + tests/system-test/7-tmq/tmq_td32471.py | 54 +++++++++++++ utils/test/c/CMakeLists.txt | 8 ++ utils/test/c/tmq_td32471.c | 93 +++++++++++++++++++++++ utils/test/c/tmq_td32526.c | 2 +- 9 files changed, 171 insertions(+), 8 deletions(-) create mode 100644 tests/system-test/7-tmq/tmq_td32471.py create mode 100644 utils/test/c/tmq_td32471.c diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a45a017688..464dffa937 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -1016,7 +1016,6 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015) #define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016) #define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017) -#define TSDB_CODE_TMQ_POLL_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x4018) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 90aa2d8c65..17990761a4 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -64,6 +64,7 @@ enum { enum { TMQ_CONSUMER_STATUS__INIT = 0, TMQ_CONSUMER_STATUS__READY, + TMQ_CONSUMER_STATUS__LOST, TMQ_CONSUMER_STATUS__CLOSED, }; @@ -1318,6 +1319,9 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){ tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); + if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST){ + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__LOST); + } } goto END; } @@ -2253,8 +2257,13 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { return TSDB_CODE_INVALID_MSG; } int32_t code = 0; - taosWLockLatch(&tmq->lock); + + if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__LOST){ + code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + goto end; + } + int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); @@ -2366,7 +2375,7 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ if (code != 0) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); } - } else if (code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){ + } else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){ code = 0; } tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, @@ -2533,7 +2542,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { if (timeout >= 0) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; - TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, TSDB_CODE_TMQ_POLL_TIMEOUT); + TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, 0); (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); } else { (void)tsem2_timewait(&tmq->rspSem, 1000); @@ -2578,7 +2587,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status); displayConsumeStatistics(tmq); - if (status != TMQ_CONSUMER_STATUS__READY) { + if (status != TMQ_CONSUMER_STATUS__READY && status != TMQ_CONSUMER_STATUS__LOST) { tqInfoC("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status); goto END; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6e9dc6ab17..c70f10fc44 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -263,7 +263,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer)); MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user)); atomic_store_32(&pConsumer->hbStatus, 0); - mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d %d", consumerId, req.pollFlag, pConsumer->pollStatus); + mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d pollStatus:%d", consumerId, req.pollFlag, pConsumer->pollStatus); if (req.pollFlag == 1){ atomic_store_32(&pConsumer->pollStatus, 0); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 572a1b23fe..ba2d471ccf 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -859,7 +859,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first") -TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_POLL_TIMEOUT, "TMQ poll timeout") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 4c1e53f87f..103d65144b 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -333,6 +333,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32526.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32471.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py diff --git a/tests/system-test/7-tmq/tmq_td32471.py b/tests/system-test/7-tmq/tmq_td32471.py new file mode 100644 index 0000000000..2672c1c3b8 --- /dev/null +++ b/tests/system-test/7-tmq/tmq_td32471.py @@ -0,0 +1,54 @@ +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def run(self): + tdSql.execute(f'create database if not exists db_32471') + tdSql.execute(f'use db_32471') + tdSql.execute(f'CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)') + tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)") + + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/tmq_td32471'%(buildPath) + # tdLog.info(cmdStr) + # os.system(cmdStr) + # + # tdSql.execute("drop topic db_32471_topic") + tdSql.execute(f'alter stable meters add column item_tags nchar(500)') + tdSql.execute(f'alter stable meters add column new_col nchar(100)') + tdSql.execute("create topic db_32471_topic as select * from db_32471.meters") + + tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-06 14:38:05.000',10.30000,219,0.31000, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', '1')") + + tdLog.info(cmdStr) + if os.system(cmdStr) != 0: + tdLog.exit(cmdStr) + + return + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index cb0410e9bf..d1c049ef1e 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -6,6 +6,7 @@ add_executable(tmq_taosx_ci tmq_taosx_ci.c) add_executable(tmq_ts5466 tmq_ts5466.c) add_executable(tmq_td32526 tmq_td32526.c) add_executable(tmq_td32187 tmq_td32187.c) +add_executable(tmq_td32471 tmq_td32471.c) add_executable(tmq_write_raw_test tmq_write_raw_test.c) add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) @@ -72,6 +73,13 @@ target_link_libraries( PUBLIC common PUBLIC os ) +target_link_libraries( + tmq_td32471 + PUBLIC ${TAOS_LIB} + PUBLIC util + PUBLIC common + PUBLIC os +) target_link_libraries( tmq_td32526 PUBLIC ${TAOS_LIB} diff --git a/utils/test/c/tmq_td32471.c b/utils/test/c/tmq_td32471.c new file mode 100644 index 0000000000..bf14e3f61b --- /dev/null +++ b/utils/test/c/tmq_td32471.c @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "cJSON.h" +#include "taos.h" +#include "tmsg.h" +#include "types.h" + +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + printf("commit %d tmq %p param %p\n", code, tmq, param); +} + +tmq_t* build_consumer() { + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "g1"); + tmq_conf_set(conf, "client.id", "my app 1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.consume.excluded", "1"); + tmq_conf_set(conf, "max.poll.interval.ms", "2000"); + tmq_conf_set(conf, "heartbeat.interval.ms", "100"); + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "db_32471_topic"); + return topic_list; +} + +void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + int32_t code; + + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); + printf("subscribe err\n"); + return; + } + int32_t cnt = 0; + while (1) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); + if (tmqmessage) { + cnt++; + taos_free_result(tmqmessage); + } else { + ASSERT(taos_errno(NULL) == 0); + break; + } + } + + taosSsleep(5); + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); + ASSERT(tmqmessage == NULL); + ASSERT(taos_errno(NULL) == TSDB_CODE_TMQ_CONSUMER_MISMATCH); + + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +int main(int argc, char* argv[]) { + tmq_t* tmq = build_consumer(); + tmq_list_t* topic_list = build_topic_list(); + basic_consume_loop(tmq, topic_list); + tmq_list_destroy(topic_list); +} \ No newline at end of file diff --git a/utils/test/c/tmq_td32526.c b/utils/test/c/tmq_td32526.c index 0150745f57..b6e68c5efc 100644 --- a/utils/test/c/tmq_td32526.c +++ b/utils/test/c/tmq_td32526.c @@ -181,7 +181,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { printResult(tmqmessage); taos_free_result(tmqmessage); } else { - ASSERT(taos_errno(NULL) == TSDB_CODE_TMQ_POLL_TIMEOUT); + ASSERT(taos_errno(NULL) == 0); break; } } From 9afd44f51072c8549028f3cdb7564220c69d7278 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 8 Jan 2025 16:53:22 +0800 Subject: [PATCH 2/2] fix:[TD-32471]set error code to terrno if tmq_consumer_poll return NULL --- docs/en/14-reference/09-error-code.md | 1 - docs/zh/14-reference/09-error-code.md | 1 - 2 files changed, 2 deletions(-) diff --git a/docs/en/14-reference/09-error-code.md b/docs/en/14-reference/09-error-code.md index 6737fd1124..233ac78a19 100644 --- a/docs/en/14-reference/09-error-code.md +++ b/docs/en/14-reference/09-error-code.md @@ -535,6 +535,5 @@ This document details the server error codes that may be encountered when using | 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error, not exposed to users | | 0x80004002 | Consumer closed | The consumer no longer exists | Check if it has already been closed | | 0x80004017 | Invalid status, please subscribe topic first | tmq status invalidate | Without calling subscribe, directly poll data | -| 0x80004018 | TMQ poll timeout | timeout is too small or there is no data to consume | Adjust the timeout parameter appropriately or check if the data has been consumed | | 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs | diff --git a/docs/zh/14-reference/09-error-code.md b/docs/zh/14-reference/09-error-code.md index d2b48d509b..afc44c97db 100644 --- a/docs/zh/14-reference/09-error-code.md +++ b/docs/zh/14-reference/09-error-code.md @@ -555,6 +555,5 @@ description: TDengine 服务端的错误码列表和详细说明 | 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致,一般存在于有新消费者加入相同消费者组里时 | 内部错误,不暴露给用户 | | 0x80004002 | Consumer closed | 消费者已经不存在了 | 查看是否已经close掉了 | | 0x80004017 | Invalid status, please subscribe topic first | 数据订阅状态不对 | 没有调用 subscribe,直接poll数据 | -| 0x80004018 | TMQ poll timeout | 数据订阅超时,超时时间太短,或者数据消费完毕 | 可适当调大timeout 参数或者检测数据是否消费完毕 | | 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 |