Merge pull request #27135 from taosdata/fix/TD-31356

ignore delete res since res is empty
This commit is contained in:
Haojun Liao 2024-08-12 14:31:54 +08:00 committed by GitHub
commit 9e0783afc9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 32 additions and 13 deletions

View File

@ -2790,8 +2790,11 @@ char* getStreamOpName(uint16_t opType) {
} }
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) { void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) { if (!pBlock) {
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag); qDebug("%s===stream===%s: Block is Null", taskIdStr, flag);
return;
} else if (pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Empty. block type %d", taskIdStr, flag, pBlock->info.type);
return; return;
} }
char* pBuf = NULL; char* pBuf = NULL;

View File

@ -1647,6 +1647,9 @@ bool hasPrimaryKeyCol(SStreamScanInfo* pInfo) { return pInfo->primaryKeyIndex !=
static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion, void* pVal) { static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion, void* pVal) {
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion); SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, ts, ts, maxVersion);
if (!pPreRes || pPreRes->info.rows == 0) { if (!pPreRes || pPreRes->info.rows == 0) {
if (terrno != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
}
return 0; return 0;
} }
@ -1938,6 +1941,9 @@ static int32_t getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs,
printDataBlock(pPreRes, "pre res", taskIdStr); printDataBlock(pPreRes, "pre res", taskIdStr);
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
if (!pPreRes) { if (!pPreRes) {
if (terrno != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
}
goto _end; goto _end;
} }
code = blockDataEnsureCapacity(pBlock, pPreRes->info.rows); code = blockDataEnsureCapacity(pBlock, pPreRes->info.rows);
@ -3370,6 +3376,7 @@ FETCH_NEXT_BLOCK:
case STREAM_SCAN_FROM_DELETE_DATA: { case STREAM_SCAN_FROM_DELETE_DATA: {
code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_PARTITION_DELETE_DATA); code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_PARTITION_DELETE_DATA);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pUpdateRes->info.rows > 0) {
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
code = copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); code = copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
@ -3377,14 +3384,23 @@ FETCH_NEXT_BLOCK:
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
(*ppRes) = pInfo->pDeleteDataRes; (*ppRes) = pInfo->pDeleteDataRes;
return code; return code;
}
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, lino);
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break; } break;
case STREAM_SCAN_FROM_UPDATERES: { case STREAM_SCAN_FROM_UPDATERES: {
code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_CLEAR); code = generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes, STREAM_CLEAR);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pUpdateRes->info.rows > 0) {
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
(*ppRes) = pInfo->pUpdateRes; (*ppRes) = pInfo->pUpdateRes;
return code; return code;
}
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, lino);
blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break; } break;
case STREAM_SCAN_FROM_DATAREADER_RANGE: case STREAM_SCAN_FROM_DATAREADER_RANGE:
case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: { case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {