From 08601c5dc98da9a0ab4e6af55551450d6a1e8829 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 4 Mar 2025 17:57:33 +0800 Subject: [PATCH] fix: remove waitting for empty block & optimize poll logic --- docs/en/14-reference/09-error-code.md | 4 +- docs/zh/14-reference/09-error-code.md | 4 +- source/client/src/clientTmq.c | 83 ++++----------- tests/system-test/7-tmq/tmq_c_test.py | 4 + utils/test/c/CMakeLists.txt | 8 ++ utils/test/c/tmq_poll_test.c | 140 ++++++++++++++++++++++++++ 6 files changed, 179 insertions(+), 64 deletions(-) create mode 100644 utils/test/c/tmq_poll_test.c diff --git a/docs/en/14-reference/09-error-code.md b/docs/en/14-reference/09-error-code.md index 2d75dc953c..7dbd58bb1d 100644 --- a/docs/en/14-reference/09-error-code.md +++ b/docs/en/14-reference/09-error-code.md @@ -537,8 +537,10 @@ This document details the server error codes that may be encountered when using | Error Code | Description | Possible Error Scenarios or Reasons | Recommended Actions for Users | | ---------- | --------------------- | ------------------------------------------------------------ | -------------------------------------------- | +| 0x800003E6 | Consumer not exist | Consumer timeout offline | rebuild consumer to subscribe data again | +| 0x800003EA | Consumer not ready | Consumer rebalancing | retry after 2s | | 0x80004000 | Invalid message | The subscribed data is illegal, generally does not occur | Check the client-side error logs for details | -| 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error, not exposed to users | +| 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error | | 0x80004002 | Consumer closed | The consumer no longer exists | Check if it has already been closed | | 0x80004017 | Invalid status, please subscribe topic first | tmq status invalidate | Without calling subscribe, directly poll data | | 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs | diff --git a/docs/zh/14-reference/09-error-code.md b/docs/zh/14-reference/09-error-code.md index 7326f45b34..1d996beac8 100644 --- a/docs/zh/14-reference/09-error-code.md +++ b/docs/zh/14-reference/09-error-code.md @@ -557,8 +557,10 @@ description: TDengine 服务端的错误码列表和详细说明 | 错误码 | 错误描述 | 可能的出错场景或者可能的原因 | 建议用户采取的措施 | | ---------- | --------------------- | -------------------------------------------------------------------------------- | ------------------------------ | +| 0x800003E6 | Consumer not exist | Consumer 超时下线 | 重新建consumer订阅数据 | +| 0x800003EA | Consumer not ready | Consumer 正在平衡中 | 等待2秒后重试 | | 0x80004000 | Invalid message | 订阅到的数据非法,一般不会出现 | 具体查看client端的错误日志提示 | -| 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致,一般存在于有新消费者加入相同消费者组里时 | 内部错误,不暴露给用户 | +| 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致,一般存在于有新消费者加入相同消费者组里时 | 内部错误 | | 0x80004002 | Consumer closed | 消费者已经不存在了 | 查看是否已经close掉了 | | 0x80004017 | Invalid status, please subscribe topic first | 数据订阅状态不对 | 没有调用 subscribe,直接 poll 数据 | | 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 | diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 5ed32b4556..d160e5cf7e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -148,7 +148,6 @@ struct tmq_t { STscObj* pTscObj; // connection SArray* clientTopics; // SArray STaosQueue* mqueue; // queue of rsp - STaosQall* qall; STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit tsem2_t rspSem; }; @@ -946,7 +945,6 @@ static void generateTimedTask(int64_t refId, int32_t type) { if (code == TSDB_CODE_SUCCESS) { *pTaskType = type; if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) { - tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 2", tmq->consumerId); if (tsem2_post(&tmq->rspSem) != 0){ tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type); } @@ -1133,7 +1131,7 @@ void tmqSendHbReq(void* param, void* tmrId) { tDestroySMqHbReq(&req); if (tmrId != NULL) { bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); - tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag); + tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat ret:%d, interval:%d, pollFlag:%d", tmq->consumerId, ret, tmq->heartBeatIntervalMs, tmq->pollFlag); } int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId); if (ret != 0){ @@ -1485,27 +1483,14 @@ END: } static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { - STaosQall* qall = NULL; - int32_t code = 0; - - code = taosAllocateQall(&qall); - if (code) { - tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code)); - return code; - } - - int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall); - if (numOfItems == 0) { - taosFreeQall(qall); - return 0; - } - - tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); - int8_t* pTaskType = NULL; - while (taosGetQitem(qall, (void**)&pTaskType) != 0) { + tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, taosQueueItemSize(pTmq->delayedTask)); + while (1) { + int8_t* pTaskType = NULL; + taosReadQitem(pTmq->delayedTask, (void**)&pTaskType); + if (pTaskType == NULL) {break;} if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId); - code = askEp(pTmq, NULL, false, false); + int32_t code = askEp(pTmq, NULL, false, false); if (code != 0) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code)); } @@ -1528,23 +1513,15 @@ static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { taosFreeQitem(pTaskType); } - taosFreeQall(qall); return 0; } void tmqClearUnhandleMsg(tmq_t* tmq) { if (tmq == NULL) return; - SMqRspWrapper* rspWrapper = NULL; - while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) { - tmqFreeRspWrapper(rspWrapper); - taosFreeQitem(rspWrapper); - } - - rspWrapper = NULL; - if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){ - return; - } - while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) { + while (1) { + SMqRspWrapper* rspWrapper = NULL; + taosReadQitem(tmq->mqueue, (void**)&rspWrapper); + if (rspWrapper == NULL) break; tmqFreeRspWrapper(rspWrapper); taosFreeQitem(rspWrapper); } @@ -1611,7 +1588,6 @@ void tmqFreeImpl(void* handle) { taosCloseQueue(tmq->delayedTask); } - taosFreeQall(tmq->qall); if(tsem2_destroy(&tmq->rspSem) != 0) { tqErrorC("failed to destroy sem in free tmq"); } @@ -1737,14 +1713,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { goto _failed; } - code = taosAllocateQall(&pTmq->qall); - if (code) { - tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), - pTmq->groupId); - SET_ERROR_MSG_TMQ("allocate qall failed") - goto _failed; - } - if (conf->groupId[0] == 0) { tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); @@ -2126,7 +2094,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } } - tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 1", tmq->consumerId); if (tsem2_post(&tmq->rspSem) != 0){ tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId); } @@ -2320,7 +2287,7 @@ static int32_t tmqPollImpl(tmq_t* tmq) { taosWLockLatch(&tmq->lock); if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__LOST){ - code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; goto end; } @@ -2421,12 +2388,12 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform code = askEp(tmq, NULL, false, true); if (code != 0) { - tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 " failed to ask ep wher vnode transform, code:%s", tmq->consumerId, tstrerror(code)); } } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { - code = askEp(tmq, NULL, false, false); + code = syncAskEp(tmq); if (code != 0) { - tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 " failed to ask ep when consumer mismatch, code:%s", tmq->consumerId, tstrerror(code)); } } else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){ code = 0; @@ -2563,22 +2530,14 @@ END: } static void* tmqHandleAllRsp(tmq_t* tmq) { - tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); + tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQueueItemSize(tmq->mqueue)); int32_t code = 0; void* returnVal = NULL; while (1) { SMqRspWrapper* pRspWrapper = NULL; - if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) { - code = taosReadAllQitems(tmq->mqueue, tmq->qall); - if (code == 0){ - goto END; - } - code = taosGetQitem(tmq->qall, (void**)&pRspWrapper); - if (code == 0) { - goto END; - } - } + taosReadQitem(tmq->mqueue, (void**)&pRspWrapper); + if (pRspWrapper == NULL) {break;} tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]); if (pRspWrapper->code != 0) { @@ -2617,9 +2576,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { code = tmqHandleAllDelayedTask(tmq); TSDB_CHECK_CODE(code, lino, END); - code = tmqPollImpl(tmq); - TSDB_CHECK_CODE(code, lino, END); - rspObj = tmqHandleAllRsp(tmq); if (rspObj) { tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); @@ -2628,6 +2584,9 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { code = terrno; TSDB_CHECK_CODE(code, lino, END); + code = tmqPollImpl(tmq); + TSDB_CHECK_CODE(code, lino, END); + if (timeout >= 0) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; diff --git a/tests/system-test/7-tmq/tmq_c_test.py b/tests/system-test/7-tmq/tmq_c_test.py index a2ed4aa708..0552a700dd 100644 --- a/tests/system-test/7-tmq/tmq_c_test.py +++ b/tests/system-test/7-tmq/tmq_c_test.py @@ -37,6 +37,10 @@ class TDTestCase: tdLog.info(cmdStr) os.system(cmdStr) + cmdStr = '%s/build/bin/tmq_poll_test'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) + return def stop(self): diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index 1b2716b8e5..b68476add8 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable(tmq_td32187 tmq_td32187.c) add_executable(tmq_ts5776 tmq_ts5776.c) add_executable(tmq_td32471 tmq_td32471.c) add_executable(tmq_td33798 tmq_td33798.c) +add_executable(tmq_poll_test tmq_poll_test.c) add_executable(tmq_write_raw_test tmq_write_raw_test.c) add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) @@ -89,6 +90,13 @@ target_link_libraries( PUBLIC common PUBLIC os ) +target_link_libraries( + tmq_poll_test + PUBLIC ${TAOS_LIB} + PUBLIC util + PUBLIC common + PUBLIC os +) target_link_libraries( tmq_td32526 PUBLIC ${TAOS_LIB} diff --git a/utils/test/c/tmq_poll_test.c b/utils/test/c/tmq_poll_test.c new file mode 100644 index 0000000000..4c4183995a --- /dev/null +++ b/utils/test/c/tmq_poll_test.c @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "cJSON.h" +#include "taos.h" +#include "tmsg.h" +#include "types.h" + +TAOS_RES* pRes = NULL; +TAOS* pConn = NULL; +TAOS_RES* tmqmessage = NULL; + +#define EXEC_SQL(sql) \ + pRes = taos_query(pConn,sql);\ + ASSERT(taos_errno(pRes) == 0);\ + taos_free_result(pRes) + +void init_env() { + EXEC_SQL("drop topic if exists topic_db"); + EXEC_SQL("drop database if exists db_src"); + EXEC_SQL("create database if not exists db_src vgroups 1 wal_retention_period 3600"); + EXEC_SQL("use db_src"); + EXEC_SQL("create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 nchar(8), t4 bool)"); + EXEC_SQL("insert into ct3 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c')"); + + EXEC_SQL("create topic topic_db as database db_src"); +} + + + +tmq_t* build_consumer(bool testLongHeartBeat) { + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_conf_set(conf, "client.id", "my app 1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + + if (testLongHeartBeat){ + ASSERT(tmq_conf_set(conf, "session.timeout.ms", "8000") == TMQ_CONF_OK); + ASSERT(tmq_conf_set(conf, "heartbeat.interval.ms", "100000") == TMQ_CONF_OK); + } + + tmq_conf_set_auto_commit_cb(conf, NULL, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + ASSERT(tmq != NULL); + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "topic_db"); + return topic_list; +} + +void test_poll_continuity(tmq_t* tmq, tmq_list_t* topics) { + ASSERT ((tmq_subscribe(tmq, topics)) == 0); + tmqmessage = tmq_consumer_poll(tmq, 500); + ASSERT (tmqmessage != NULL); + taos_free_result(tmqmessage); + + ASSERT (tmq_unsubscribe(tmq) == 0); + printf("unsubscribe success\n"); + + ASSERT (tmq_subscribe(tmq, topics) == 0); + printf("subscribe success\n"); + + tmqmessage = tmq_consumer_poll(tmq, 500); + ASSERT (tmqmessage == NULL); + taos_free_result(tmqmessage); + + EXEC_SQL("insert into ct1 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c')"); + printf("insert into ct1\n"); + + tmqmessage = tmq_consumer_poll(tmq, 500); + ASSERT (tmqmessage != NULL); + taos_free_result(tmqmessage); +} + + +void test_consumer_offline(tmq_t* tmq, tmq_list_t* topics) { + ASSERT ((tmq_subscribe(tmq, topics)) == 0); + tmqmessage = tmq_consumer_poll(tmq, 500); + ASSERT (tmqmessage != NULL); + taos_free_result(tmqmessage); + + taosSsleep(15); + + tmqmessage = tmq_consumer_poll(tmq, 500); + ASSERT (tmqmessage == NULL); + ASSERT (taos_errno(NULL) == TSDB_CODE_MND_CONSUMER_NOT_EXIST); + taos_free_result(tmqmessage); +} + +int main(int argc, char* argv[]) { + pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT (pConn != NULL); + printf("test poll continuity\n"); + for (int i = 0; i < 10; i++){ + printf("-------run times:%d start---------\n", i); + init_env(); + tmq_t* tmq = build_consumer(false); + tmq_list_t* topic_list = build_topic_list(); + test_poll_continuity(tmq, topic_list); + ASSERT(tmq_consumer_close(tmq) == 0); + tmq_list_destroy(topic_list); + printf("-------run times:%d end---------\n\n", i); + } + +// printf("\n\n\ntest consumer offline\n"); +// init_env(); +// tmq_t* tmq = build_consumer(true); +// tmq_list_t* topic_list = build_topic_list(); +// test_consumer_offline(tmq, topic_list); +// ASSERT(tmq_consumer_close(tmq) == 0); +// tmq_list_destroy(topic_list); + + taos_close(pConn); + +}