fix:[TS-5679] auto commit error
This commit is contained in:
parent
062a8a2831
commit
e11adb498b
|
@ -1873,7 +1873,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
if (tmq->epTimer == NULL){
|
if (tmq->epTimer == NULL){
|
||||||
tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
|
tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
|
||||||
}
|
}
|
||||||
if (tmq->commitTimer == NULL){
|
if (tmq->autoCommit && tmq->commitTimer == NULL){
|
||||||
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
|
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
|
||||||
}
|
}
|
||||||
if (tmq->epTimer == NULL || tmq->commitTimer == NULL) {
|
if (tmq->epTimer == NULL || tmq->commitTimer == NULL) {
|
||||||
|
|
|
@ -45,6 +45,11 @@ class TDTestCase:
|
||||||
tdLog.exit("tmq_offset_test error!")
|
tdLog.exit("tmq_offset_test error!")
|
||||||
else:
|
else:
|
||||||
buildPath = tdCom.getBuildPath()
|
buildPath = tdCom.getBuildPath()
|
||||||
|
cmdStr0 = '%s/build/bin/tmq_offset_test 5679'%(buildPath)
|
||||||
|
tdLog.info(cmdStr0)
|
||||||
|
if os.system(cmdStr0) != 0:
|
||||||
|
tdLog.exit(cmdStr0)
|
||||||
|
|
||||||
cmdStr1 = '%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'%(buildPath)
|
cmdStr1 = '%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'%(buildPath)
|
||||||
tdLog.info(cmdStr1)
|
tdLog.info(cmdStr1)
|
||||||
os.system(cmdStr1)
|
os.system(cmdStr1)
|
||||||
|
|
|
@ -80,6 +80,77 @@ int buildData(TAOS* pConn){
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void test_ts5679(TAOS* pConn){
|
||||||
|
TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t_5679");
|
||||||
|
ASSERT(taos_errno(pRes) == 0);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "drop database if exists db_ts5679");
|
||||||
|
ASSERT(taos_errno(pRes) == 0);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "create database if not exists db_ts5679 vgroups 1 wal_retention_period 3600");
|
||||||
|
ASSERT(taos_errno(pRes) == 0);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "create topic t_5679 as database db_ts5679");
|
||||||
|
ASSERT(taos_errno(pRes) == 0);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "use db_ts5679");
|
||||||
|
ASSERT(taos_errno(pRes) == 0);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn,"CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)");
|
||||||
|
ASSERT(taos_errno(pRes) == 0);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "insert into t1 values(now, 1)");
|
||||||
|
ASSERT(taos_errno(pRes) == 0);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
|
|
||||||
|
tmq_conf_set(conf, "enable.auto.commit", "false");
|
||||||
|
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
|
||||||
|
tmq_conf_set(conf, "group.id", "group_id_2");
|
||||||
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
|
tmq_conf_set(conf, "msg.with.table.name", "false");
|
||||||
|
|
||||||
|
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
|
||||||
|
// 创建订阅 topics 列表
|
||||||
|
tmq_list_t* topicList = tmq_list_new();
|
||||||
|
tmq_list_append(topicList, "t_5679");
|
||||||
|
|
||||||
|
// 启动订阅
|
||||||
|
tmq_subscribe(tmq, topicList);
|
||||||
|
tmq_list_destroy(topicList);
|
||||||
|
|
||||||
|
while(1){
|
||||||
|
pRes = tmq_consumer_poll(tmq, 1000);
|
||||||
|
if (pRes == NULL){
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
taosSsleep(3);
|
||||||
|
}
|
||||||
|
tmq_topic_assignment* pAssign = NULL;
|
||||||
|
int32_t numOfAssign = 0;
|
||||||
|
int32_t code = tmq_get_topic_assignment(tmq, "t_5679", &pAssign, &numOfAssign);
|
||||||
|
ASSERT (code == 0);
|
||||||
|
|
||||||
|
for(int i = 0; i < numOfAssign; i++){
|
||||||
|
int64_t committed = tmq_committed(tmq, "t_5679", pAssign[i].vgId);
|
||||||
|
printf("committed offset:%"PRId64"\n", committed);
|
||||||
|
ASSERT(committed == TSDB_CODE_TMQ_NO_COMMITTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pRes);
|
||||||
|
}
|
||||||
|
|
||||||
void test_offset(TAOS* pConn){
|
void test_offset(TAOS* pConn){
|
||||||
if(buildData(pConn) != 0){
|
if(buildData(pConn) != 0){
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -304,8 +375,13 @@ void test_ts3756(TAOS* pConn){
|
||||||
|
|
||||||
int main(int argc, char* argv[]) {
|
int main(int argc, char* argv[]) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
if (argc == 2) {
|
||||||
|
test_ts5679(pConn);
|
||||||
|
}else{
|
||||||
test_offset(pConn);
|
test_offset(pConn);
|
||||||
test_ts3756(pConn);
|
test_ts3756(pConn);
|
||||||
|
}
|
||||||
|
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
Loading…
Reference in New Issue