enh: refactor some asserts in doQueueScan
This commit is contained in:
parent
bcf6d5b641
commit
2a08e2a549
|
@ -1573,7 +1573,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
qError("submit msg messed up when initing stream submit block %p", pSubmit);
|
||||
pInfo->tqReader->pMsg = NULL;
|
||||
pTaskInfo->streamInfo.pReq = NULL;
|
||||
ASSERT(0);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1628,11 +1628,14 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
|
||||
while (1) {
|
||||
SFetchRet ret = {0};
|
||||
tqNextBlock(pInfo->tqReader, &ret);
|
||||
if (tqNextBlock(pInfo->tqReader, &ret) < 0) {
|
||||
qError("failed to get next log block since %s", terrstr());
|
||||
return NULL;
|
||||
}
|
||||
if (ret.fetchType == FETCH_TYPE__DATA) {
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
|
||||
ASSERT(0);
|
||||
return NULL;
|
||||
}
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
pOperator->status = OP_EXEC_RECV;
|
||||
|
@ -1640,7 +1643,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
return pInfo->pRes;
|
||||
}
|
||||
} else if (ret.fetchType == FETCH_TYPE__META) {
|
||||
ASSERT(0);
|
||||
qError("unexpected ret.fetchType:%d", ret.fetchType);
|
||||
return NULL;
|
||||
// pTaskInfo->streamInfo.lastStatus = ret.offset;
|
||||
// pTaskInfo->streamInfo.metaBlk = ret.meta;
|
||||
// return NULL;
|
||||
|
@ -1667,7 +1671,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
#endif
|
||||
} else {
|
||||
ASSERT(0);
|
||||
qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue