Merge pull request #29993 from taosdata/fix/marks/tmq_poll
fix: remove waitting for empty block & optimize poll logic
This commit is contained in:
commit
b91bc76534
|
@ -537,8 +537,10 @@ This document details the server error codes that may be encountered when using
|
|||
|
||||
| Error Code | Description | Possible Error Scenarios or Reasons | Recommended Actions for Users |
|
||||
| ---------- | --------------------- | ------------------------------------------------------------ | -------------------------------------------- |
|
||||
| 0x800003E6 | Consumer not exist | Consumer timeout offline | rebuild consumer to subscribe data again |
|
||||
| 0x800003EA | Consumer not ready | Consumer rebalancing | retry after 2s |
|
||||
| 0x80004000 | Invalid message | The subscribed data is illegal, generally does not occur | Check the client-side error logs for details |
|
||||
| 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error, not exposed to users |
|
||||
| 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error |
|
||||
| 0x80004002 | Consumer closed | The consumer no longer exists | Check if it has already been closed |
|
||||
| 0x80004017 | Invalid status, please subscribe topic first | tmq status invalidate | Without calling subscribe, directly poll data |
|
||||
| 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs |
|
||||
|
|
|
@ -557,8 +557,10 @@ description: TDengine 服务端的错误码列表和详细说明
|
|||
|
||||
| 错误码 | 错误描述 | 可能的出错场景或者可能的原因 | 建议用户采取的措施 |
|
||||
| ---------- | --------------------- | -------------------------------------------------------------------------------- | ------------------------------ |
|
||||
| 0x800003E6 | Consumer not exist | Consumer 超时下线 | 重新建consumer订阅数据 |
|
||||
| 0x800003EA | Consumer not ready | Consumer 正在平衡中 | 等待2秒后重试 |
|
||||
| 0x80004000 | Invalid message | 订阅到的数据非法,一般不会出现 | 具体查看client端的错误日志提示 |
|
||||
| 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致,一般存在于有新消费者加入相同消费者组里时 | 内部错误,不暴露给用户 |
|
||||
| 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致,一般存在于有新消费者加入相同消费者组里时 | 内部错误 |
|
||||
| 0x80004002 | Consumer closed | 消费者已经不存在了 | 查看是否已经close掉了 |
|
||||
| 0x80004017 | Invalid status, please subscribe topic first | 数据订阅状态不对 | 没有调用 subscribe,直接 poll 数据 |
|
||||
| 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 |
|
||||
|
|
|
@ -28,7 +28,6 @@
|
|||
#define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO) { taosPrintLog("TQ INFO ", DEBUG_INFO, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ DEBUG ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
|
||||
|
||||
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
|
||||
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
|
||||
#define DEFAULT_HEARTBEAT_INTERVAL 3000
|
||||
#define DEFAULT_ASKEP_INTERVAL 1000
|
||||
|
@ -149,7 +148,6 @@ struct tmq_t {
|
|||
STscObj* pTscObj; // connection
|
||||
SArray* clientTopics; // SArray<SMqClientTopic>
|
||||
STaosQueue* mqueue; // queue of rsp
|
||||
STaosQall* qall;
|
||||
STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
|
||||
tsem2_t rspSem;
|
||||
};
|
||||
|
@ -174,7 +172,6 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
int32_t vgStatus;
|
||||
int32_t vgSkipCnt; // here used to mark the slow vgroups
|
||||
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
|
||||
int64_t blockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
|
||||
int64_t blockSleepForReplay; // once empty block is received, idle for ignoreCnt then start to poll data
|
||||
bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp.
|
||||
|
@ -1134,7 +1131,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
tDestroySMqHbReq(&req);
|
||||
if (tmrId != NULL) {
|
||||
bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
|
||||
tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag);
|
||||
tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat ret:%d, interval:%d, pollFlag:%d", tmq->consumerId, ret, tmq->heartBeatIntervalMs, tmq->pollFlag);
|
||||
}
|
||||
int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
if (ret != 0){
|
||||
|
@ -1226,7 +1223,6 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
|||
.epSet = pVgEp->epSet,
|
||||
.vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
|
||||
.vgSkipCnt = 0,
|
||||
.emptyBlockReceiveTs = 0,
|
||||
.blockReceiveTs = 0,
|
||||
.blockSleepForReplay = 0,
|
||||
.numOfRows = pInfo ? pInfo->numOfRows : 0,
|
||||
|
@ -1487,27 +1483,14 @@ END:
|
|||
}
|
||||
|
||||
static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
||||
STaosQall* qall = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
code = taosAllocateQall(&qall);
|
||||
if (code) {
|
||||
tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall);
|
||||
if (numOfItems == 0) {
|
||||
taosFreeQall(qall);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
|
||||
int8_t* pTaskType = NULL;
|
||||
while (taosGetQitem(qall, (void**)&pTaskType) != 0) {
|
||||
tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, taosQueueItemSize(pTmq->delayedTask));
|
||||
while (1) {
|
||||
int8_t* pTaskType = NULL;
|
||||
taosReadQitem(pTmq->delayedTask, (void**)&pTaskType);
|
||||
if (pTaskType == NULL) {break;}
|
||||
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
|
||||
tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
|
||||
code = askEp(pTmq, NULL, false, false);
|
||||
int32_t code = askEp(pTmq, NULL, false, false);
|
||||
if (code != 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
|
||||
}
|
||||
|
@ -1530,23 +1513,15 @@ static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|||
taosFreeQitem(pTaskType);
|
||||
}
|
||||
|
||||
taosFreeQall(qall);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
||||
if (tmq == NULL) return;
|
||||
SMqRspWrapper* rspWrapper = NULL;
|
||||
while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) {
|
||||
tmqFreeRspWrapper(rspWrapper);
|
||||
taosFreeQitem(rspWrapper);
|
||||
}
|
||||
|
||||
rspWrapper = NULL;
|
||||
if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){
|
||||
return;
|
||||
}
|
||||
while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) {
|
||||
while (1) {
|
||||
SMqRspWrapper* rspWrapper = NULL;
|
||||
taosReadQitem(tmq->mqueue, (void**)&rspWrapper);
|
||||
if (rspWrapper == NULL) break;
|
||||
tmqFreeRspWrapper(rspWrapper);
|
||||
taosFreeQitem(rspWrapper);
|
||||
}
|
||||
|
@ -1613,7 +1588,6 @@ void tmqFreeImpl(void* handle) {
|
|||
taosCloseQueue(tmq->delayedTask);
|
||||
}
|
||||
|
||||
taosFreeQall(tmq->qall);
|
||||
if(tsem2_destroy(&tmq->rspSem) != 0) {
|
||||
tqErrorC("failed to destroy sem in free tmq");
|
||||
}
|
||||
|
@ -1739,14 +1713,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
goto _failed;
|
||||
}
|
||||
|
||||
code = taosAllocateQall(&pTmq->qall);
|
||||
if (code) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
|
||||
pTmq->groupId);
|
||||
SET_ERROR_MSG_TMQ("allocate qall failed")
|
||||
goto _failed;
|
||||
}
|
||||
|
||||
if (conf->groupId[0] == 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
|
||||
pTmq->groupId);
|
||||
|
@ -2128,7 +2094,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
if (tsem2_post(&tmq->rspSem) != 0){
|
||||
tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
|
||||
}
|
||||
|
@ -2322,7 +2287,7 @@ static int32_t tmqPollImpl(tmq_t* tmq) {
|
|||
taosWLockLatch(&tmq->lock);
|
||||
|
||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__LOST){
|
||||
code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
|
||||
goto end;
|
||||
}
|
||||
|
||||
|
@ -2344,14 +2309,7 @@ static int32_t tmqPollImpl(tmq_t* tmq) {
|
|||
if (pVg == NULL) {
|
||||
continue;
|
||||
}
|
||||
int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
|
||||
if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms
|
||||
tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
|
||||
tmq->epoch, pVg->vgId);
|
||||
continue;
|
||||
}
|
||||
|
||||
elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
|
||||
int64_t elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
|
||||
if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
|
||||
tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
|
||||
tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
|
||||
|
@ -2430,12 +2388,12 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
|||
if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform
|
||||
code = askEp(tmq, NULL, false, true);
|
||||
if (code != 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
|
||||
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep wher vnode transform, code:%s", tmq->consumerId, tstrerror(code));
|
||||
}
|
||||
} else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
|
||||
code = askEp(tmq, NULL, false, false);
|
||||
code = syncAskEp(tmq);
|
||||
if (code != 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
|
||||
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep when consumer mismatch, code:%s", tmq->consumerId, tstrerror(code));
|
||||
}
|
||||
} else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
|
||||
code = 0;
|
||||
|
@ -2446,7 +2404,6 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
|||
SMqClientVg* pVg = NULL;
|
||||
getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
|
||||
if (pVg) {
|
||||
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
}
|
||||
taosWUnLockLatch(&tmq->lock);
|
||||
|
@ -2525,7 +2482,6 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
|||
tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
|
||||
", total:%" PRId64 ", QID:0x%" PRIx64,
|
||||
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
|
||||
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
||||
} else {
|
||||
pRspObj = buildRsp(pollRspWrapper);
|
||||
if (pRspObj == NULL) {
|
||||
|
@ -2539,7 +2495,6 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
|||
tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
|
||||
tmq->totalRows += numOfRows;
|
||||
}
|
||||
pVg->emptyBlockReceiveTs = 0;
|
||||
if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
|
||||
pVg->blockReceiveTs = taosGetTimestampMs();
|
||||
pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime;
|
||||
|
@ -2575,22 +2530,14 @@ END:
|
|||
}
|
||||
|
||||
static void* tmqHandleAllRsp(tmq_t* tmq) {
|
||||
tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));
|
||||
tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQueueItemSize(tmq->mqueue));
|
||||
|
||||
int32_t code = 0;
|
||||
void* returnVal = NULL;
|
||||
while (1) {
|
||||
SMqRspWrapper* pRspWrapper = NULL;
|
||||
if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) {
|
||||
code = taosReadAllQitems(tmq->mqueue, tmq->qall);
|
||||
if (code == 0){
|
||||
goto END;
|
||||
}
|
||||
code = taosGetQitem(tmq->qall, (void**)&pRspWrapper);
|
||||
if (code == 0) {
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
taosReadQitem(tmq->mqueue, (void**)&pRspWrapper);
|
||||
if (pRspWrapper == NULL) {break;}
|
||||
|
||||
tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]);
|
||||
if (pRspWrapper->code != 0) {
|
||||
|
@ -2629,9 +2576,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
code = tmqHandleAllDelayedTask(tmq);
|
||||
TSDB_CHECK_CODE(code, lino, END);
|
||||
|
||||
code = tmqPollImpl(tmq);
|
||||
TSDB_CHECK_CODE(code, lino, END);
|
||||
|
||||
rspObj = tmqHandleAllRsp(tmq);
|
||||
if (rspObj) {
|
||||
tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
|
||||
|
@ -2640,11 +2584,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, END);
|
||||
|
||||
code = tmqPollImpl(tmq);
|
||||
TSDB_CHECK_CODE(code, lino, END);
|
||||
|
||||
if (timeout >= 0) {
|
||||
int64_t currentTime = taosGetTimestampMs();
|
||||
int64_t elapsedTime = currentTime - startTime;
|
||||
TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, 0);
|
||||
(void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
|
||||
TSDB_CHECK_CONDITION(elapsedTime < timeout && elapsedTime >= 0, code, lino, END, 0);
|
||||
} else {
|
||||
(void)tsem2_timewait(&tmq->rspSem, 1000);
|
||||
}
|
||||
|
|
|
@ -37,6 +37,10 @@ class TDTestCase:
|
|||
tdLog.info(cmdStr)
|
||||
os.system(cmdStr)
|
||||
|
||||
cmdStr = '%s/build/bin/tmq_poll_test'%(buildPath)
|
||||
tdLog.info(cmdStr)
|
||||
os.system(cmdStr)
|
||||
|
||||
return
|
||||
|
||||
def stop(self):
|
||||
|
|
|
@ -9,6 +9,7 @@ add_executable(tmq_td32187 tmq_td32187.c)
|
|||
add_executable(tmq_ts5776 tmq_ts5776.c)
|
||||
add_executable(tmq_td32471 tmq_td32471.c)
|
||||
add_executable(tmq_td33798 tmq_td33798.c)
|
||||
add_executable(tmq_poll_test tmq_poll_test.c)
|
||||
add_executable(tmq_write_raw_test tmq_write_raw_test.c)
|
||||
add_executable(write_raw_block_test write_raw_block_test.c)
|
||||
add_executable(sml_test sml_test.c)
|
||||
|
@ -89,6 +90,13 @@ target_link_libraries(
|
|||
PUBLIC common
|
||||
PUBLIC os
|
||||
)
|
||||
target_link_libraries(
|
||||
tmq_poll_test
|
||||
PUBLIC ${TAOS_LIB}
|
||||
PUBLIC util
|
||||
PUBLIC common
|
||||
PUBLIC os
|
||||
)
|
||||
target_link_libraries(
|
||||
tmq_td32526
|
||||
PUBLIC ${TAOS_LIB}
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "cJSON.h"
|
||||
#include "taos.h"
|
||||
#include "tmsg.h"
|
||||
#include "types.h"
|
||||
|
||||
TAOS_RES* pRes = NULL;
|
||||
TAOS* pConn = NULL;
|
||||
TAOS_RES* tmqmessage = NULL;
|
||||
|
||||
#define EXEC_SQL(sql) \
|
||||
pRes = taos_query(pConn,sql);\
|
||||
ASSERT(taos_errno(pRes) == 0);\
|
||||
taos_free_result(pRes)
|
||||
|
||||
void init_env() {
|
||||
EXEC_SQL("drop topic if exists topic_db");
|
||||
EXEC_SQL("drop database if exists db_src");
|
||||
EXEC_SQL("create database if not exists db_src vgroups 1 wal_retention_period 3600");
|
||||
EXEC_SQL("use db_src");
|
||||
EXEC_SQL("create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 nchar(8), t4 bool)");
|
||||
EXEC_SQL("insert into ct3 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c')");
|
||||
|
||||
EXEC_SQL("create topic topic_db as database db_src");
|
||||
}
|
||||
|
||||
|
||||
|
||||
tmq_t* build_consumer(bool testLongHeartBeat) {
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "group.id", "tg2");
|
||||
tmq_conf_set(conf, "client.id", "my app 1");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
|
||||
if (testLongHeartBeat){
|
||||
ASSERT(tmq_conf_set(conf, "session.timeout.ms", "8000") == TMQ_CONF_OK);
|
||||
ASSERT(tmq_conf_set(conf, "heartbeat.interval.ms", "100000") == TMQ_CONF_OK);
|
||||
}
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, NULL, NULL);
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
ASSERT(tmq != NULL);
|
||||
tmq_conf_destroy(conf);
|
||||
return tmq;
|
||||
}
|
||||
|
||||
tmq_list_t* build_topic_list() {
|
||||
tmq_list_t* topic_list = tmq_list_new();
|
||||
tmq_list_append(topic_list, "topic_db");
|
||||
return topic_list;
|
||||
}
|
||||
|
||||
void test_poll_continuity(tmq_t* tmq, tmq_list_t* topics) {
|
||||
ASSERT ((tmq_subscribe(tmq, topics)) == 0);
|
||||
tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
ASSERT (tmqmessage != NULL);
|
||||
taos_free_result(tmqmessage);
|
||||
|
||||
ASSERT (tmq_unsubscribe(tmq) == 0);
|
||||
printf("unsubscribe success\n");
|
||||
|
||||
ASSERT (tmq_subscribe(tmq, topics) == 0);
|
||||
printf("subscribe success\n");
|
||||
|
||||
tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
ASSERT (tmqmessage == NULL);
|
||||
taos_free_result(tmqmessage);
|
||||
|
||||
EXEC_SQL("insert into ct1 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c')");
|
||||
printf("insert into ct1\n");
|
||||
|
||||
tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
ASSERT (tmqmessage != NULL);
|
||||
taos_free_result(tmqmessage);
|
||||
}
|
||||
|
||||
|
||||
void test_consumer_offline(tmq_t* tmq, tmq_list_t* topics) {
|
||||
ASSERT ((tmq_subscribe(tmq, topics)) == 0);
|
||||
tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
ASSERT (tmqmessage != NULL);
|
||||
taos_free_result(tmqmessage);
|
||||
|
||||
taosSsleep(15);
|
||||
|
||||
tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
ASSERT (tmqmessage == NULL);
|
||||
ASSERT (taos_errno(NULL) == TSDB_CODE_MND_CONSUMER_NOT_EXIST);
|
||||
taos_free_result(tmqmessage);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT (pConn != NULL);
|
||||
printf("test poll continuity\n");
|
||||
for (int i = 0; i < 10; i++){
|
||||
printf("-------run times:%d start---------\n", i);
|
||||
init_env();
|
||||
tmq_t* tmq = build_consumer(false);
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
test_poll_continuity(tmq, topic_list);
|
||||
ASSERT(tmq_consumer_close(tmq) == 0);
|
||||
tmq_list_destroy(topic_list);
|
||||
printf("-------run times:%d end---------\n\n", i);
|
||||
}
|
||||
|
||||
// printf("\n\n\ntest consumer offline\n");
|
||||
// init_env();
|
||||
// tmq_t* tmq = build_consumer(true);
|
||||
// tmq_list_t* topic_list = build_topic_list();
|
||||
// test_consumer_offline(tmq, topic_list);
|
||||
// ASSERT(tmq_consumer_close(tmq) == 0);
|
||||
// tmq_list_destroy(topic_list);
|
||||
|
||||
taos_close(pConn);
|
||||
|
||||
}
|
|
@ -76,7 +76,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
|||
taosSsleep(5);
|
||||
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
|
||||
ASSERT(tmqmessage == NULL);
|
||||
ASSERT(taos_errno(NULL) == TSDB_CODE_TMQ_CONSUMER_MISMATCH);
|
||||
ASSERT(taos_errno(NULL) == TSDB_CODE_MND_CONSUMER_NOT_EXIST);
|
||||
|
||||
code = tmq_consumer_close(tmq);
|
||||
if (code)
|
||||
|
|
Loading…
Reference in New Issue