diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 616c2593cb..b732fccd8e 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2790,8 +2790,11 @@ char* getStreamOpName(uint16_t opType) { } void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) { - if (!pBlock || pBlock->info.rows == 0) { - qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag); + if (!pBlock) { + qDebug("%s===stream===%s: Block is Null", taskIdStr, flag); + return; + } else if (pBlock->info.rows == 0) { + qDebug("%s===stream===%s: Block is Empty. block type %d", taskIdStr, flag, pBlock->info.type); return; } char* pBuf = NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e15dcf806a..813d984528 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1647,6 +1647,9 @@ bool hasPrimaryKeyCol(SStreamScanInfo* pInfo) { return pInfo->primaryKeyIndex != static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion, void* pVal) { SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion); if (!pPreRes || pPreRes->info.rows == 0) { + if (terrno != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + } return 0; } @@ -1938,6 +1941,9 @@ static int32_t getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, printDataBlock(pPreRes, "pre res", taskIdStr); blockDataCleanup(pBlock); if (!pPreRes) { + if (terrno != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + } goto _end; } code = blockDataEnsureCapacity(pBlock, pPreRes->info.rows); @@ -3370,21 +3376,31 @@ FETCH_NEXT_BLOCK: case STREAM_SCAN_FROM_DELETE_DATA: { code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_PARTITION_DELETE_DATA); QUERY_CHECK_CODE(code, lino, _end); - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - code = copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); - QUERY_CHECK_CODE(code, lino, _end); - pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - (*ppRes) = pInfo->pDeleteDataRes; - return code; + if (pInfo->pUpdateRes->info.rows > 0) { + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + code = copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); + QUERY_CHECK_CODE(code, lino, _end); + pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; + (*ppRes) = pInfo->pDeleteDataRes; + return code; + } + qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, lino); + blockDataCleanup(pInfo->pUpdateDataRes); + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; } break; case STREAM_SCAN_FROM_UPDATERES: { code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_CLEAR); QUERY_CHECK_CODE(code, lino, _end); - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - (*ppRes) = pInfo->pUpdateRes; - return code; + if (pInfo->pUpdateRes->info.rows > 0) { + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + (*ppRes) = pInfo->pUpdateRes; + return code; + } + qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, lino); + blockDataCleanup(pInfo->pUpdateDataRes); + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; } break; case STREAM_SCAN_FROM_DATAREADER_RANGE: case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {