enh:[TS-5441] change poll flag to consumer for multi consumers

This commit is contained in:
wangmm0220 2024-10-11 14:26:45 +08:00
parent 74df5d4711
commit f8337a3263
2 changed files with 14 additions and 5 deletions

View File

@ -285,6 +285,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py

View File

@ -74,10 +74,11 @@ static void callFunc(int i, tmq_t* tmq, tmq_list_t* topics) {
break;
}
case 3:
taos_free_result(tmqmessage);
tmqmessage = tmq_consumer_poll(tmq, 5000);
break;
case 4:
tmq_consumer_close(tmq);
// tmq_consumer_close(tmq);
break;
case 5:
tmq_commit_sync(tmq, NULL);
@ -144,13 +145,20 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmqmessage = tmq_consumer_poll(tmq, 5000);
if (tmqmessage) {
printf("poll message\n");
while(cnt < 1000){
callFunc(taosRand()%21, tmq, topics);
while(cnt < 100){
uint32_t i = taosRand()%21;
callFunc(i, tmq, topics);
callFunc(i, tmq, topics);
cnt++;
}
} else {
break;
while(cnt < 300){
uint32_t i = taosRand()%21;
callFunc(i, tmq, topics);
cnt++;
}
taos_free_result(tmqmessage);
}
break;
}
code = tmq_consumer_close(tmq);