fix(stream): set correct ssdatablock for tqRetrieveDataBlock

This commit is contained in:
Haojun Liao 2023-05-05 13:41:25 +08:00
parent 32b4642dac
commit 9b5c205498
1 changed files with 6 additions and 10 deletions

View File

@ -1649,14 +1649,12 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextBlockImpl(pInfo->tqReader)) { while (tqNextBlockImpl(pInfo->tqReader)) {
SSDataBlock block = {0}; int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue; continue;
} }
setBlockIntoRes(pInfo, &block, true); setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, true);
if (pBlockInfo->rows > 0) { if (pBlockInfo->rows > 0) {
return pInfo->pRes; return pInfo->pRes;
@ -2075,14 +2073,12 @@ FETCH_NEXT_BLOCK:
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
while (tqNextBlockImpl(pInfo->tqReader)) { while (tqNextBlockImpl(pInfo->tqReader)) {
SSDataBlock block = {0}; int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
int32_t code = tqRetrieveDataBlock(&block, pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
continue; continue;
} }
setBlockIntoRes(pInfo, &block, false); setBlockIntoRes(pInfo, pInfo->tqReader->pResBlock, false);
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId, if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
pInfo->pRes->info.version)) { pInfo->pRes->info.version)) {