feat:add committed & position & commite_offset interface

This commit is contained in:
wangmm0220 2023-07-19 19:14:18 +08:00
parent d7d81d82a0
commit 49c87a7cf6
1 changed files with 10 additions and 8 deletions

View File

@ -1183,9 +1183,10 @@ TEST(clientCase, td_25129) {
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf); tmq_conf_destroy(conf);
char topicName[128] = "tp";
// 创建订阅 topics 列表 // 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new(); tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "tp"); tmq_list_append(topicList, topicName);
// 启动订阅 // 启动订阅
tmq_subscribe(tmq, topicList); tmq_subscribe(tmq, topicList);
@ -1203,7 +1204,7 @@ TEST(clientCase, td_25129) {
tmq_topic_assignment* pAssign = NULL; tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0; int32_t numOfAssign = 0;
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); int32_t code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign);
if (code != 0) { if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code)); printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign); tmq_free_assignment(pAssign);
@ -1220,7 +1221,7 @@ TEST(clientCase, td_25129) {
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4); // tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4);
tmq_free_assignment(pAssign); tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign);
if (code != 0) { if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code)); printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign); tmq_free_assignment(pAssign);
@ -1236,7 +1237,7 @@ TEST(clientCase, td_25129) {
tmq_free_assignment(pAssign); tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign);
if (code != 0) { if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code)); printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign); tmq_free_assignment(pAssign);
@ -1266,7 +1267,7 @@ TEST(clientCase, td_25129) {
printSubResults(pRes, &totalRows); printSubResults(pRes, &totalRows);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); code = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssign);
if (code != 0) { if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code)); printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign); tmq_free_assignment(pAssign);
@ -1280,10 +1281,11 @@ TEST(clientCase, td_25129) {
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
} }
} else { } else {
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); for(int i = 0; i < numOfAssign; i++) {
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); tmq_offset_seek(tmq, topicName, pAssign[i].vgId, pAssign[i].currentOffset);
}
tmq_commit_sync(tmq, pRes); tmq_commit_sync(tmq, pRes);
continue; break;
} }
// tmq_commit_sync(tmq, pRes); // tmq_commit_sync(tmq, pRes);