diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index b6a1d9840d..a301d82c30 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -168,7 +168,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); - if(code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) { + if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) { goto end; } @@ -177,11 +177,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // lock taosWLockLatch(&pTq->lock); int64_t ver = walGetCommittedVer(pTq->pVnode->pWal); - if (pOffset->version >= ver || dataRsp.rspOffset.version >= ver){ //check if there are data again to avoid lost data + if (pOffset->version >= ver || + dataRsp.rspOffset.version >= ver) { // check if there are data again to avoid lost data code = tqRegisterPushHandle(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); goto end; - }else{ + } else { taosWUnLockLatch(&pTq->lock); } } @@ -196,6 +197,7 @@ end : { consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); tDeleteMqDataRsp(&dataRsp); return code; + } } static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,