fix(query)[TD-32353]. Fix error handling during tag scan
Correct the row count in the result data block when the tag scan tolerates partial errors. This prevents upper-layer operators from accessing invalid memory address.
This commit is contained in:
parent
54bc8c3e22
commit
03c33e77aa
|
@ -4294,13 +4294,13 @@ _error:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr,
|
static int32_t doTagScanOneTable(SOperatorInfo* pOperator, SSDataBlock* pRes, SMetaReader* mr, SStorageAPI* pAPI) {
|
||||||
SStorageAPI* pAPI) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
STagScanInfo* pInfo = pOperator->info;
|
STagScanInfo* pInfo = pOperator->info;
|
||||||
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||||
|
int32_t count = pRes->info.rows;
|
||||||
|
|
||||||
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
|
STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos);
|
||||||
if (!item) {
|
if (!item) {
|
||||||
|
@ -4360,6 +4360,8 @@ _end:
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
} else {
|
||||||
|
pRes->info.rows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -4715,26 +4717,23 @@ static int32_t doTagScanFromMetaEntryNext(SOperatorInfo* pOperator, SSDataBlock*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t count = 0;
|
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
|
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
|
||||||
|
pRes->info.rows = 0;
|
||||||
|
|
||||||
while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) {
|
while (pInfo->curPos < size && pRes->info.rows < pOperator->resultInfo.capacity) {
|
||||||
code = doTagScanOneTable(pOperator, pRes, count, &mr, &pTaskInfo->storageAPI);
|
code = doTagScanOneTable(pOperator, pRes, &mr, &pTaskInfo->storageAPI);
|
||||||
if (code != TSDB_CODE_OUT_OF_MEMORY) {
|
if (code != TSDB_CODE_OUT_OF_MEMORY) {
|
||||||
// ignore other error
|
// ignore other error
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
++count;
|
|
||||||
if (++pInfo->curPos >= size) {
|
if (++pInfo->curPos >= size) {
|
||||||
setOperatorCompleted(pOperator);
|
setOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pRes->info.rows = count;
|
|
||||||
|
|
||||||
pAPI->metaReaderFn.clearReader(&mr);
|
pAPI->metaReaderFn.clearReader(&mr);
|
||||||
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
|
bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
|
||||||
if (bLimitReached) {
|
if (bLimitReached) {
|
||||||
|
|
Loading…
Reference in New Issue