feat:[TD-30883]send data if scan wal too log
This commit is contained in:
parent
36e517b130
commit
a4ebe75b4c
|
@ -101,6 +101,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
|
||||||
TSDB_CHECK_CODE(code, line, END);
|
TSDB_CHECK_CODE(code, line, END);
|
||||||
|
|
||||||
qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
|
qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
|
||||||
|
uint64_t st = taosGetTimestampMs();
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pDataBlock = NULL;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
|
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
|
||||||
|
@ -160,7 +161,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
|
||||||
|
|
||||||
pRsp->common.blockNum++;
|
pRsp->common.blockNum++;
|
||||||
totalRows += pDataBlock->info.rows;
|
totalRows += pDataBlock->info.rows;
|
||||||
if (totalRows >= tmqRowSize) {
|
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,7 +104,7 @@ class TDTestCase:
|
||||||
stop_flag = 0
|
stop_flag = 0
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
res = consumer.poll(1)
|
res = consumer.poll(3)
|
||||||
tdSql.query('show consumers;')
|
tdSql.query('show consumers;')
|
||||||
consumer_info = tdSql.queryResult[0][-1]
|
consumer_info = tdSql.queryResult[0][-1]
|
||||||
if offset_value == "latest":
|
if offset_value == "latest":
|
||||||
|
|
Loading…
Reference in New Issue