diff --git a/docs/en/07-develop/07-tmq.md b/docs/en/07-develop/07-tmq.md index 6b92ace6a2..dda66e1f6d 100644 --- a/docs/en/07-develop/07-tmq.md +++ b/docs/en/07-develop/07-tmq.md @@ -44,6 +44,8 @@ There are many parameters for creating consumers, which flexibly support various | `enable.replay` | boolean | Whether to enable data replay function | Default is off | | `session.timeout.ms` | integer | Timeout after consumer heartbeat is lost, after which rebalance logic is triggered, and upon success, that consumer will be removed (supported from version 3.3.3.0) | Default is 12000, range [6000, 1800000] | | `max.poll.interval.ms` | integer | The longest time interval for consumer poll data fetching, exceeding this time will be considered as the consumer being offline, triggering rebalance logic, and upon success, that consumer will be removed (supported from version 3.3.3.0) | Default is 300000, range [1000, INT32_MAX] | +| `fetch.max.wait.ms` | integer | The maximum time it takes for the server to return data once (supported from version 3.3.6.0) | Default is 1000, range [1, INT32_MAX] | +| `min.poll.rows` | integer | The minimum number of data returned by the server once (supported from version 3.3.6.0) | Default is 4096, range [1, INT32_MAX] Below are the connection parameters for connectors in various languages: diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.md index c38a43f3fb..8d7bb0ac30 100644 --- a/docs/zh/07-develop/07-tmq.md +++ b/docs/zh/07-develop/07-tmq.md @@ -43,6 +43,8 @@ TDengine 消费者的概念跟 Kafka 类似,消费者通过订阅主题来接 | `enable.replay` | boolean | 是否开启数据回放功能 | 默认关闭 | | `session.timeout.ms` | integer | consumer 心跳丢失后超时时间,超时后会触发 rebalance 逻辑,成功后该 consumer 会被删除(从3.3.3.0版本开始支持) | 默认值为 12000,取值范围 [6000, 1800000] | | `max.poll.interval.ms` | integer | consumer poll 拉取数据间隔的最长时间,超过该时间,会认为该 consumer 离线,触发rebalance 逻辑,成功后该 consumer 会被删除(从3.3.3.0版本开始支持) | 默认值为 300000,[1000,INT32_MAX] | +| `fetch.max.wait.ms` | integer | 服务端单次返回数据的最大耗时(从3.3.6.0版本开始支持) | 默认值为 1000,[1,INT32_MAX] | +| `min.poll.rows` | integer | 服务端单次返回数据的最小条数(从3.3.6.0版本开始支持) | 默认值为 4096,[1,INT32_MAX] | 下面是各语言连接器创建参数: diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 85e05d614c..6d58748a3b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3002,8 +3002,10 @@ enum { TOPIC_SUB_TYPE__COLUMN, }; -#define DEFAULT_MAX_POLL_INTERVAL 300000 -#define DEFAULT_SESSION_TIMEOUT 12000 +#define DEFAULT_MAX_POLL_INTERVAL 300000 +#define DEFAULT_SESSION_TIMEOUT 12000 +#define DEFAULT_MAX_POLL_WAIT_TIME 1000 +#define DEFAULT_MIN_POLL_ROWS 4096 typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic @@ -4207,6 +4209,7 @@ typedef struct { int8_t enableReplay; int8_t sourceExcluded; int8_t rawData; + int32_t minPollRows; int8_t enableBatchMeta; SHashObj *uidHash; // to find if uid is duplicated } SMqPollReq; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 743b94b78e..4adc738d35 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -94,6 +94,8 @@ struct tmq_conf_t { int8_t replayEnable; int8_t sourceExcluded; // do not consume, bit int8_t rawData; // fetch raw data + int32_t maxPollWaitTime; + int32_t minPollRows; uint16_t port; int32_t autoCommitInterval; int32_t sessionTimeoutMs; @@ -124,6 +126,8 @@ struct tmq_t { int8_t replayEnable; int8_t sourceExcluded; // do not consume, bit int8_t rawData; // fetch raw data + int32_t maxPollWaitTime; + int32_t minPollRows; int64_t consumerId; tmq_commit_cb* commitCb; void* commitCbUserParam; @@ -307,6 +311,8 @@ tmq_conf_t* tmq_conf_new() { conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL; conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL; conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT; + conf->maxPollWaitTime = DEFAULT_MAX_POLL_WAIT_TIME; + conf->minPollRows = DEFAULT_MIN_POLL_ROWS; return conf; } @@ -508,6 +514,28 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_OK; } + if (strcasecmp(key, "fetch.max.wait.ms") == 0) { + int64_t tmp = 0; + code = taosStr2int64(value, &tmp); + if (tmp <= 0 || tmp > INT32_MAX || code != 0) { + tqErrorC("invalid value for fetch.max.wait.ms: %s", value); + return TMQ_CONF_INVALID; + } + conf->maxPollWaitTime = tmp; + return TMQ_CONF_OK; + } + + if (strcasecmp(key, "min.poll.rows") == 0) { + int64_t tmp = 0; + code = taosStr2int64(value, &tmp); + if (tmp <= 0 || tmp > INT32_MAX || code != 0) { + tqErrorC("invalid value for min.poll.rows: %s", value); + return TMQ_CONF_INVALID; + } + conf->minPollRows = tmp; + return TMQ_CONF_OK; + } + if (strcasecmp(key, "td.connect.db") == 0) { return TMQ_CONF_OK; } @@ -1748,6 +1776,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->replayEnable = conf->replayEnable; pTmq->sourceExcluded = conf->sourceExcluded; pTmq->rawData = conf->rawData; + pTmq->maxPollWaitTime = conf->maxPollWaitTime; + pTmq->minPollRows = conf->minPollRows; pTmq->enableBatchMeta = conf->enableBatchMeta; tstrncpy(pTmq->user, user, TSDB_USER_LEN); if (taosGetFqdn(pTmq->fqdn) != 0) { @@ -2113,14 +2143,15 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { return code; } -void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { +void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, SMqClientTopic* pTopic, SMqClientVg* pVg) { if (pReq == NULL || tmq == NULL || pTopic == NULL || pVg == NULL) { return; } (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName); pReq->withTbName = tmq->withTbName; pReq->consumerId = tmq->consumerId; - pReq->timeout = timeout < 0 ? INT32_MAX : timeout; + pReq->timeout = tmq->maxPollWaitTime; + pReq->minPollRows = tmq->minPollRows; pReq->epoch = tmq->epoch; pReq->reqOffset = pVg->offsetInfo.endOffset; pReq->head.vgId = pVg->vgId; @@ -2224,14 +2255,14 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg } } -static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) { +static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg) { SMqPollReq req = {0}; char* msg = NULL; SMqPollCbParam* pParam = NULL; SMsgSendInfo* sendInfo = NULL; int code = 0; int lino = 0; - tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg); + tmqBuildConsumeReqImpl(&req, pTmq, pTopic, pVg); int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); TSDB_CHECK_CONDITION(msgSize >= 0, code, lino, END, TSDB_CODE_INVALID_MSG); @@ -2283,7 +2314,7 @@ END: return code; } -static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { +static int32_t tmqPollImpl(tmq_t* tmq) { if (tmq == NULL) { return TSDB_CODE_INVALID_MSG; } @@ -2336,7 +2367,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { } atomic_store_32(&pVg->vgSkipCnt, 0); - code = doTmqPollImpl(tmq, pTopic, pVg, timeout); + code = doTmqPollImpl(tmq, pTopic, pVg); if (code != TSDB_CODE_SUCCESS) { goto end; } @@ -2598,7 +2629,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { code = tmqHandleAllDelayedTask(tmq); TSDB_CHECK_CODE(code, lino, END); - code = tmqPollImpl(tmq, timeout); + code = tmqPollImpl(tmq); TSDB_CHECK_CODE(code, lino, END); rspObj = tmqHandleAllRsp(tmq); @@ -3475,7 +3506,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a pParam->pCommon = pCommon; SMqPollReq req = {0}; - tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg); + tmqBuildConsumeReqImpl(&req, tmq, pTopic, pClientVg); req.reqOffset = pClientVg->offsetInfo.beginOffset; int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index a2e08d7c6b..6a3e1948c8 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -9234,6 +9234,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->sourceExcluded)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->enableBatchMeta)); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->rawData)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->minPollRows)); tEndEncode(&encoder); @@ -9289,6 +9290,7 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (!tDecodeIsEnd(&decoder)) { TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->rawData)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->minPollRows)); } tEndDecode(&decoder); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 50d6eb0090..cd66e82687 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -113,7 +113,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); void tqDestroyTqHandle(void* data); // tqRead -int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset, int64_t timeout); +int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset, const SMqPollReq* pRequest); int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId); @@ -183,8 +183,6 @@ int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* #define TQ_SUBSCRIBE_NAME "subscribe" #define TQ_OFFSET_NAME "offset-ver0" -#define TQ_POLL_MAX_TIME 1000 -#define TQ_POLL_MAX_BYTES 1048576 #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index b109be5626..70a165906e 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -218,7 +218,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pRsp->blockNum++; totalRows += pDataBlock->info.rows; - if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) { + if (totalRows >= pRequest->minPollRows || (taosGetTimestampMs() - st > pRequest->timeout)) { break; } } @@ -233,7 +233,7 @@ END: return code; } -int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, int64_t timeout) { +int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) { int32_t code = 0; int32_t lino = 0; char* tbName = NULL; @@ -274,7 +274,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat pRsp->blockNum++; rowCnt += pDataBlock->info.rows; - if (rowCnt <= tmqRowSize && (taosGetTimestampMs() - st <= TMIN(TQ_POLL_MAX_TIME, timeout))) { + if (rowCnt <= pRequest->minPollRows && (taosGetTimestampMs() - st <= pRequest->timeout)) { continue; } } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 44d9ffda83..9866528446 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -235,7 +235,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset)); if (offset->type != TMQ_OFFSET__LOG) { - TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest->timeout)); + TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest)); if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) { code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); @@ -353,7 +353,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf)); TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaLen, &tLen)); totalMetaRows++; - if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) { + if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) { tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); goto END; @@ -376,10 +376,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest)); - if ((pRequest->rawData == 0 && totalRows >= tmqRowSize) || - (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) || - (pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES || - taosArrayGetSize(taosxRsp.blockData) > tmqRowSize || + if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) || + (taosGetTimestampMs() - st > pRequest->timeout) || + (pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows || terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) { // tqDebug("start to send rsp, block num:%d, totalRows:%d, createTableNum:%d, terrno:%d", // (int)taosArrayGetSize(taosxRsp.blockData), totalRows, taosxRsp.createTableNum, terrno); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 26cc122021..0d14722aaf 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -321,7 +321,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqOffset.py -,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_primary_key.py +#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_primary_key.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmqDropConsumer.py diff --git a/tests/parallel_test/cases_tdengine.task b/tests/parallel_test/cases_tdengine.task index 4ecfb7d919..e52fe68957 100644 --- a/tests/parallel_test/cases_tdengine.task +++ b/tests/parallel_test/cases_tdengine.task @@ -170,7 +170,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqOffset.py -,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_primary_key.py +#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_primary_key.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmqDropConsumer.py diff --git a/tests/system-test/7-tmq/tmq_primary_key.py b/tests/system-test/7-tmq/tmq_primary_key.py index 13d6bd565d..58f7bfb17d 100644 --- a/tests/system-test/7-tmq/tmq_primary_key.py +++ b/tests/system-test/7-tmq/tmq_primary_key.py @@ -18,10 +18,19 @@ import datetime sys.path.append("./7-tmq") from tmqCommon import * +consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + "experimental.snapshot.enable": "true", + "min.poll.rows": "1" +} class TDTestCase: clientCfgDict = {'debugFlag': 135} - updatecfgDict = {'debugFlag': 135, 'asynclog': 0, 'tmqRowSize':1} + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} updatecfgDict["clientCfg"] = clientCfgDict def init(self, conn, logSql, replicaVar=1): @@ -44,16 +53,7 @@ class TDTestCase: tdSql.execute(f'create topic topic_pk_query as select * from pk') - consumer_dict = { - "group.id": "g1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "earliest", - "enable.auto.commit": "false", - "experimental.snapshot.enable": "true", - } consumer = Consumer(consumer_dict) - try: consumer.subscribe(["topic_pk_query"]) except TmqError: @@ -156,14 +156,6 @@ class TDTestCase: tdSql.execute(f'create topic topic_pk_stable as stable pks') - consumer_dict = { - "group.id": "g1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "earliest", - "enable.auto.commit": "false", - "experimental.snapshot.enable": "true", - } consumer = Consumer(consumer_dict) try: @@ -266,14 +258,6 @@ class TDTestCase: tdSql.execute(f'create topic topic_in with meta as database abc1') - consumer_dict = { - "group.id": "g1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "earliest", - "enable.auto.commit": "false", - "experimental.snapshot.enable": "true", - } consumer = Consumer(consumer_dict) try: @@ -376,14 +360,6 @@ class TDTestCase: tdSql.execute(f'create topic topic_pk_string with meta as database db_pk_string') - consumer_dict = { - "group.id": "g1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "earliest", - "enable.auto.commit": "false", - "experimental.snapshot.enable": "true", - } consumer = Consumer(consumer_dict) try: @@ -485,14 +461,6 @@ class TDTestCase: tdSql.execute(f'create topic topic_pk_query_30755 as select * from pk') - consumer_dict = { - "group.id": "g1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "earliest", - "enable.auto.commit": "false", - "experimental.snapshot.enable": "true", - } consumer = Consumer(consumer_dict) try: diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index 408d9e71c5..077113edaa 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -90,7 +90,6 @@ python3 ./test.py -f 7-tmq/tmqParamsTest.py -R python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py python3 ./test.py -f 7-tmq/tmqOffset.py -python3 ./test.py -f 7-tmq/tmq_primary_key.py python3 ./test.py -f 7-tmq/tmqDropConsumer.py python3 ./test.py -f 1-insert/insert_stb.py python3 ./test.py -f 1-insert/delete_stable.py diff --git a/utils/test/c/tmq_write_raw_test.c b/utils/test/c/tmq_write_raw_test.c index f33fac9a0a..6a0b7deaee 100644 --- a/utils/test/c/tmq_write_raw_test.c +++ b/utils/test/c/tmq_write_raw_test.c @@ -226,6 +226,15 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "msg.consume.excluded", "1"); + tmq_conf_set(conf, "fetch.max.wait.ms", "1"); + assert(tmq_conf_set(conf, "fetch.max.wait.ms", "100000000000") == TMQ_CONF_INVALID); + assert(tmq_conf_set(conf, "fetch.max.wait.ms", "-100000000000") == TMQ_CONF_INVALID); + assert(tmq_conf_set(conf, "fetch.max.wait.ms", "0") == TMQ_CONF_INVALID); + assert(tmq_conf_set(conf, "fetch.max.wait.ms", "1000") == TMQ_CONF_OK); + assert(tmq_conf_set(conf, "min.poll.rows", "100000000000") == TMQ_CONF_INVALID); + assert(tmq_conf_set(conf, "min.poll.rows", "-1") == TMQ_CONF_INVALID); + assert(tmq_conf_set(conf, "min.poll.rows", "0") == TMQ_CONF_INVALID); + assert(tmq_conf_set(conf, "min.poll.rows", "1") == TMQ_CONF_OK); // tmq_conf_set(conf, "max.poll.interval.ms", "20000"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);