fix:return offset stored if get assignment in init mode

This commit is contained in:
wangmm0220 2023-07-08 13:25:39 +08:00
parent 4436eb7e0f
commit f9f656b21a
1 changed files with 20 additions and 5 deletions

View File

@ -571,10 +571,26 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
if (reqOffset.type == TMQ_OFFSET__LOG) { if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.rspOffset.version = reqOffset.version; dataRsp.rspOffset.version = reqOffset.version;
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { } else if(reqOffset.type < 0){
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { if (pOffset != NULL) {
dataRsp.rspOffset.version = ever; if (pOffset->val.type != TMQ_OFFSET__LOG) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
}
dataRsp.rspOffset.version = pOffset->val.version;
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
}else{
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
dataRsp.rspOffset.version = ever;
}
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
}
} else { } else {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
reqOffset.type); reqOffset.type);
@ -582,7 +598,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);
return -1; return -1;
} }
tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
tDeleteMqDataRsp(&dataRsp); tDeleteMqDataRsp(&dataRsp);