Merge pull request #28907 from taosdata/fix/TS-5679
fix:[TS-5679] auto commit error
This commit is contained in:
commit
00d01ac867
|
@ -1872,13 +1872,17 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
|
||||
if (tmq->epTimer == NULL){
|
||||
tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(tmq->refId), tmqMgmt.timer);
|
||||
if (tmq->epTimer == NULL) {
|
||||
code = TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
if (tmq->commitTimer == NULL){
|
||||
if (tmq->autoCommit && tmq->commitTimer == NULL){
|
||||
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, (void*)(tmq->refId), tmqMgmt.timer);
|
||||
}
|
||||
if (tmq->epTimer == NULL || tmq->commitTimer == NULL) {
|
||||
code = TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
goto END;
|
||||
if (tmq->commitTimer == NULL) {
|
||||
code = TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
|
||||
END:
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
[10/28 19:12:21.666563] SUCC: created database (db_sub)
|
||||
[10/28 19:12:21.694603] INFO: start creating 1000 table(s) with 8 thread(s)
|
||||
[10/28 19:12:21.823202] SUCC: Spent 0.1290 seconds to create 1000 table(s) with 8 thread(s) speed: 7752 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
||||
[10/28 19:12:22.127442] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 441047.79 records/second
|
||||
[10/28 19:12:22.128649] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 440895.33 records/second
|
||||
[10/28 19:12:22.129478] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 440151.69 records/second
|
||||
[10/28 19:12:22.133756] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 433268.05 records/second
|
||||
[10/28 19:12:22.135211] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 430329.63 records/second
|
||||
[10/28 19:12:22.137335] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 425800.08 records/second
|
||||
[10/28 19:12:22.138252] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 426330.15 records/second
|
||||
[10/28 19:12:22.141351] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 422778.64 records/second
|
||||
[10/28 19:12:22.141585] SUCC: Spent 0.311648 (real 0.289041) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3208748.33 (real 3459716.79) records/second
|
||||
[10/28 19:12:22.141590] SUCC: insert delay, min: 0.9600ms, avg: 2.3123ms, p90: 3.1790ms, p95: 3.5080ms, p99: 4.2230ms, max: 4.9040ms
|
||||
[10/28 19:28:50.798427] SUCC: created database (db_sub)
|
||||
[10/28 19:28:50.828326] INFO: start creating 1000 table(s) with 8 thread(s)
|
||||
[10/28 19:28:50.936429] SUCC: Spent 0.1080 seconds to create 1000 table(s) with 8 thread(s) speed: 9259 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
||||
[10/28 19:28:51.187235] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 539204.48 records/second
|
||||
[10/28 19:28:51.189941] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 532329.43 records/second
|
||||
[10/28 19:28:51.191551] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 530954.66 records/second
|
||||
[10/28 19:28:51.191858] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 529259.59 records/second
|
||||
[10/28 19:28:51.192459] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 530229.44 records/second
|
||||
[10/28 19:28:51.195372] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 522099.42 records/second
|
||||
[10/28 19:28:51.197727] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 516620.72 records/second
|
||||
[10/28 19:28:51.197883] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 517125.12 records/second
|
||||
[10/28 19:28:51.198123] SUCC: Spent 0.255536 (real 0.237135) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3913342.93 (real 4217007.19) records/second
|
||||
[10/28 19:28:51.198130] SUCC: insert delay, min: 0.9200ms, avg: 1.8971ms, p90: 2.6870ms, p95: 2.9520ms, p99: 3.5880ms, max: 4.0710ms
|
||||
[10/28 19:31:44.377691] SUCC: created database (db_sub)
|
||||
[10/28 19:31:44.392998] INFO: start creating 1000 table(s) with 8 thread(s)
|
||||
[10/28 19:31:44.696768] SUCC: Spent 0.3040 seconds to create 1000 table(s) with 8 thread(s) speed: 3289 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
||||
[10/28 19:31:45.126910] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 304775.47 records/second
|
||||
[10/28 19:31:45.131979] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 301117.75 records/second
|
||||
[10/28 19:31:45.135106] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 299854.39 records/second
|
||||
[10/28 19:31:45.135675] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 298322.24 records/second
|
||||
[10/28 19:31:45.137069] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 297733.89 records/second
|
||||
[10/28 19:31:45.137952] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 296900.13 records/second
|
||||
[10/28 19:31:45.138834] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 295170.54 records/second
|
||||
[10/28 19:31:45.145048] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 291966.71 records/second
|
||||
[10/28 19:31:45.145369] SUCC: Spent 0.442506 (real 0.419200) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 2259856.36 (real 2385496.18) records/second
|
||||
[10/28 19:31:45.145377] SUCC: insert delay, min: 1.0400ms, avg: 3.3536ms, p90: 5.3120ms, p95: 7.9660ms, p99: 13.1570ms, max: 19.1410ms
|
||||
[10/28 19:44:19.873056] SUCC: created database (db_sub)
|
||||
[10/28 19:44:19.904701] INFO: start creating 1000 table(s) with 8 thread(s)
|
||||
[10/28 19:44:20.053846] SUCC: Spent 0.1490 seconds to create 1000 table(s) with 8 thread(s) speed: 6711 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
||||
[10/28 19:44:20.328698] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 485742.49 records/second
|
||||
[10/28 19:44:20.330777] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 481686.29 records/second
|
||||
[10/28 19:44:20.331290] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 480911.65 records/second
|
||||
[10/28 19:44:20.331665] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 481043.06 records/second
|
||||
[10/28 19:44:20.333451] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 477172.09 records/second
|
||||
[10/28 19:44:20.334745] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 475675.84 records/second
|
||||
[10/28 19:44:20.335056] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 474158.37 records/second
|
||||
[10/28 19:44:20.337919] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 470816.89 records/second
|
||||
[10/28 19:44:20.338144] SUCC: Spent 0.277921 (real 0.261310) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3598144.80 (real 3826872.30) records/second
|
||||
[10/28 19:44:20.338153] SUCC: insert delay, min: 0.9180ms, avg: 2.0905ms, p90: 2.6490ms, p95: 3.0620ms, p99: 4.1480ms, max: 4.7840ms
|
||||
[10/28 19:58:27.100989] SUCC: created database (db_sub)
|
||||
[10/28 19:58:27.115572] INFO: start creating 1000 table(s) with 8 thread(s)
|
||||
[10/28 19:58:27.362948] SUCC: Spent 0.2470 seconds to create 1000 table(s) with 8 thread(s) speed: 4049 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
||||
[10/28 19:58:27.807669] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 291891.03 records/second
|
||||
[10/28 19:58:27.818785] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 285413.54 records/second
|
||||
[10/28 19:58:27.819649] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 284193.61 records/second
|
||||
[10/28 19:58:27.819844] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 284352.64 records/second
|
||||
[10/28 19:58:27.820170] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 284576.63 records/second
|
||||
[10/28 19:58:27.821489] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 283781.33 records/second
|
||||
[10/28 19:58:27.822061] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 283112.24 records/second
|
||||
[10/28 19:58:27.823513] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 282730.59 records/second
|
||||
[10/28 19:58:27.823779] SUCC: Spent 0.455783 (real 0.438625) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 2194026.54 (real 2279851.81) records/second
|
||||
[10/28 19:58:27.823786] SUCC: insert delay, min: 0.9780ms, avg: 3.5090ms, p90: 5.5650ms, p95: 6.8600ms, p99: 10.6010ms, max: 13.4400ms
|
||||
[10/28 20:00:06.417182] SUCC: created database (db_sub)
|
||||
[10/28 20:00:06.448202] INFO: start creating 1000 table(s) with 8 thread(s)
|
||||
[10/28 20:00:06.596961] SUCC: Spent 0.1480 seconds to create 1000 table(s) with 8 thread(s) speed: 6757 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
||||
[10/28 20:00:06.895455] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 443978.76 records/second
|
||||
[10/28 20:00:06.896986] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 442549.94 records/second
|
||||
[10/28 20:00:06.897536] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 440927.99 records/second
|
||||
[10/28 20:00:06.898905] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 439131.15 records/second
|
||||
[10/28 20:00:06.899024] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 439628.46 records/second
|
||||
[10/28 20:00:06.901861] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 435197.37 records/second
|
||||
[10/28 20:00:06.902305] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 434812.86 records/second
|
||||
[10/28 20:00:06.904698] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 433406.26 records/second
|
||||
[10/28 20:00:06.904905] SUCC: Spent 0.301788 (real 0.284949) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3313584.37 (real 3509399.93) records/second
|
||||
[10/28 20:00:06.904912] SUCC: insert delay, min: 0.8770ms, avg: 2.2796ms, p90: 3.1340ms, p95: 3.6480ms, p99: 4.8280ms, max: 6.0880ms
|
||||
[10/28 20:05:34.756207] SUCC: created database (db_sub)
|
||||
[10/28 20:05:34.784793] INFO: start creating 1000 table(s) with 8 thread(s)
|
||||
[10/28 20:05:34.927068] SUCC: Spent 0.1430 seconds to create 1000 table(s) with 8 thread(s) speed: 6993 tables/s, already exist 0 table(s), actual 1000 table(s) pre created, 0 table(s) will be auto created
|
||||
[10/28 20:05:35.213741] SUCC: thread[4] progressive mode, completed total inserted rows: 125000, 466952.82 records/second
|
||||
[10/28 20:05:35.215403] SUCC: thread[3] progressive mode, completed total inserted rows: 125000, 463804.68 records/second
|
||||
[10/28 20:05:35.221132] SUCC: thread[2] progressive mode, completed total inserted rows: 125000, 453322.31 records/second
|
||||
[10/28 20:05:35.221224] SUCC: thread[1] progressive mode, completed total inserted rows: 125000, 453671.11 records/second
|
||||
[10/28 20:05:35.222003] SUCC: thread[0] progressive mode, completed total inserted rows: 125000, 452641.07 records/second
|
||||
[10/28 20:05:35.222536] SUCC: thread[5] progressive mode, completed total inserted rows: 125000, 451796.89 records/second
|
||||
[10/28 20:05:35.223663] SUCC: thread[7] progressive mode, completed total inserted rows: 125000, 449643.52 records/second
|
||||
[10/28 20:05:35.225246] SUCC: thread[6] progressive mode, completed total inserted rows: 125000, 447768.68 records/second
|
||||
[10/28 20:05:35.225659] SUCC: Spent 0.290871 (real 0.274808) seconds to insert rows: 1000000 with 8 thread(s) into db_sub 3437950.16 (real 3638904.25) records/second
|
||||
[10/28 20:05:35.225666] SUCC: insert delay, min: 0.9360ms, avg: 2.1985ms, p90: 2.9290ms, p95: 3.4580ms, p99: 4.6030ms, max: 6.2660ms
|
|
@ -45,6 +45,11 @@ class TDTestCase:
|
|||
tdLog.exit("tmq_offset_test error!")
|
||||
else:
|
||||
buildPath = tdCom.getBuildPath()
|
||||
cmdStr0 = '%s/build/bin/tmq_offset_test 5679'%(buildPath)
|
||||
tdLog.info(cmdStr0)
|
||||
if os.system(cmdStr0) != 0:
|
||||
tdLog.exit(cmdStr0)
|
||||
|
||||
cmdStr1 = '%s/build/bin/taosBenchmark -i 50 -B 1 -t 1000 -n 100000 -y &'%(buildPath)
|
||||
tdLog.info(cmdStr1)
|
||||
os.system(cmdStr1)
|
||||
|
|
|
@ -80,6 +80,77 @@ int buildData(TAOS* pConn){
|
|||
return 0;
|
||||
}
|
||||
|
||||
void test_ts5679(TAOS* pConn){
|
||||
TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t_5679");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop database if exists db_ts5679");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create database if not exists db_ts5679 vgroups 1 wal_retention_period 3600");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create topic t_5679 as database db_ts5679");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "use db_ts5679");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn,"CREATE TABLE `t1` (`ts` TIMESTAMP, `voltage` INT)");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into t1 values(now, 1)");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
|
||||
tmq_conf_set(conf, "enable.auto.commit", "false");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
|
||||
tmq_conf_set(conf, "group.id", "group_id_2");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "false");
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
// 创建订阅 topics 列表
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, "t_5679");
|
||||
|
||||
// 启动订阅
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
|
||||
while(1){
|
||||
pRes = tmq_consumer_poll(tmq, 1000);
|
||||
if (pRes == NULL){
|
||||
break;
|
||||
}
|
||||
taosSsleep(3);
|
||||
}
|
||||
tmq_topic_assignment* pAssign = NULL;
|
||||
int32_t numOfAssign = 0;
|
||||
int32_t code = tmq_get_topic_assignment(tmq, "t_5679", &pAssign, &numOfAssign);
|
||||
ASSERT (code == 0);
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
int64_t committed = tmq_committed(tmq, "t_5679", pAssign[i].vgId);
|
||||
printf("committed offset:%"PRId64"\n", committed);
|
||||
ASSERT(committed == TSDB_CODE_TMQ_NO_COMMITTED);
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
}
|
||||
|
||||
void test_offset(TAOS* pConn){
|
||||
if(buildData(pConn) != 0){
|
||||
ASSERT(0);
|
||||
|
@ -304,8 +375,13 @@ void test_ts3756(TAOS* pConn){
|
|||
|
||||
int main(int argc, char* argv[]) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
test_offset(pConn);
|
||||
test_ts3756(pConn);
|
||||
if (argc == 2) {
|
||||
test_ts5679(pConn);
|
||||
}else{
|
||||
test_offset(pConn);
|
||||
test_ts3756(pConn);
|
||||
}
|
||||
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue