[td-1394] add the returning value check after invoking the load data block from file function.

This commit is contained in:
Haojun Liao 2020-09-09 13:22:45 +08:00
parent ed7e81f154
commit fb25b058ae
1 changed files with 39 additions and 26 deletions

View File

@ -2185,43 +2185,43 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) {
return false; return false;
} }
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock) { int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) {
SQuery *pQuery = pRuntimeEnv->pQuery; SQuery *pQuery = pRuntimeEnv->pQuery;
uint32_t status = 0; *status = 0;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf > 0) { if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf > 0) {
status = BLK_DATA_ALL_NEEDED; *status = BLK_DATA_ALL_NEEDED;
} else { // check if this data block is required to load } else { // check if this data block is required to load
// Calculate all time windows that are overlapping or contain current data block. // Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block. // If current data block is contained by all possible time window, do not load current data block.
if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) { if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) {
status = BLK_DATA_ALL_NEEDED; *status = BLK_DATA_ALL_NEEDED;
} }
if (status != BLK_DATA_ALL_NEEDED) { if ((*status) != BLK_DATA_ALL_NEEDED) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base; SSqlFuncMsg* pSqlFunc = &pQuery->pSelectExpr[i].base;
int32_t functionId = pSqlFunc->functionId; int32_t functionId = pSqlFunc->functionId;
int32_t colId = pSqlFunc->colInfo.colId; int32_t colId = pSqlFunc->colInfo.colId;
status |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId); (*status) |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId);
if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
break; break;
} }
} }
} }
} }
if (status == BLK_DATA_NO_NEEDED) { if ((*status) == BLK_DATA_NO_NEEDED) {
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pRuntimeEnv->summary.discardBlocks += 1; pRuntimeEnv->summary.discardBlocks += 1;
} else if (status == BLK_DATA_STATIS_NEEDED) { } else if ((*status) == BLK_DATA_STATIS_NEEDED) {
if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) {
// return DISK_DATA_LOAD_FAILED; // this function never returns error?
} tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis);
pRuntimeEnv->summary.loadBlockStatis += 1; pRuntimeEnv->summary.loadBlockStatis += 1;
@ -2230,24 +2230,26 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows; pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows;
} }
} else { } else {
assert(status == BLK_DATA_ALL_NEEDED); assert((*status) == BLK_DATA_ALL_NEEDED);
// load the data block statistics to perform further filter // load the data block statistics to perform further filter
pRuntimeEnv->summary.loadBlockStatis += 1; pRuntimeEnv->summary.loadBlockStatis += 1;
if (tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis) != TSDB_CODE_SUCCESS) { tsdbRetrieveDataBlockStatisInfo(pQueryHandle, pStatis);
}
if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) { if (!needToLoadDataBlock(pRuntimeEnv, *pStatis, pRuntimeEnv->pCtx, pBlockInfo->rows)) {
// current block has been discard due to filter applied // current block has been discard due to filter applied
pRuntimeEnv->summary.discardBlocks += 1; pRuntimeEnv->summary.discardBlocks += 1;
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv), qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
return BLK_DATA_DISCARD; (*status) = BLK_DATA_DISCARD;
} }
pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows; pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows;
pRuntimeEnv->summary.loadBlocks += 1; pRuntimeEnv->summary.loadBlocks += 1;
*pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL); *pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
if (*pDataBlock == NULL) {
return terrno;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2431,15 +2433,18 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
ensureOutputBuffer(pRuntimeEnv, &blockInfo); ensureOutputBuffer(pRuntimeEnv, &blockInfo);
SDataStatis *pStatis = NULL; SDataStatis *pStatis = NULL;
SArray *pDataBlock = NULL; SArray * pDataBlock = NULL;
if (loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock) == BLK_DATA_DISCARD) { uint32_t status = 0;
pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step:blockInfo.window.skey + step;
continue; int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
if (ret != TSDB_CODE_SUCCESS) {
break;
} }
if (terrno != TSDB_CODE_SUCCESS) { // load data block failed, abort query if (status == BLK_DATA_DISCARD) {
longjmp(pRuntimeEnv->env, terrno); pQuery->current->lastKey =
break; QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue;
} }
// query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition // query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition
@ -4651,9 +4656,17 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
} }
SDataStatis *pStatis = NULL; SDataStatis *pStatis = NULL;
SArray *pDataBlock = NULL; SArray * pDataBlock = NULL;
if (loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock) == BLK_DATA_DISCARD) { uint32_t status = 0;
pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step:blockInfo.window.skey + step;
int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock, &status);
if (ret != TSDB_CODE_SUCCESS) {
break;
}
if (status == BLK_DATA_DISCARD) {
pQuery->current->lastKey =
QUERY_IS_ASC_QUERY(pQuery) ? blockInfo.window.ekey + step : blockInfo.window.skey + step;
continue; continue;
} }