From 100c65ced5ec0ef78fa42af9d5e2d3114fa188ac Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 3 Mar 2025 19:55:04 +0800 Subject: [PATCH 1/3] fix: remove waitting for empty block --- source/client/src/clientTmq.c | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 4adc738d35..5ed32b4556 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -28,7 +28,6 @@ #define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0) #define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0) -#define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_HEARTBEAT_INTERVAL 3000 #define DEFAULT_ASKEP_INTERVAL 1000 @@ -174,7 +173,6 @@ typedef struct { int32_t vgId; int32_t vgStatus; int32_t vgSkipCnt; // here used to mark the slow vgroups - int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data int64_t blockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data int64_t blockSleepForReplay; // once empty block is received, idle for ignoreCnt then start to poll data bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp. @@ -948,6 +946,7 @@ static void generateTimedTask(int64_t refId, int32_t type) { if (code == TSDB_CODE_SUCCESS) { *pTaskType = type; if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) { + tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 2", tmq->consumerId); if (tsem2_post(&tmq->rspSem) != 0){ tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type); } @@ -1226,7 +1225,6 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .epSet = pVgEp->epSet, .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, - .emptyBlockReceiveTs = 0, .blockReceiveTs = 0, .blockSleepForReplay = 0, .numOfRows = pInfo ? pInfo->numOfRows : 0, @@ -2128,7 +2126,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } } - + tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 1", tmq->consumerId); if (tsem2_post(&tmq->rspSem) != 0){ tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId); } @@ -2344,14 +2342,7 @@ static int32_t tmqPollImpl(tmq_t* tmq) { if (pVg == NULL) { continue; } - int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs; - if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms - tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, - tmq->epoch, pVg->vgId); - continue; - } - - elapsed = taosGetTimestampMs() - pVg->blockReceiveTs; + int64_t elapsed = taosGetTimestampMs() - pVg->blockReceiveTs; if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) { tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay); @@ -2446,7 +2437,6 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ SMqClientVg* pVg = NULL; getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg); if (pVg) { - pVg->emptyBlockReceiveTs = taosGetTimestampMs(); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } taosWUnLockLatch(&tmq->lock); @@ -2525,7 +2515,6 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); - pVg->emptyBlockReceiveTs = taosGetTimestampMs(); } else { pRspObj = buildRsp(pollRspWrapper); if (pRspObj == NULL) { @@ -2539,7 +2528,6 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj); tmq->totalRows += numOfRows; } - pVg->emptyBlockReceiveTs = 0; if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { pVg->blockReceiveTs = taosGetTimestampMs(); pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime; @@ -2643,8 +2631,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { if (timeout >= 0) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; - TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, 0); (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); + TSDB_CHECK_CONDITION(elapsedTime < timeout && elapsedTime >= 0, code, lino, END, 0); } else { (void)tsem2_timewait(&tmq->rspSem, 1000); } From 08601c5dc98da9a0ab4e6af55551450d6a1e8829 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 4 Mar 2025 17:57:33 +0800 Subject: [PATCH 2/3] fix: remove waitting for empty block & optimize poll logic --- docs/en/14-reference/09-error-code.md | 4 +- docs/zh/14-reference/09-error-code.md | 4 +- source/client/src/clientTmq.c | 83 ++++----------- tests/system-test/7-tmq/tmq_c_test.py | 4 + utils/test/c/CMakeLists.txt | 8 ++ utils/test/c/tmq_poll_test.c | 140 ++++++++++++++++++++++++++ 6 files changed, 179 insertions(+), 64 deletions(-) create mode 100644 utils/test/c/tmq_poll_test.c diff --git a/docs/en/14-reference/09-error-code.md b/docs/en/14-reference/09-error-code.md index 2d75dc953c..7dbd58bb1d 100644 --- a/docs/en/14-reference/09-error-code.md +++ b/docs/en/14-reference/09-error-code.md @@ -537,8 +537,10 @@ This document details the server error codes that may be encountered when using | Error Code | Description | Possible Error Scenarios or Reasons | Recommended Actions for Users | | ---------- | --------------------- | ------------------------------------------------------------ | -------------------------------------------- | +| 0x800003E6 | Consumer not exist | Consumer timeout offline | rebuild consumer to subscribe data again | +| 0x800003EA | Consumer not ready | Consumer rebalancing | retry after 2s | | 0x80004000 | Invalid message | The subscribed data is illegal, generally does not occur | Check the client-side error logs for details | -| 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error, not exposed to users | +| 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error | | 0x80004002 | Consumer closed | The consumer no longer exists | Check if it has already been closed | | 0x80004017 | Invalid status, please subscribe topic first | tmq status invalidate | Without calling subscribe, directly poll data | | 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs | diff --git a/docs/zh/14-reference/09-error-code.md b/docs/zh/14-reference/09-error-code.md index 7326f45b34..1d996beac8 100644 --- a/docs/zh/14-reference/09-error-code.md +++ b/docs/zh/14-reference/09-error-code.md @@ -557,8 +557,10 @@ description: TDengine 服务端的错误码列表和详细说明 | 错误码 | 错误描述 | 可能的出错场景或者可能的原因 | 建议用户采取的措施 | | ---------- | --------------------- | -------------------------------------------------------------------------------- | ------------------------------ | +| 0x800003E6 | Consumer not exist | Consumer 超时下线 | 重新建consumer订阅数据 | +| 0x800003EA | Consumer not ready | Consumer 正在平衡中 | 等待2秒后重试 | | 0x80004000 | Invalid message | 订阅到的数据非法,一般不会出现 | 具体查看client端的错误日志提示 | -| 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致,一般存在于有新消费者加入相同消费者组里时 | 内部错误,不暴露给用户 | +| 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致,一般存在于有新消费者加入相同消费者组里时 | 内部错误 | | 0x80004002 | Consumer closed | 消费者已经不存在了 | 查看是否已经close掉了 | | 0x80004017 | Invalid status, please subscribe topic first | 数据订阅状态不对 | 没有调用 subscribe,直接 poll 数据 | | 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 | diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 5ed32b4556..d160e5cf7e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -148,7 +148,6 @@ struct tmq_t { STscObj* pTscObj; // connection SArray* clientTopics; // SArray STaosQueue* mqueue; // queue of rsp - STaosQall* qall; STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit tsem2_t rspSem; }; @@ -946,7 +945,6 @@ static void generateTimedTask(int64_t refId, int32_t type) { if (code == TSDB_CODE_SUCCESS) { *pTaskType = type; if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) { - tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 2", tmq->consumerId); if (tsem2_post(&tmq->rspSem) != 0){ tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type); } @@ -1133,7 +1131,7 @@ void tmqSendHbReq(void* param, void* tmrId) { tDestroySMqHbReq(&req); if (tmrId != NULL) { bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); - tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag); + tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat ret:%d, interval:%d, pollFlag:%d", tmq->consumerId, ret, tmq->heartBeatIntervalMs, tmq->pollFlag); } int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId); if (ret != 0){ @@ -1485,27 +1483,14 @@ END: } static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { - STaosQall* qall = NULL; - int32_t code = 0; - - code = taosAllocateQall(&qall); - if (code) { - tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code)); - return code; - } - - int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall); - if (numOfItems == 0) { - taosFreeQall(qall); - return 0; - } - - tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); - int8_t* pTaskType = NULL; - while (taosGetQitem(qall, (void**)&pTaskType) != 0) { + tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, taosQueueItemSize(pTmq->delayedTask)); + while (1) { + int8_t* pTaskType = NULL; + taosReadQitem(pTmq->delayedTask, (void**)&pTaskType); + if (pTaskType == NULL) {break;} if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId); - code = askEp(pTmq, NULL, false, false); + int32_t code = askEp(pTmq, NULL, false, false); if (code != 0) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code)); } @@ -1528,23 +1513,15 @@ static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { taosFreeQitem(pTaskType); } - taosFreeQall(qall); return 0; } void tmqClearUnhandleMsg(tmq_t* tmq) { if (tmq == NULL) return; - SMqRspWrapper* rspWrapper = NULL; - while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) { - tmqFreeRspWrapper(rspWrapper); - taosFreeQitem(rspWrapper); - } - - rspWrapper = NULL; - if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){ - return; - } - while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) { + while (1) { + SMqRspWrapper* rspWrapper = NULL; + taosReadQitem(tmq->mqueue, (void**)&rspWrapper); + if (rspWrapper == NULL) break; tmqFreeRspWrapper(rspWrapper); taosFreeQitem(rspWrapper); } @@ -1611,7 +1588,6 @@ void tmqFreeImpl(void* handle) { taosCloseQueue(tmq->delayedTask); } - taosFreeQall(tmq->qall); if(tsem2_destroy(&tmq->rspSem) != 0) { tqErrorC("failed to destroy sem in free tmq"); } @@ -1737,14 +1713,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { goto _failed; } - code = taosAllocateQall(&pTmq->qall); - if (code) { - tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), - pTmq->groupId); - SET_ERROR_MSG_TMQ("allocate qall failed") - goto _failed; - } - if (conf->groupId[0] == 0) { tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); @@ -2126,7 +2094,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } } - tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 1", tmq->consumerId); if (tsem2_post(&tmq->rspSem) != 0){ tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId); } @@ -2320,7 +2287,7 @@ static int32_t tmqPollImpl(tmq_t* tmq) { taosWLockLatch(&tmq->lock); if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__LOST){ - code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + code = TSDB_CODE_MND_CONSUMER_NOT_EXIST; goto end; } @@ -2421,12 +2388,12 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform code = askEp(tmq, NULL, false, true); if (code != 0) { - tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 " failed to ask ep wher vnode transform, code:%s", tmq->consumerId, tstrerror(code)); } } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { - code = askEp(tmq, NULL, false, false); + code = syncAskEp(tmq); if (code != 0) { - tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 " failed to ask ep when consumer mismatch, code:%s", tmq->consumerId, tstrerror(code)); } } else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){ code = 0; @@ -2563,22 +2530,14 @@ END: } static void* tmqHandleAllRsp(tmq_t* tmq) { - tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); + tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQueueItemSize(tmq->mqueue)); int32_t code = 0; void* returnVal = NULL; while (1) { SMqRspWrapper* pRspWrapper = NULL; - if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) { - code = taosReadAllQitems(tmq->mqueue, tmq->qall); - if (code == 0){ - goto END; - } - code = taosGetQitem(tmq->qall, (void**)&pRspWrapper); - if (code == 0) { - goto END; - } - } + taosReadQitem(tmq->mqueue, (void**)&pRspWrapper); + if (pRspWrapper == NULL) {break;} tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]); if (pRspWrapper->code != 0) { @@ -2617,9 +2576,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { code = tmqHandleAllDelayedTask(tmq); TSDB_CHECK_CODE(code, lino, END); - code = tmqPollImpl(tmq); - TSDB_CHECK_CODE(code, lino, END); - rspObj = tmqHandleAllRsp(tmq); if (rspObj) { tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); @@ -2628,6 +2584,9 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { code = terrno; TSDB_CHECK_CODE(code, lino, END); + code = tmqPollImpl(tmq); + TSDB_CHECK_CODE(code, lino, END); + if (timeout >= 0) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; diff --git a/tests/system-test/7-tmq/tmq_c_test.py b/tests/system-test/7-tmq/tmq_c_test.py index a2ed4aa708..0552a700dd 100644 --- a/tests/system-test/7-tmq/tmq_c_test.py +++ b/tests/system-test/7-tmq/tmq_c_test.py @@ -37,6 +37,10 @@ class TDTestCase: tdLog.info(cmdStr) os.system(cmdStr) + cmdStr = '%s/build/bin/tmq_poll_test'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) + return def stop(self): diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index 1b2716b8e5..b68476add8 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable(tmq_td32187 tmq_td32187.c) add_executable(tmq_ts5776 tmq_ts5776.c) add_executable(tmq_td32471 tmq_td32471.c) add_executable(tmq_td33798 tmq_td33798.c) +add_executable(tmq_poll_test tmq_poll_test.c) add_executable(tmq_write_raw_test tmq_write_raw_test.c) add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) @@ -89,6 +90,13 @@ target_link_libraries( PUBLIC common PUBLIC os ) +target_link_libraries( + tmq_poll_test + PUBLIC ${TAOS_LIB} + PUBLIC util + PUBLIC common + PUBLIC os +) target_link_libraries( tmq_td32526 PUBLIC ${TAOS_LIB} diff --git a/utils/test/c/tmq_poll_test.c b/utils/test/c/tmq_poll_test.c new file mode 100644 index 0000000000..4c4183995a --- /dev/null +++ b/utils/test/c/tmq_poll_test.c @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "cJSON.h" +#include "taos.h" +#include "tmsg.h" +#include "types.h" + +TAOS_RES* pRes = NULL; +TAOS* pConn = NULL; +TAOS_RES* tmqmessage = NULL; + +#define EXEC_SQL(sql) \ + pRes = taos_query(pConn,sql);\ + ASSERT(taos_errno(pRes) == 0);\ + taos_free_result(pRes) + +void init_env() { + EXEC_SQL("drop topic if exists topic_db"); + EXEC_SQL("drop database if exists db_src"); + EXEC_SQL("create database if not exists db_src vgroups 1 wal_retention_period 3600"); + EXEC_SQL("use db_src"); + EXEC_SQL("create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 nchar(8), t4 bool)"); + EXEC_SQL("insert into ct3 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c')"); + + EXEC_SQL("create topic topic_db as database db_src"); +} + + + +tmq_t* build_consumer(bool testLongHeartBeat) { + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_conf_set(conf, "client.id", "my app 1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + + if (testLongHeartBeat){ + ASSERT(tmq_conf_set(conf, "session.timeout.ms", "8000") == TMQ_CONF_OK); + ASSERT(tmq_conf_set(conf, "heartbeat.interval.ms", "100000") == TMQ_CONF_OK); + } + + tmq_conf_set_auto_commit_cb(conf, NULL, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + ASSERT(tmq != NULL); + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "topic_db"); + return topic_list; +} + +void test_poll_continuity(tmq_t* tmq, tmq_list_t* topics) { + ASSERT ((tmq_subscribe(tmq, topics)) == 0); + tmqmessage = tmq_consumer_poll(tmq, 500); + ASSERT (tmqmessage != NULL); + taos_free_result(tmqmessage); + + ASSERT (tmq_unsubscribe(tmq) == 0); + printf("unsubscribe success\n"); + + ASSERT (tmq_subscribe(tmq, topics) == 0); + printf("subscribe success\n"); + + tmqmessage = tmq_consumer_poll(tmq, 500); + ASSERT (tmqmessage == NULL); + taos_free_result(tmqmessage); + + EXEC_SQL("insert into ct1 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c')"); + printf("insert into ct1\n"); + + tmqmessage = tmq_consumer_poll(tmq, 500); + ASSERT (tmqmessage != NULL); + taos_free_result(tmqmessage); +} + + +void test_consumer_offline(tmq_t* tmq, tmq_list_t* topics) { + ASSERT ((tmq_subscribe(tmq, topics)) == 0); + tmqmessage = tmq_consumer_poll(tmq, 500); + ASSERT (tmqmessage != NULL); + taos_free_result(tmqmessage); + + taosSsleep(15); + + tmqmessage = tmq_consumer_poll(tmq, 500); + ASSERT (tmqmessage == NULL); + ASSERT (taos_errno(NULL) == TSDB_CODE_MND_CONSUMER_NOT_EXIST); + taos_free_result(tmqmessage); +} + +int main(int argc, char* argv[]) { + pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT (pConn != NULL); + printf("test poll continuity\n"); + for (int i = 0; i < 10; i++){ + printf("-------run times:%d start---------\n", i); + init_env(); + tmq_t* tmq = build_consumer(false); + tmq_list_t* topic_list = build_topic_list(); + test_poll_continuity(tmq, topic_list); + ASSERT(tmq_consumer_close(tmq) == 0); + tmq_list_destroy(topic_list); + printf("-------run times:%d end---------\n\n", i); + } + +// printf("\n\n\ntest consumer offline\n"); +// init_env(); +// tmq_t* tmq = build_consumer(true); +// tmq_list_t* topic_list = build_topic_list(); +// test_consumer_offline(tmq, topic_list); +// ASSERT(tmq_consumer_close(tmq) == 0); +// tmq_list_destroy(topic_list); + + taos_close(pConn); + +} From 9a3d6e3fb344fd88bce540e1eed445596aca5199 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 4 Mar 2025 19:12:28 +0800 Subject: [PATCH 3/3] fix: remove waitting for empty block & optimize poll logic --- utils/test/c/tmq_td32471.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/test/c/tmq_td32471.c b/utils/test/c/tmq_td32471.c index bf14e3f61b..80f93541f9 100644 --- a/utils/test/c/tmq_td32471.c +++ b/utils/test/c/tmq_td32471.c @@ -76,7 +76,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { taosSsleep(5); TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); ASSERT(tmqmessage == NULL); - ASSERT(taos_errno(NULL) == TSDB_CODE_TMQ_CONSUMER_MISMATCH); + ASSERT(taos_errno(NULL) == TSDB_CODE_MND_CONSUMER_NOT_EXIST); code = tmq_consumer_close(tmq); if (code)