Merge pull request #28153 from taosdata/fix/TD-32353-3.0

fix(query)[TD-32353]. Fix error handling during tag scan
This commit is contained in:
Pan Wei 2024-09-27 15:20:51 +08:00 committed by GitHub
commit 1be93844ef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 7 additions and 8 deletions

View File

@ -4294,13 +4294,13 @@ _error:
return code; return code;
} }
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr, static int32_t doTagScanOneTable(SOperatorInfo* pOperator, SSDataBlock* pRes, SMetaReader* mr, SStorageAPI* pAPI) {
SStorageAPI* pAPI) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STagScanInfo* pInfo = pOperator->info; STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
int32_t count = pRes->info.rows;
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
if (!item) { if (!item) {
@ -4360,6 +4360,8 @@ _end:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
pTaskInfo->code = code; pTaskInfo->code = code;
} else {
pRes->info.rows++;
} }
return code; return code;
@ -4715,26 +4717,23 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
return code; return code;
} }
int32_t count = 0;
SMetaReader mr = {0}; SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
pRes->info.rows = 0;
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { while (pInfo->curPos < size && pRes->info.rows < pOperator->resultInfo.capacity) {
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI); code = doTagScanOneTable(pOperator, pRes, &mr, &pTaskInfo->storageAPI);
if (code != TSDB_CODE_OUT_OF_MEMORY) { if (code != TSDB_CODE_OUT_OF_MEMORY) {
// ignore other error // ignore other error
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
} }
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
++count;
if (++pInfo->curPos >= size) { if (++pInfo->curPos >= size) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
} }
} }
pRes->info.rows = count;
pAPI->metaReaderFn.clearReader(&mr); pAPI->metaReaderFn.clearReader(&mr);
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo); bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
if (bLimitReached) { if (bLimitReached) {