feat:add committed & position & commite_offset interface

This commit is contained in:
wangmm0220 2023-07-20 00:07:04 +08:00
parent 9378a0ef83
commit e1c4cca33d
2 changed files with 5 additions and 1 deletions

View File

@ -3106,7 +3106,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
return TSDB_CODE_TMQ_SNAPSHOT_ERROR; return TSDB_CODE_TMQ_SNAPSHOT_ERROR;
} }
code = checkWalRange(pOffsetInfo, -1); code = checkWalRange(pOffsetInfo, offset);
if (code != 0) { if (code != 0) {
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
return code; return code;

View File

@ -1123,6 +1123,9 @@ TEST(clientCase, tmq_commit) {
for(int i = 0; i < numOfAssign; i++){ for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId);
printf("committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed);
int64_t position = tmq_position(tmq, topicName, pAssign[i].vgId); int64_t position = tmq_position(tmq, topicName, pAssign[i].vgId);
printf("position vgId:%d, position:%lld\n", pAssign[i].vgId, position); printf("position vgId:%d, position:%lld\n", pAssign[i].vgId, position);
tmq_offset_seek(tmq, topicName, pAssign[i].vgId, 1); tmq_offset_seek(tmq, topicName, pAssign[i].vgId, 1);
@ -1317,6 +1320,7 @@ TEST(clientCase, td_25129) {
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
} }
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq); tmq_consumer_close(tmq);
taos_close(pConn); taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);