Merge pull request #21260 from taosdata/feature/3_liaohj

fix(tmq): update the local offset when retrieving offset from vnode.
This commit is contained in:
Haojun Liao 2023-05-11 23:12:33 +08:00 committed by GitHub
commit 26f94ecd60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 2 deletions

View File

@ -2517,6 +2517,31 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
*numOfAssignment = num; *numOfAssignment = num;
} }
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
tmq_topic_assignment* p = &(*assignment)[j];
for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg->vgId != p->vgId) {
continue;
}
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
char offsetBuf[80] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end;
pOffsetInfo->currentOffset.version = p->currentOffset;
pOffsetInfo->committedOffset.version = p->currentOffset;
}
}
destroyCommonInfo(pCommon); destroyCommonInfo(pCommon);
return code; return code;
} else { } else {

View File

@ -1122,6 +1122,8 @@ TEST(clientCase, sub_tb_test) {
return; return;
} }
tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, 0);
while (1) { while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes != NULL) { if (pRes != NULL) {

View File

@ -301,8 +301,12 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen)
} }
// save the new offset value // save the new offset value
tqDebug("vgId:%d sub:%s seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, if (pSavedOffset != NULL) {
pSavedOffset->val.version); tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
pSavedOffset->val.version);
} else {
tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version);
}
if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version); tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);