fix:bugs in tmq_get_topic_assignment
This commit is contained in:
parent
4c9a0c67e1
commit
bdfeb32923
|
@ -2583,6 +2583,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
|
pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
|
||||||
pAssignment->end = pClientVg->offsetInfo.walVerEnd;
|
pAssignment->end = pClientVg->offsetInfo.walVerEnd;
|
||||||
pAssignment->vgId = pClientVg->vgId;
|
pAssignment->vgId = pClientVg->vgId;
|
||||||
|
tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId,
|
||||||
|
pAssignment->vgId, pAssignment->currentOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (needFetch) {
|
if (needFetch) {
|
||||||
|
@ -2673,34 +2675,36 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
int32_t num = taosArrayGetSize(pCommon->pList);
|
int32_t num = taosArrayGetSize(pCommon->pList);
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
(*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
|
(*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
|
||||||
|
tscInfo("consumer:0x%" PRIx64 " get assignment from server:%d->%" PRId64, tmq->consumerId,
|
||||||
|
(*assignment)[i].vgId, (*assignment)[i].currentOffset);
|
||||||
}
|
}
|
||||||
*numOfAssignment = num;
|
*numOfAssignment = num;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
// for (int32_t j = 0; j < (*numOfAssignment); ++j) {
|
||||||
tmq_topic_assignment* p = &(*assignment)[j];
|
// tmq_topic_assignment* p = &(*assignment)[j];
|
||||||
|
//
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
|
// for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
|
||||||
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
|
// SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
|
||||||
if (pClientVg->vgId != p->vgId) {
|
// if (pClientVg->vgId != p->vgId) {
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
|
// SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
|
||||||
|
//
|
||||||
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
|
// pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
|
||||||
|
//
|
||||||
char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
// char offsetBuf[80] = {0};
|
||||||
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
|
// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
|
||||||
|
//
|
||||||
tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
|
// tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
|
||||||
|
//
|
||||||
pOffsetInfo->walVerBegin = p->begin;
|
// pOffsetInfo->walVerBegin = p->begin;
|
||||||
pOffsetInfo->walVerEnd = p->end;
|
// pOffsetInfo->walVerEnd = p->end;
|
||||||
pOffsetInfo->currentOffset.version = p->currentOffset;
|
// pOffsetInfo->currentOffset.version = p->currentOffset;
|
||||||
pOffsetInfo->committedOffset.version = p->currentOffset;
|
// pOffsetInfo->committedOffset.version = p->currentOffset;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
destroyCommonInfo(pCommon);
|
destroyCommonInfo(pCommon);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -571,6 +571,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
||||||
dataRsp.rspOffset.version = pOffset->val.version;
|
dataRsp.rspOffset.version = pOffset->val.version;
|
||||||
|
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from stroe:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
|
||||||
} else {
|
} else {
|
||||||
if (req.useSnapshot == true) {
|
if (req.useSnapshot == true) {
|
||||||
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
|
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
|
||||||
|
@ -581,14 +582,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
||||||
|
|
||||||
if (reqOffset.type == TMQ_OFFSET__LOG) {
|
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
|
||||||
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
|
|
||||||
if (currentVer == -1) { // not start to read data from wal yet, return req offset directly
|
|
||||||
dataRsp.rspOffset.version = reqOffset.version;
|
|
||||||
} else {
|
|
||||||
dataRsp.rspOffset.version = currentVer; // return current consume offset value
|
|
||||||
}
|
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
|
|
||||||
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
|
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||||
dataRsp.rspOffset.version = ever;
|
dataRsp.rspOffset.version = ever;
|
||||||
|
|
Loading…
Reference in New Issue