fix(query): fix bug in employing sma data during aggregate process.
This commit is contained in:
parent
fd9e520865
commit
34c125e1bb
|
@ -186,7 +186,7 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o
|
||||||
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
|
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
|
||||||
static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
static bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn);
|
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock, SColumn* pColumn);
|
||||||
|
|
||||||
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
|
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
|
||||||
static bool hasMainOutput(STaskAttr* pQueryAttr);
|
static bool hasMainOutput(STaskAttr* pQueryAttr);
|
||||||
|
@ -325,8 +325,7 @@ static bool isSelectivityWithTagsQuery(SqlFunctionCtx* pCtx, int32_t numOfOutput
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
||||||
if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
|
if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1042,22 +1041,19 @@ static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols, int32_t rows,
|
||||||
|
|
||||||
static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
static void doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
||||||
|
|
||||||
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
|
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
|
||||||
int32_t order) {
|
|
||||||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||||
pCtx[i].order = order;
|
pCtx[i].order = order;
|
||||||
pCtx[i].size = pBlock->info.rows;
|
pCtx[i].size = pBlock->info.rows;
|
||||||
pCtx[i].currentStage = (uint8_t)pOperator->pRuntimeEnv->scanFlag;
|
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock, NULL /*&pOperator->pExpr[i].base.colInfo*/);
|
||||||
|
|
||||||
setBlockStatisInfo(&pCtx[i], pBlock, NULL /*&pOperator->pExpr[i].base.colInfo*/);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) {
|
||||||
if (pBlock->pDataBlock != NULL) {
|
if (pBlock->pBlockAgg != NULL) {
|
||||||
doSetInputDataBlock(pOperator, pCtx, pBlock, order);
|
|
||||||
} else {
|
|
||||||
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
|
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
|
||||||
|
} else {
|
||||||
|
doSetInputDataBlock(pOperator, pCtx, pBlock, order);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1745,27 +1741,32 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setBlockStatisInfo(SqlFunctionCtx* pCtx, SSDataBlock* pSDataBlock, SColumn* pColumn) {
|
void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock, SColumn* pColumn) {
|
||||||
SColumnDataAgg* pAgg = NULL;
|
if (pBlock->pBlockAgg != NULL /*&& TSDB_COL_IS_NORMAL_COL(pColumn->flag)*/) {
|
||||||
|
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
|
||||||
|
SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];
|
||||||
|
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
||||||
|
int32_t slotId = pFuncParam->pCol->slotId;
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
|
||||||
if (pSDataBlock->pBlockAgg != NULL && TSDB_COL_IS_NORMAL_COL(pColumn->flag)) {
|
pInput->pColumnDataAgg[j] = &pBlock->pBlockAgg[slotId];
|
||||||
pAgg = &pSDataBlock->pBlockAgg[pCtx->columnIndex];
|
pInput->colDataAggIsSet = true;
|
||||||
|
pInput->numOfRows = pBlock->info.rows;
|
||||||
pCtx->agg = *pAgg;
|
pInput->totalRows = pBlock->info.rows;
|
||||||
pCtx->isAggSet = true;
|
}
|
||||||
assert(pCtx->agg.numOfNull <= pSDataBlock->info.rows);
|
}
|
||||||
} else {
|
} else {
|
||||||
pCtx->isAggSet = false;
|
pCtx->input.colDataAggIsSet = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCtx->hasNull = hasNull(pColumn, pAgg);
|
// pCtx->hasNull = hasNull(pColumn, pAgg);
|
||||||
|
|
||||||
// set the statistics data for primary time stamp column
|
// set the statistics data for primary time stamp column
|
||||||
if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
// if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
pCtx->isAggSet = true;
|
// pCtx->isAggSet = true;
|
||||||
pCtx->agg.min = pSDataBlock->info.window.skey;
|
// pCtx->agg.min = pBlock->info.window.skey;
|
||||||
pCtx->agg.max = pSDataBlock->info.window.ekey;
|
// pCtx->agg.max = pBlock->info.window.ekey;
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the output buffer for the selectivity + tag query
|
// set the output buffer for the selectivity + tag query
|
||||||
|
|
|
@ -80,6 +80,7 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataBlockInfo* pBlockInfo = &pBlock->info;
|
SDataBlockInfo* pBlockInfo = &pBlock->info;
|
||||||
|
taosMemoryFreeClear(pBlock->pBlockAgg);
|
||||||
|
|
||||||
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
|
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
|
||||||
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
|
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
|
||||||
|
@ -93,15 +94,28 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||||
pCost->loadBlockStatis += 1;
|
pCost->loadBlockStatis += 1;
|
||||||
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pBlock->pBlockAgg);
|
|
||||||
|
|
||||||
|
SColumnDataAgg* pColAgg = NULL;
|
||||||
|
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pColAgg);
|
||||||
|
|
||||||
|
if (pColAgg != NULL) {
|
||||||
|
int32_t numOfCols = pBlock->info.numOfCols;
|
||||||
|
|
||||||
|
// todo create this buffer during creating operator
|
||||||
|
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, sizeof(SColumnDataAgg));
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
|
||||||
|
if (!pColMatchInfo->output) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
|
||||||
|
}
|
||||||
|
} else {
|
||||||
// failed to load the block sma data, data block statistics does not exist, load data block instead
|
// failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||||
if (pBlock->pBlockAgg == NULL) {
|
|
||||||
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
|
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
|
||||||
pCost->totalCheckedRows += pBlock->info.rows;
|
pCost->totalCheckedRows += pBlock->info.rows;
|
||||||
pCost->loadBlocks += 1;
|
pCost->loadBlocks += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,6 +150,7 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
|
||||||
|
|
||||||
ASSERT(pColMatchInfo->colId == p->info.colId);
|
ASSERT(pColMatchInfo->colId == p->info.colId);
|
||||||
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
|
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
|
||||||
|
// taosArraySet(pBlock->pBlockAgg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue