fix(tmq): update the local offset when retrieving offset from vnode.
This commit is contained in:
parent
b16f26a12f
commit
c6e62d7fb8
|
@ -2517,6 +2517,31 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
*numOfAssignment = num;
|
*numOfAssignment = num;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
||||||
|
tmq_topic_assignment* p = &(*assignment)[j];
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
|
||||||
|
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
|
||||||
|
if (pClientVg->vgId != p->vgId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
|
||||||
|
|
||||||
|
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
|
||||||
|
|
||||||
|
char offsetBuf[80] = {0};
|
||||||
|
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
|
||||||
|
|
||||||
|
tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
|
||||||
|
|
||||||
|
pOffsetInfo->walVerBegin = p->begin;
|
||||||
|
pOffsetInfo->walVerEnd = p->end;
|
||||||
|
pOffsetInfo->currentOffset.version = p->currentOffset;
|
||||||
|
pOffsetInfo->committedOffset.version = p->currentOffset;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
destroyCommonInfo(pCommon);
|
destroyCommonInfo(pCommon);
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1122,6 +1122,8 @@ TEST(clientCase, sub_tb_test) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, 0);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
||||||
if (pRes != NULL) {
|
if (pRes != NULL) {
|
||||||
|
|
Loading…
Reference in New Issue