fix(tmq): rsp error offset
This commit is contained in:
parent
e74187248b
commit
94c3eb29d2
|
@ -514,6 +514,8 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
|
||||||
pParamSet->userParam = userParam;
|
pParamSet->userParam = userParam;
|
||||||
tsem_init(&pParamSet->rspSem, 0, 0);
|
tsem_init(&pParamSet->rspSem, 0, 0);
|
||||||
|
|
||||||
|
int32_t code = -1;
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
if (strcmp(pTopic->topicName, topic) != 0) continue;
|
if (strcmp(pTopic->topicName, topic) != 0) continue;
|
||||||
|
@ -530,8 +532,6 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = -1;
|
|
||||||
|
|
||||||
HANDLE_RSP:
|
HANDLE_RSP:
|
||||||
if (pParamSet->totalRspNum == 0) {
|
if (pParamSet->totalRspNum == 0) {
|
||||||
tsem_destroy(&pParamSet->rspSem);
|
tsem_destroy(&pParamSet->rspSem);
|
||||||
|
|
|
@ -332,7 +332,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pHeadWithCkSum) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, &pHeadWithCkSum) < 0) {
|
||||||
// TODO add push mgr
|
// TODO add push mgr
|
||||||
|
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer - 1);
|
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
|
||||||
|
ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version);
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
@ -354,6 +355,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
// TODO continue scan until meeting batch requirement
|
// TODO continue scan until meeting batch requirement
|
||||||
if (dataRsp.blockNum > 0 /* threshold */) {
|
if (dataRsp.blockNum > 0 /* threshold */) {
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer);
|
||||||
|
ASSERT(dataRsp.rspOffset.version >= dataRsp.reqOffset.version);
|
||||||
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
|
|
Loading…
Reference in New Issue