ignore delete res since res is empty

This commit is contained in:
54liuyao 2024-08-09 17:36:19 +08:00
parent 75e9c027a6
commit 5d4e30de3f
2 changed files with 32 additions and 13 deletions

View File

@ -2790,8 +2790,11 @@ char* getStreamOpName(uint16_t opType) {
}
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
if (!pBlock) {
qDebug("%s===stream===%s: Block is Null", taskIdStr, flag);
return;
} else if (pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Empty. block type %d", taskIdStr, flag, pBlock->info.type);
return;
}
char* pBuf = NULL;

View File

@ -1647,6 +1647,9 @@ bool hasPrimaryKeyCol(SStreamScanInfo* pInfo) { return pInfo->primaryKeyIndex !=
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion, void* pVal) {
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
if (!pPreRes || pPreRes->info.rows == 0) {
if (terrno != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
}
return 0;
}
@ -1938,6 +1941,9 @@ static int32_t getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs,
printDataBlock(pPreRes, "pre res", taskIdStr);
blockDataCleanup(pBlock);
if (!pPreRes) {
if (terrno != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
}
goto _end;
}
code = blockDataEnsureCapacity(pBlock, pPreRes->info.rows);
@ -3370,21 +3376,31 @@ FETCH_NEXT_BLOCK:
case STREAM_SCAN_FROM_DELETE_DATA: {
code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_PARTITION_DELETE_DATA);
QUERY_CHECK_CODE(code, lino, _end);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
code = copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
(*ppRes) = pInfo->pDeleteDataRes;
return code;
if (pInfo->pUpdateRes->info.rows > 0) {
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
code = copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
(*ppRes) = pInfo->pDeleteDataRes;
return code;
}
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, lino);
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break;
case STREAM_SCAN_FROM_UPDATERES: {
code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_CLEAR);
QUERY_CHECK_CODE(code, lino, _end);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
(*ppRes) = pInfo->pUpdateRes;
return code;
if (pInfo->pUpdateRes->info.rows > 0) {
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
(*ppRes) = pInfo->pUpdateRes;
return code;
}
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, lino);
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break;
case STREAM_SCAN_FROM_DATAREADER_RANGE:
case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {