fix:wal error
This commit is contained in:
parent
1b9387b683
commit
ae09d1d031
|
@ -176,7 +176,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
|
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) {
|
||||||
// lock
|
// lock
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
int64_t ver = walGetCommittedVer(pHandle->pWalReader->pWal);
|
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
|
||||||
if (pOffset->version >= ver || dataRsp.rspOffset.version >= ver){ //check if there are data again to avoid lost data
|
if (pOffset->version >= ver || dataRsp.rspOffset.version >= ver){ //check if there are data again to avoid lost data
|
||||||
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
@ -424,4 +424,4 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
|
||||||
|
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue