fix:cancel the limit time for consume
This commit is contained in:
parent
4f20359f43
commit
7f2a9a7262
|
@ -366,7 +366,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
||||||
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
||||||
SWalReader* pWalReader = pReader->pWalReader;
|
SWalReader* pWalReader = pReader->pWalReader;
|
||||||
|
|
||||||
uint64_t st = taosGetTimestampMs();
|
// uint64_t st = taosGetTimestampMs();
|
||||||
while (1) {
|
while (1) {
|
||||||
SArray* pBlockList = pReader->submit.aSubmitTbData;
|
SArray* pBlockList = pReader->submit.aSubmitTbData;
|
||||||
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
|
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
|
||||||
|
@ -441,9 +441,9 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
|
||||||
|
|
||||||
pReader->msg.msgStr = NULL;
|
pReader->msg.msgStr = NULL;
|
||||||
|
|
||||||
if(taosGetTimestampMs() - st > 5){
|
// if(taosGetTimestampMs() - st > 5){
|
||||||
return false;
|
// return false;
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -207,7 +207,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
walReaderVerifyOffset(pHandle->pWalReader, offset);
|
walReaderVerifyOffset(pHandle->pWalReader, offset);
|
||||||
int64_t fetchVer = offset->version;
|
int64_t fetchVer = offset->version;
|
||||||
|
|
||||||
uint64_t st = taosGetTimestampMs();
|
// uint64_t st = taosGetTimestampMs();
|
||||||
int totalRows = 0;
|
int totalRows = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
|
@ -254,7 +254,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) {
|
// if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) {
|
||||||
|
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
goto end;
|
goto end;
|
||||||
|
|
Loading…
Reference in New Issue