fix: fix syntax error.
This commit is contained in:
parent
b5a7ef9a7e
commit
5fdfe98696
|
@ -168,7 +168,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
|
|
||||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||||
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||||
if(code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,11 +177,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
// lock
|
// lock
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
|
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
|
||||||
if (pOffset->version >= ver || dataRsp.rspOffset.version >= ver){ //check if there are data again to avoid lost data
|
if (pOffset->version >= ver ||
|
||||||
|
dataRsp.rspOffset.version >= ver) { // check if there are data again to avoid lost data
|
||||||
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
goto end;
|
goto end;
|
||||||
}else{
|
} else {
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -196,6 +197,7 @@ end : {
|
||||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
|
||||||
tDeleteMqDataRsp(&dataRsp);
|
tDeleteMqDataRsp(&dataRsp);
|
||||||
return code;
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||||
|
|
Loading…
Reference in New Issue