fix:seek failed in initilized statue

This commit is contained in:
wangmm0220 2023-07-09 15:31:19 +08:00
parent 0929be690d
commit ee50ed4847
1 changed files with 25 additions and 23 deletions

View File

@ -2709,16 +2709,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; // pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
char offsetBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); // char offsetBuf[TSDB_OFFSET_LEN] = {0};
// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);
pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end; pOffsetInfo->walVerEnd = p->end;
pOffsetInfo->currentOffset.version = p->currentOffset; // pOffsetInfo->currentOffset.version = p->currentOffset;
pOffsetInfo->committedOffset.version = p->currentOffset; // pOffsetInfo->committedOffset.version = p->currentOffset;
} }
} }
} }
@ -2779,25 +2780,26 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
int32_t type = pOffsetInfo->currentOffset.type; int32_t type = pOffsetInfo->currentOffset.type;
if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { // if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) {
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); // tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
taosWUnLockLatch(&tmq->lock); // taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA; // return TSDB_CODE_INVALID_PARA;
} // }
//
if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { // if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) {
tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", // tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); // tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
taosWUnLockLatch(&tmq->lock); // taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA; // return TSDB_CODE_INVALID_PARA;
} // }
// update the offset, and then commit to vnode // update the offset, and then commit to vnode
if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { // if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {
pOffsetInfo->currentOffset.version = offset - 1; pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
pOffsetInfo->committedOffset.version = INT64_MIN; pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0;
pVg->seekUpdated = true; pOffsetInfo->committedOffset.version = INT64_MIN;
} pVg->seekUpdated = true;
// }
SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));