Merge pull request #20018 from taosdata/FIX/TS-2656-main
fix: keep processing on tqNextBlock failure
This commit is contained in:
commit
6bf83f06ae
|
@ -1630,13 +1630,10 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
SFetchRet ret = {0};
|
SFetchRet ret = {0};
|
||||||
if (tqNextBlock(pInfo->tqReader, &ret) < 0) {
|
if (tqNextBlock(pInfo->tqReader, &ret) < 0) {
|
||||||
qError("failed to get next log block since %s", terrstr());
|
qError("failed to get next log block since %s", terrstr());
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
if (ret.fetchType == FETCH_TYPE__DATA) {
|
if (ret.fetchType == FETCH_TYPE__DATA) {
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
|
setBlockIntoRes(pInfo, &ret.data, true);
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
pOperator->status = OP_EXEC_RECV;
|
pOperator->status = OP_EXEC_RECV;
|
||||||
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
|
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
|
||||||
|
@ -1644,7 +1641,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
} else if (ret.fetchType == FETCH_TYPE__META) {
|
} else if (ret.fetchType == FETCH_TYPE__META) {
|
||||||
qError("unexpected ret.fetchType:%d", ret.fetchType);
|
qError("unexpected ret.fetchType:%d", ret.fetchType);
|
||||||
return NULL;
|
continue;
|
||||||
// pTaskInfo->streamInfo.lastStatus = ret.offset;
|
// pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||||
// pTaskInfo->streamInfo.metaBlk = ret.meta;
|
// pTaskInfo->streamInfo.metaBlk = ret.meta;
|
||||||
// return NULL;
|
// return NULL;
|
||||||
|
|
Loading…
Reference in New Issue