Merge pull request #29823 from taosdata/feat/TS-5928

fix:[TS-5928]add consumer parameters
This commit is contained in:
Shengliang Guan 2025-02-19 19:22:26 +08:00 committed by GitHub
commit 4e064b5aec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 80 additions and 67 deletions

View File

@ -44,6 +44,8 @@ There are many parameters for creating consumers, which flexibly support various
| `enable.replay` | boolean | Whether to enable data replay function | Default is off |
| `session.timeout.ms` | integer | Timeout after consumer heartbeat is lost, after which rebalance logic is triggered, and upon success, that consumer will be removed (supported from version 3.3.3.0) | Default is 12000, range [6000, 1800000] |
| `max.poll.interval.ms` | integer | The longest time interval for consumer poll data fetching, exceeding this time will be considered as the consumer being offline, triggering rebalance logic, and upon success, that consumer will be removed (supported from version 3.3.3.0) | Default is 300000, range [1000, INT32_MAX] |
| `fetch.max.wait.ms` | integer | The maximum time it takes for the server to return data once (supported from version 3.3.6.0) | Default is 1000, range [1, INT32_MAX] |
| `min.poll.rows` | integer | The minimum number of data returned by the server once (supported from version 3.3.6.0) | Default is 4096, range [1, INT32_MAX]
Below are the connection parameters for connectors in various languages:
<Tabs defaultValue="java" groupId="lang">

View File

@ -43,6 +43,8 @@ TDengine 消费者的概念跟 Kafka 类似,消费者通过订阅主题来接
| `enable.replay` | boolean | 是否开启数据回放功能 | 默认关闭 |
| `session.timeout.ms` | integer | consumer 心跳丢失后超时时间,超时后会触发 rebalance 逻辑,成功后该 consumer 会被删除从3.3.3.0版本开始支持) | 默认值为 12000取值范围 [6000 1800000] |
| `max.poll.interval.ms` | integer | consumer poll 拉取数据间隔的最长时间,超过该时间,会认为该 consumer 离线触发rebalance 逻辑,成功后该 consumer 会被删除从3.3.3.0版本开始支持) | 默认值为 300000[1000INT32_MAX] |
| `fetch.max.wait.ms` | integer | 服务端单次返回数据的最大耗时从3.3.6.0版本开始支持) | 默认值为 1000[1INT32_MAX] |
| `min.poll.rows` | integer | 服务端单次返回数据的最小条数从3.3.6.0版本开始支持) | 默认值为 4096[1INT32_MAX] |
下面是各语言连接器创建参数:

View File

@ -3002,8 +3002,10 @@ enum {
TOPIC_SUB_TYPE__COLUMN,
};
#define DEFAULT_MAX_POLL_INTERVAL 300000
#define DEFAULT_SESSION_TIMEOUT 12000
#define DEFAULT_MAX_POLL_INTERVAL 300000
#define DEFAULT_SESSION_TIMEOUT 12000
#define DEFAULT_MAX_POLL_WAIT_TIME 1000
#define DEFAULT_MIN_POLL_ROWS 4096
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
@ -4207,6 +4209,7 @@ typedef struct {
int8_t enableReplay;
int8_t sourceExcluded;
int8_t rawData;
int32_t minPollRows;
int8_t enableBatchMeta;
SHashObj *uidHash; // to find if uid is duplicated
} SMqPollReq;

View File

@ -94,6 +94,8 @@ struct tmq_conf_t {
int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit
int8_t rawData; // fetch raw data
int32_t maxPollWaitTime;
int32_t minPollRows;
uint16_t port;
int32_t autoCommitInterval;
int32_t sessionTimeoutMs;
@ -124,6 +126,8 @@ struct tmq_t {
int8_t replayEnable;
int8_t sourceExcluded; // do not consume, bit
int8_t rawData; // fetch raw data
int32_t maxPollWaitTime;
int32_t minPollRows;
int64_t consumerId;
tmq_commit_cb* commitCb;
void* commitCbUserParam;
@ -307,6 +311,8 @@ tmq_conf_t* tmq_conf_new() {
conf->heartBeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL;
conf->maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL;
conf->sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT;
conf->maxPollWaitTime = DEFAULT_MAX_POLL_WAIT_TIME;
conf->minPollRows = DEFAULT_MIN_POLL_ROWS;
return conf;
}
@ -508,6 +514,28 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return TMQ_CONF_OK;
}
if (strcasecmp(key, "fetch.max.wait.ms") == 0) {
int64_t tmp = 0;
code = taosStr2int64(value, &tmp);
if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
tqErrorC("invalid value for fetch.max.wait.ms: %s", value);
return TMQ_CONF_INVALID;
}
conf->maxPollWaitTime = tmp;
return TMQ_CONF_OK;
}
if (strcasecmp(key, "min.poll.rows") == 0) {
int64_t tmp = 0;
code = taosStr2int64(value, &tmp);
if (tmp <= 0 || tmp > INT32_MAX || code != 0) {
tqErrorC("invalid value for min.poll.rows: %s", value);
return TMQ_CONF_INVALID;
}
conf->minPollRows = tmp;
return TMQ_CONF_OK;
}
if (strcasecmp(key, "td.connect.db") == 0) {
return TMQ_CONF_OK;
}
@ -1748,6 +1776,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->replayEnable = conf->replayEnable;
pTmq->sourceExcluded = conf->sourceExcluded;
pTmq->rawData = conf->rawData;
pTmq->maxPollWaitTime = conf->maxPollWaitTime;
pTmq->minPollRows = conf->minPollRows;
pTmq->enableBatchMeta = conf->enableBatchMeta;
tstrncpy(pTmq->user, user, TSDB_USER_LEN);
if (taosGetFqdn(pTmq->fqdn) != 0) {
@ -2113,14 +2143,15 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
return code;
}
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
if (pReq == NULL || tmq == NULL || pTopic == NULL || pVg == NULL) {
return;
}
(void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
pReq->withTbName = tmq->withTbName;
pReq->consumerId = tmq->consumerId;
pReq->timeout = timeout < 0 ? INT32_MAX : timeout;
pReq->timeout = tmq->maxPollWaitTime;
pReq->minPollRows = tmq->minPollRows;
pReq->epoch = tmq->epoch;
pReq->reqOffset = pVg->offsetInfo.endOffset;
pReq->head.vgId = pVg->vgId;
@ -2224,14 +2255,14 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
}
}
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg) {
SMqPollReq req = {0};
char* msg = NULL;
SMqPollCbParam* pParam = NULL;
SMsgSendInfo* sendInfo = NULL;
int code = 0;
int lino = 0;
tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);
tmqBuildConsumeReqImpl(&req, pTmq, pTopic, pVg);
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
TSDB_CHECK_CONDITION(msgSize >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
@ -2283,7 +2314,7 @@ END:
return code;
}
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
static int32_t tmqPollImpl(tmq_t* tmq) {
if (tmq == NULL) {
return TSDB_CODE_INVALID_MSG;
}
@ -2336,7 +2367,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
}
atomic_store_32(&pVg->vgSkipCnt, 0);
code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
code = doTmqPollImpl(tmq, pTopic, pVg);
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@ -2598,7 +2629,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
code = tmqHandleAllDelayedTask(tmq);
TSDB_CHECK_CODE(code, lino, END);
code = tmqPollImpl(tmq, timeout);
code = tmqPollImpl(tmq);
TSDB_CHECK_CODE(code, lino, END);
rspObj = tmqHandleAllRsp(tmq);
@ -3475,7 +3506,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
pParam->pCommon = pCommon;
SMqPollReq req = {0};
tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);
tmqBuildConsumeReqImpl(&req, tmq, pTopic, pClientVg);
req.reqOffset = pClientVg->offsetInfo.beginOffset;
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);

View File

@ -9234,6 +9234,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->sourceExcluded));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->enableBatchMeta));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->rawData));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->minPollRows));
tEndEncode(&encoder);
@ -9289,6 +9290,7 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->rawData));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->minPollRows));
}
tEndDecode(&decoder);

View File

@ -113,7 +113,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
void tqDestroyTqHandle(void* data);
// tqRead
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset, int64_t timeout);
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset, const SMqPollReq* pRequest);
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
@ -183,8 +183,6 @@ int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode*
#define TQ_SUBSCRIBE_NAME "subscribe"
#define TQ_OFFSET_NAME "offset-ver0"
#define TQ_POLL_MAX_TIME 1000
#define TQ_POLL_MAX_BYTES 1048576
#ifdef __cplusplus
}

View File

@ -218,7 +218,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) {
if (totalRows >= pRequest->minPollRows || (taosGetTimestampMs() - st > pRequest->timeout)) {
break;
}
}
@ -233,7 +233,7 @@ END:
return code;
}
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, int64_t timeout) {
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
int32_t code = 0;
int32_t lino = 0;
char* tbName = NULL;
@ -274,7 +274,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
pRsp->blockNum++;
rowCnt += pDataBlock->info.rows;
if (rowCnt <= tmqRowSize && (taosGetTimestampMs() - st <= TMIN(TQ_POLL_MAX_TIME, timeout))) {
if (rowCnt <= pRequest->minPollRows && (taosGetTimestampMs() - st <= pRequest->timeout)) {
continue;
}
}

View File

@ -235,7 +235,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
if (offset->type != TMQ_OFFSET__LOG) {
TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest->timeout));
TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest));
if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
@ -353,7 +353,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf));
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaLen, &tLen));
totalMetaRows++;
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) {
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > pRequest->timeout)) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
goto END;
@ -376,10 +376,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest));
if ((pRequest->rawData == 0 && totalRows >= tmqRowSize) ||
(taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) ||
(pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES ||
taosArrayGetSize(taosxRsp.blockData) > tmqRowSize ||
if ((pRequest->rawData == 0 && totalRows >= pRequest->minPollRows) ||
(taosGetTimestampMs() - st > pRequest->timeout) ||
(pRequest->rawData != 0 && (taosArrayGetSize(taosxRsp.blockData) > pRequest->minPollRows ||
terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
// tqDebug("start to send rsp, block num:%d, totalRows:%d, createTableNum:%d, terrno:%d",
// (int)taosArrayGetSize(taosxRsp.blockData), totalRows, taosxRsp.createTableNum, terrno);

View File

@ -321,7 +321,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqOffset.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_primary_key.py
#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_primary_key.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDropConsumer.py

View File

@ -170,7 +170,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqOffset.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_primary_key.py
#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_primary_key.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDropConsumer.py

View File

@ -18,10 +18,19 @@ import datetime
sys.path.append("./7-tmq")
from tmqCommon import *
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
"min.poll.rows": "1"
}
class TDTestCase:
clientCfgDict = {'debugFlag': 135}
updatecfgDict = {'debugFlag': 135, 'asynclog': 0, 'tmqRowSize':1}
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
updatecfgDict["clientCfg"] = clientCfgDict
def init(self, conn, logSql, replicaVar=1):
@ -44,16 +53,7 @@ class TDTestCase:
tdSql.execute(f'create topic topic_pk_query as select * from pk')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
}
consumer = Consumer(consumer_dict)
try:
consumer.subscribe(["topic_pk_query"])
except TmqError:
@ -156,14 +156,6 @@ class TDTestCase:
tdSql.execute(f'create topic topic_pk_stable as stable pks')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
}
consumer = Consumer(consumer_dict)
try:
@ -266,14 +258,6 @@ class TDTestCase:
tdSql.execute(f'create topic topic_in with meta as database abc1')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
}
consumer = Consumer(consumer_dict)
try:
@ -376,14 +360,6 @@ class TDTestCase:
tdSql.execute(f'create topic topic_pk_string with meta as database db_pk_string')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
}
consumer = Consumer(consumer_dict)
try:
@ -485,14 +461,6 @@ class TDTestCase:
tdSql.execute(f'create topic topic_pk_query_30755 as select * from pk')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
}
consumer = Consumer(consumer_dict)
try:

View File

@ -90,7 +90,6 @@ python3 ./test.py -f 7-tmq/tmqParamsTest.py -R
python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py
python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py
python3 ./test.py -f 7-tmq/tmqOffset.py
python3 ./test.py -f 7-tmq/tmq_primary_key.py
python3 ./test.py -f 7-tmq/tmqDropConsumer.py
python3 ./test.py -f 1-insert/insert_stb.py
python3 ./test.py -f 1-insert/delete_stable.py

View File

@ -226,6 +226,15 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1");
tmq_conf_set(conf, "fetch.max.wait.ms", "1");
assert(tmq_conf_set(conf, "fetch.max.wait.ms", "100000000000") == TMQ_CONF_INVALID);
assert(tmq_conf_set(conf, "fetch.max.wait.ms", "-100000000000") == TMQ_CONF_INVALID);
assert(tmq_conf_set(conf, "fetch.max.wait.ms", "0") == TMQ_CONF_INVALID);
assert(tmq_conf_set(conf, "fetch.max.wait.ms", "1000") == TMQ_CONF_OK);
assert(tmq_conf_set(conf, "min.poll.rows", "100000000000") == TMQ_CONF_INVALID);
assert(tmq_conf_set(conf, "min.poll.rows", "-1") == TMQ_CONF_INVALID);
assert(tmq_conf_set(conf, "min.poll.rows", "0") == TMQ_CONF_INVALID);
assert(tmq_conf_set(conf, "min.poll.rows", "1") == TMQ_CONF_OK);
// tmq_conf_set(conf, "max.poll.interval.ms", "20000");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);