fix:error in TD-23218 & remove useless logic
This commit is contained in:
parent
dd4cf0597b
commit
6fae38619d
|
@ -453,15 +453,6 @@ static int32_t processSubColumn(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
||||||
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
|
||||||
|
|
||||||
// NOTE: this pHandle->consumerId may have been changed already.
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
|
||||||
", ts:%" PRId64", reqId:0x%"PRIx64,
|
|
||||||
pRequest->consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
|
|
||||||
dataRsp.rspOffset.ts, pRequest->reqId);
|
|
||||||
|
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -357,11 +357,10 @@ bool tqNextDataBlock2(STqReader* pReader) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen,
|
|
||||||
pReader->msg2.ver, pReader->nextBlk);
|
|
||||||
|
|
||||||
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||||
while (pReader->nextBlk < blockSz) {
|
while (pReader->nextBlk < blockSz) {
|
||||||
|
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen,
|
||||||
|
pReader->msg2.ver, pReader->nextBlk);
|
||||||
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
|
||||||
if (pReader->tbIdHash == NULL) return true;
|
if (pReader->tbIdHash == NULL) return true;
|
||||||
|
|
||||||
|
|
|
@ -1628,7 +1628,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
while (1) {
|
while (1) {
|
||||||
SFetchRet ret = {0};
|
SFetchRet ret = {0};
|
||||||
tqNextBlock(pInfo->tqReader, &ret);
|
tqNextBlock(pInfo->tqReader, &ret);
|
||||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion);
|
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1);
|
||||||
|
|
||||||
if (ret.fetchType == FETCH_TYPE__DATA) {
|
if (ret.fetchType == FETCH_TYPE__DATA) {
|
||||||
qDebug("doQueueScan get data from log %d rows, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version);
|
qDebug("doQueueScan get data from log %d rows, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version);
|
||||||
|
|
Loading…
Reference in New Issue