fix:set firset version to reqOffset of response
This commit is contained in:
parent
574010e067
commit
8dd7f36993
|
@ -2276,7 +2276,7 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
|
||||||
return pRspObj->rsp.reqOffset.version;
|
return pRspObj->rsp.reqOffset.version;
|
||||||
}
|
}
|
||||||
} else{
|
} else{
|
||||||
tscError("invalid tmqtype:%d", *(int8_t*)res);
|
tscError("invalid tmq type:%d", *(int8_t*)res);
|
||||||
}
|
}
|
||||||
|
|
||||||
// data from tsdb, no valid offset info
|
// data from tsdb, no valid offset info
|
||||||
|
|
|
@ -171,6 +171,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
|
|
||||||
SMqDataRsp dataRsp = {0};
|
SMqDataRsp dataRsp = {0};
|
||||||
tqInitDataRsp(&dataRsp, pRequest);
|
tqInitDataRsp(&dataRsp, pRequest);
|
||||||
|
dataRsp.reqOffset.type = pOffset->type; // stroe origin type for getting offset in tmq_get_vgroup_offset
|
||||||
|
|
||||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||||
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||||
|
@ -191,7 +192,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
}
|
}
|
||||||
setRequestVersion(pOffset, pOffset->version);
|
setRequestVersion(&dataRsp.reqOffset, pOffset->version);
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
|
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
|
||||||
|
|
||||||
end : {
|
end : {
|
||||||
|
|
Loading…
Reference in New Issue