diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx
index 8a35f93e3f..d88e04fb63 100644
--- a/docs/en/07-develop/07-tmq.mdx
+++ b/docs/en/07-develop/07-tmq.mdx
@@ -349,7 +349,7 @@ You configure the following parameters when creating a consumer:
| `td.connect.port` | string | Port of the server side | |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. Each topic can create up to 100 consumer groups. |
| `client.id` | string | Client ID | Maximum length: 192. |
-| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: subscribe from the earliest data, this is the default behavior; `latest`: subscribe from the latest data; or `none`: can't subscribe without committed offset|
+| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: subscribe from the earliest data, this is the default behavior(version <= 3.1.1.0); `latest`: subscribe from the latest data, this is the default behavior(version > 3.1.1.0); or `none`: can't subscribe without committed offset|
| `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application need to handle commit by itself | Default value is true |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | default value: false
diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.md
index 5db4803089..7dcd025c58 100644
--- a/docs/zh/07-develop/07-tmq.md
+++ b/docs/zh/07-develop/07-tmq.md
@@ -348,7 +348,7 @@ CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
| `td.connect.port` | integer | 服务端的端口号 | |
| `group.id` | string | 消费组 ID,同一消费组共享消费进度 |
**必填项**。最大长度:192。
每个topic最多可建立100个 consumer group |
| `client.id` | string | 客户端 ID | 最大长度:192。 |
-| `auto.offset.reset` | enum | 消费组订阅的初始位置 |
`earliest`: default;从头开始订阅;
`latest`: 仅从最新数据开始订阅;
`none`: 没有提交的 offset 无法订阅 |
+| `auto.offset.reset` | enum | 消费组订阅的初始位置 |
`earliest`: default(version <= 3.1.1.0);从头开始订阅;
`latest`: default(version > 3.1.1.0);仅从最新数据开始订阅;
`none`: 没有提交的 offset 无法订阅 |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit | 默认值为 true |
| `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 |
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) |默认关闭 |
diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index e861bd4b92..43ce8a68f9 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -26,8 +26,7 @@
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
-
-#define OFFSET_IS_RESET_OFFSET(_of) ((_of) < 0)
+#define DEFAULT_HEARTBEAT_INTERVAL 3000
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);
@@ -63,8 +62,7 @@ struct tmq_conf_t {
int8_t resetOffset;
int8_t withTbName;
int8_t snapEnable;
- int32_t snapBatchSize;
- bool hbBgEnable;
+// int32_t snapBatchSize;
uint16_t port;
int32_t autoCommitInterval;
char* ip;
@@ -84,7 +82,6 @@ struct tmq_t {
int32_t autoCommitInterval;
int8_t resetOffsetCfg;
uint64_t consumerId;
- bool hbBgEnable;
tmq_commit_cb* commitCb;
void* commitCbUserParam;
@@ -276,8 +273,7 @@ tmq_conf_t* tmq_conf_new() {
conf->withTbName = false;
conf->autoCommit = true;
conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
- conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
- conf->hbBgEnable = true;
+ conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
return conf;
}
@@ -367,10 +363,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
- if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
- conf->snapBatchSize = taosStr2int64(value);
- return TMQ_CONF_OK;
- }
+// if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
+// conf->snapBatchSize = taosStr2int64(value);
+// return TMQ_CONF_OK;
+// }
// if (strcasecmp(key, "enable.heartbeat.background") == 0) {
// if (strcasecmp(value, "true") == 0) {
@@ -847,7 +843,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
OVER:
tDeatroySMqHbReq(&req);
- taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
+ taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer);
taosReleaseRef(tmqMgmt.rsetId, refId);
}
@@ -1106,8 +1102,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->resetOffsetCfg = conf->resetOffset;
taosInitRWLatch(&pTmq->lock);
- pTmq->hbBgEnable = conf->hbBgEnable;
-
// assign consumerId
pTmq->consumerId = tGenIdPI64();
@@ -1131,19 +1125,16 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
goto _failed;
}
- if (pTmq->hbBgEnable) {
- int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
- *pRefId = pTmq->refId;
- pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
- }
+ int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
+ *pRefId = pTmq->refId;
+ pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, pRefId, tmqMgmt.timer);
char buf[TSDB_OFFSET_LEN] = {0};
STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
tFormatOffset(buf, tListLen(buf), &offset);
tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
- ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
- pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
- buf, pTmq->hbBgEnable);
+ ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
+ pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf);
return pTmq;
diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c
index b77424d4a5..d122a9b0b5 100644
--- a/source/dnode/vnode/src/tq/tqUtil.c
+++ b/source/dnode/vnode/src/tq/tqUtil.c
@@ -108,7 +108,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
if (pRequest->useSnapshot) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
consumerId, pHandle->subKey, vgId);
-
if (pHandle->fetchMeta) {
tqOffsetResetToMeta(pOffsetVal, 0);
} else {
diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim
index fe6ec04a20..4ef0c121f6 100644
--- a/tests/script/tsim/tmq/basic1.sim
+++ b/tests/script/tsim/tmq/basic1.sim
@@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
-#$keyList = $keyList . ,
-#$keyList = $keyList . auto.offset.reset:earliest
+$keyList = $keyList . ,
+$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
diff --git a/tests/script/tsim/tmq/basic1Of2Cons.sim b/tests/script/tsim/tmq/basic1Of2Cons.sim
index c12351cbe8..d2906ec875 100644
--- a/tests/script/tsim/tmq/basic1Of2Cons.sim
+++ b/tests/script/tsim/tmq/basic1Of2Cons.sim
@@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
-#$keyList = $keyList . ,
-#$keyList = $keyList . auto.offset.reset:earliest
+$keyList = $keyList . ,
+$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
diff --git a/tests/script/tsim/tmq/basic2.sim b/tests/script/tsim/tmq/basic2.sim
index 5c7528ea5d..4477101d0f 100644
--- a/tests/script/tsim/tmq/basic2.sim
+++ b/tests/script/tsim/tmq/basic2.sim
@@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
-#$keyList = $keyList . ,
-#$keyList = $keyList . auto.offset.reset:earliest
+$keyList = $keyList . ,
+$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
diff --git a/tests/script/tsim/tmq/basic2Of2Cons.sim b/tests/script/tsim/tmq/basic2Of2Cons.sim
index 23598c17a4..951a1d52fd 100644
--- a/tests/script/tsim/tmq/basic2Of2Cons.sim
+++ b/tests/script/tsim/tmq/basic2Of2Cons.sim
@@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
-#$keyList = $keyList . ,
-#$keyList = $keyList . auto.offset.reset:earliest
+$keyList = $keyList . ,
+$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
diff --git a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim
index 1223a94fa7..8cc447f0c7 100644
--- a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim
+++ b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim
@@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
-#$keyList = $keyList . ,
-#$keyList = $keyList . auto.offset.reset:earliest
+$keyList = $keyList . ,
+$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
diff --git a/tests/script/tsim/tmq/basic3.sim b/tests/script/tsim/tmq/basic3.sim
index 8bb34cefa2..da2bee4f6b 100644
--- a/tests/script/tsim/tmq/basic3.sim
+++ b/tests/script/tsim/tmq/basic3.sim
@@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
-#$keyList = $keyList . ,
-#$keyList = $keyList . auto.offset.reset:earliest
+$keyList = $keyList . ,
+$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
diff --git a/tests/script/tsim/tmq/basic3Of2Cons.sim b/tests/script/tsim/tmq/basic3Of2Cons.sim
index 75d762c44b..21d691bd9c 100644
--- a/tests/script/tsim/tmq/basic3Of2Cons.sim
+++ b/tests/script/tsim/tmq/basic3Of2Cons.sim
@@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
-#$keyList = $keyList . ,
-#$keyList = $keyList . auto.offset.reset:earliest
+$keyList = $keyList . ,
+$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
diff --git a/tests/script/tsim/tmq/basic4.sim b/tests/script/tsim/tmq/basic4.sim
index c72d8ff412..adeab58ff2 100644
--- a/tests/script/tsim/tmq/basic4.sim
+++ b/tests/script/tsim/tmq/basic4.sim
@@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
-#$keyList = $keyList . ,
-#$keyList = $keyList . auto.offset.reset:earliest
+$keyList = $keyList . ,
+$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
diff --git a/tests/script/tsim/tmq/basic4Of2Cons.sim b/tests/script/tsim/tmq/basic4Of2Cons.sim
index bb006a354c..186005b231 100644
--- a/tests/script/tsim/tmq/basic4Of2Cons.sim
+++ b/tests/script/tsim/tmq/basic4Of2Cons.sim
@@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
-#$keyList = $keyList . ,
-#$keyList = $keyList . auto.offset.reset:earliest
+$keyList = $keyList . ,
+$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
diff --git a/tests/script/tsim/tmq/snapshot.sim b/tests/script/tsim/tmq/snapshot.sim
index fbdaba7d28..c0194d98c8 100644
--- a/tests/script/tsim/tmq/snapshot.sim
+++ b/tests/script/tsim/tmq/snapshot.sim
@@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
-#$keyList = $keyList . ,
-#$keyList = $keyList . auto.offset.reset:earliest
+$keyList = $keyList . ,
+$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
diff --git a/tests/script/tsim/tmq/snapshot1.sim b/tests/script/tsim/tmq/snapshot1.sim
index 5349981cc7..6121692d6c 100644
--- a/tests/script/tsim/tmq/snapshot1.sim
+++ b/tests/script/tsim/tmq/snapshot1.sim
@@ -62,8 +62,8 @@ $keyList = $keyList . ,
$keyList = $keyList . enable.auto.commit:false
#$keyList = $keyList . ,
#$keyList = $keyList . auto.commit.interval.ms:6000
-#$keyList = $keyList . ,
-#$keyList = $keyList . auto.offset.reset:earliest
+$keyList = $keyList . ,
+$keyList = $keyList . auto.offset.reset:earliest
$keyList = $keyList . '
print ========== key list: $keyList
diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py
index ff7c70bcd2..0e9e8f989f 100644
--- a/tests/system-test/7-tmq/tmqParamsTest.py
+++ b/tests/system-test/7-tmq/tmqParamsTest.py
@@ -19,7 +19,7 @@ class TDTestCase:
self.wal_retention_period1 = 3600
self.wal_retention_period2 = 1
self.commit_value_list = ["true", "false"]
- self.offset_value_list = ["", "earliest", "latest", "none"]
+ self.offset_value_list = ["earliest", "latest", "none"]
self.tbname_value_list = ["true", "false"]
self.snapshot_value_list = ["false"]
@@ -92,7 +92,7 @@ class TDTestCase:
}
consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0
consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0
- consumer_ret = "earliest" if offset_value == "" else offset_value
+ consumer_ret = "latest" if offset_value == "" else offset_value
expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]}ms,reset:{consumer_ret}'
if len(offset_value) == 0:
del consumer_dict["auto.offset.reset"]
diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c
index 5d4d73c448..ff89bb1f75 100644
--- a/utils/test/c/tmq_taosx_ci.c
+++ b/utils/test/c/tmq_taosx_ci.c
@@ -547,6 +547,7 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
+ tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (g_conf.snapShot) {
tmq_conf_set(conf, "experimental.snapshot.enable", "true");