Merge pull request #27211 from taosdata/fix/3_liaohj
fix(query): release the reader when returning error.
This commit is contained in:
commit
97d284eaf7
|
@ -255,21 +255,28 @@ static int32_t doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg* pCol
|
||||||
return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows, keep);
|
return filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows, keep);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
static int32_t doLoadBlockSMA(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
|
||||||
|
bool* pLoad) {
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
bool allColumnsHaveAgg = true;
|
||||||
|
bool hasNullSMA = false;
|
||||||
|
if (pLoad != NULL) {
|
||||||
|
*pLoad = false;
|
||||||
|
}
|
||||||
|
|
||||||
bool allColumnsHaveAgg = true;
|
|
||||||
bool hasNullSMA = false;
|
|
||||||
int32_t code = pAPI->tsdReader.tsdReaderRetrieveBlockSMAInfo(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg,
|
int32_t code = pAPI->tsdReader.tsdReaderRetrieveBlockSMAInfo(pTableScanInfo->dataReader, pBlock, &allColumnsHaveAgg,
|
||||||
&hasNullSMA);
|
&hasNullSMA);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!allColumnsHaveAgg || hasNullSMA) {
|
if (!allColumnsHaveAgg || hasNullSMA) {
|
||||||
return false;
|
*pLoad = false;
|
||||||
|
} else {
|
||||||
|
*pLoad = true;
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
|
static void doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
|
||||||
|
@ -330,14 +337,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
bool loadSMA = false;
|
||||||
|
|
||||||
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
|
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
|
||||||
|
|
||||||
pCost->totalBlocks += 1;
|
pCost->totalBlocks += 1;
|
||||||
pCost->totalRows += pBlock->info.rows;
|
pCost->totalRows += pBlock->info.rows;
|
||||||
|
|
||||||
bool loadSMA = false;
|
|
||||||
*status = pTableScanInfo->dataBlockLoadFlag;
|
*status = pTableScanInfo->dataBlockLoadFlag;
|
||||||
|
|
||||||
if (pOperator->exprSupp.pFilterInfo != NULL) {
|
if (pOperator->exprSupp.pFilterInfo != NULL) {
|
||||||
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
|
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
} else {
|
} else {
|
||||||
|
@ -373,7 +380,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
|
} else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
|
||||||
pCost->loadBlockStatis += 1;
|
pCost->loadBlockStatis += 1;
|
||||||
loadSMA = true; // mark the operation of load sma;
|
loadSMA = true; // mark the operation of load sma;
|
||||||
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
|
bool success = true;
|
||||||
|
code = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo, &success);
|
||||||
|
if (code) {
|
||||||
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
|
||||||
|
qError("%s failed to retrieve sma info", GET_TASKID(pTaskInfo));
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
}
|
||||||
|
|
||||||
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||||
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
|
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64, GET_TASKID(pTaskInfo),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
|
@ -387,13 +401,21 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
||||||
}
|
}
|
||||||
|
|
||||||
if(*status != FUNC_DATA_REQUIRED_DATA_LOAD) {
|
if(*status != FUNC_DATA_REQUIRED_DATA_LOAD) {
|
||||||
qError("[loadDataBlock] invalid status:%d", *status);
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
|
||||||
|
qError("%s loadDataBlock invalid status:%d", GET_TASKID(pTaskInfo), *status);
|
||||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
// try to filter data block according to sma info
|
// try to filter data block according to sma info
|
||||||
if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
|
if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
|
||||||
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
|
bool success = true;
|
||||||
|
code = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo, &success);
|
||||||
|
if (code) {
|
||||||
|
pAPI->tsdReader.tsdReaderReleaseDataBlock(pTableScanInfo->dataReader);
|
||||||
|
qError("%s failed to retrieve sma info", GET_TASKID(pTaskInfo));
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
}
|
||||||
|
|
||||||
if (success) {
|
if (success) {
|
||||||
size_t size = taosArrayGetSize(pBlock->pDataBlock);
|
size_t size = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
bool keep = false;
|
bool keep = false;
|
||||||
|
|
Loading…
Reference in New Issue