From 34c125e1bb7f54fc824bb95ea9c5194ba83b187e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 16 Apr 2022 11:47:50 +0800 Subject: [PATCH] fix(query): fix bug in employing sma data during aggregate process. --- source/libs/executor/src/executorimpl.c | 53 +++++++++++++------------ source/libs/executor/src/scanoperator.c | 23 +++++++++-- 2 files changed, 46 insertions(+), 30 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3fec3234a9..0cc536f4d3 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -186,7 +186,7 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); static bool functionNeedToExecute(SqlFunctionCtx* pCtx); -static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn); +static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock, SColumn* pColumn); static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo); static bool hasMainOutput(STaskAttr* pQueryAttr); @@ -325,8 +325,7 @@ static bool isSelectivityWithTagsQuery(SqlFunctionCtx* pCtx, int32_t numOfOutput } static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { - if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || - pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { return false; } @@ -1042,22 +1041,19 @@ static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols, int32_t rows, static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); -static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, - int32_t order) { +static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; - pCtx[i].currentStage = (uint8_t)pOperator->pRuntimeEnv->scanFlag; - - setBlockStatisInfo(&pCtx[i], pBlock, NULL /*&pOperator->pExpr[i].base.colInfo*/); + setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock, NULL /*&pOperator->pExpr[i].base.colInfo*/); } } void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { - if (pBlock->pDataBlock != NULL) { - doSetInputDataBlock(pOperator, pCtx, pBlock, order); - } else { + if (pBlock->pBlockAgg != NULL) { doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); + } else { + doSetInputDataBlock(pOperator, pCtx, pBlock, order); } } @@ -1745,27 +1741,32 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) { return true; } -void setBlockStatisInfo(SqlFunctionCtx* pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn) { - SColumnDataAgg* pAgg = NULL; +void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock, SColumn* pColumn) { + if (pBlock->pBlockAgg != NULL /*&& TSDB_COL_IS_NORMAL_COL(pColumn->flag)*/) { + for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) { + SFunctParam* pFuncParam = &pExprInfo->base.pParam[j]; + if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { + int32_t slotId = pFuncParam->pCol->slotId; + SInputColumnInfoData* pInput = &pCtx->input; - if (pSDataBlock->pBlockAgg != NULL && TSDB_COL_IS_NORMAL_COL(pColumn->flag)) { - pAgg = &pSDataBlock->pBlockAgg[pCtx->columnIndex]; - - pCtx->agg = *pAgg; - pCtx->isAggSet = true; - assert(pCtx->agg.numOfNull <= pSDataBlock->info.rows); + pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId]; + pInput->colDataAggIsSet = true; + pInput->numOfRows = pBlock->info.rows; + pInput->totalRows = pBlock->info.rows; + } + } } else { - pCtx->isAggSet = false; + pCtx->input.colDataAggIsSet = false; } - pCtx->hasNull = hasNull(pColumn, pAgg); +// pCtx->hasNull = hasNull(pColumn, pAgg); // set the statistics data for primary time stamp column - if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - pCtx->isAggSet = true; - pCtx->agg.min = pSDataBlock->info.window.skey; - pCtx->agg.max = pSDataBlock->info.window.ekey; - } + // if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + // pCtx->isAggSet = true; + // pCtx->agg.min = pBlock->info.window.skey; + // pCtx->agg.max = pBlock->info.window.ekey; + // } } // set the output buffer for the selectivity + tag query diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 031b76614d..c7d4c4966a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -80,6 +80,7 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, } SDataBlockInfo* pBlockInfo = &pBlock->info; + taosMemoryFreeClear(pBlock->pBlockAgg); if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, @@ -93,15 +94,28 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { pCost->loadBlockStatis += 1; - tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pBlock->pBlockAgg); - // failed to load the block sma data, data block statistics does not exist, load data block instead - if (pBlock->pBlockAgg == NULL) { + SColumnDataAgg* pColAgg = NULL; + tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg); + + if (pColAgg != NULL) { + int32_t numOfCols = pBlock->info.numOfCols; + + // todo create this buffer during creating operator + pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg)); + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); + if (!pColMatchInfo->output) { + continue; + } + pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; + } + } else { + // failed to load the block sma data, data block statistics does not exist, load data block instead pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL); pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; } - return TSDB_CODE_SUCCESS; } @@ -136,6 +150,7 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, ASSERT(pColMatchInfo->colId == p->info.colId); taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); +// taosArraySet(pBlock->pBlockAgg) } }