fix(query): add the repeat scan flag check during aggregate executor.
This commit is contained in:
parent
38d52c69dc
commit
86031c1985
|
@ -173,6 +173,7 @@ typedef struct SqlFunctionCtx {
|
|||
SInputColumnInfoData input;
|
||||
SResultDataInfo resDataInfo;
|
||||
uint32_t order; // data block scanner order: asc|desc
|
||||
uint8_t scanFlag; // record current running step, default: 0
|
||||
////////////////////////////////////////////////////////////////
|
||||
int32_t startRow; // start row index
|
||||
int32_t size; // handled processed row number
|
||||
|
@ -183,7 +184,6 @@ typedef struct SqlFunctionCtx {
|
|||
bool hasNull; // null value exist in current block, TODO remove it
|
||||
bool requireNull; // require null in some function, TODO remove it
|
||||
int32_t columnIndex; // TODO remove it
|
||||
uint8_t currentStage; // record current running step, default: 0
|
||||
bool isAggSet;
|
||||
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
|
||||
bool stableQuery;
|
||||
|
|
|
@ -746,7 +746,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
|||
pCtx[i].order = order;
|
||||
pCtx[i].size = pBlock->info.rows;
|
||||
pCtx[i].pSrcBlock = pBlock;
|
||||
pCtx[i].currentStage = scanFlag;
|
||||
pCtx[i].scanFlag = scanFlag;
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx[i].input;
|
||||
pInput->uid = pBlock->info.uid;
|
||||
|
@ -826,23 +826,22 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
|||
return code;
|
||||
}
|
||||
|
||||
static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
|
||||
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
|
||||
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
|
||||
if (functionNeedToExecute(&pCtx[k])) {
|
||||
pCtx[k].startTs = startTs;
|
||||
// this can be set during create the struct
|
||||
// todo add a dummy funtion to avoid process check
|
||||
if (pCtx[k].fpSet.process != NULL) {
|
||||
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s call aggregate function error happens, code : %s",
|
||||
GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
||||
pOperator->pTaskInfo->code = code;
|
||||
longjmp(pOperator->pTaskInfo->env, code);
|
||||
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
|
||||
|
@ -998,18 +997,22 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (pCtx->scanFlag == REPEAT_SCAN) {
|
||||
return fmIsRepeatScanFunc(pCtx->functionId);
|
||||
}
|
||||
|
||||
if (isRowEntryCompleted(pResInfo)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
|
||||
// return QUERY_IS_ASC_QUERY(pQueryAttr);
|
||||
}
|
||||
|
||||
// denote the order type
|
||||
if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
|
||||
// return pCtx->param[0].i == pQueryAttr->order.order;
|
||||
}
|
||||
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
|
||||
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
|
||||
// }
|
||||
//
|
||||
// // denote the order type
|
||||
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
|
||||
// // return pCtx->param[0].i == pQueryAttr->order.order;
|
||||
// }
|
||||
|
||||
// in the reverse table scan, only the following functions need to be executed
|
||||
// if (IS_REVERSE_SCAN(pRuntimeEnv) ||
|
||||
|
@ -1944,7 +1947,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
|
|||
cleanupResultRowEntry(pEntry);
|
||||
|
||||
pCtx[i].resultInfo = pEntry;
|
||||
pCtx[i].currentStage = stage;
|
||||
pCtx[i].scanFlag = stage;
|
||||
|
||||
// set the timestamp output buffer for top/bottom/diff query
|
||||
// int32_t fid = pCtx[i].functionId;
|
||||
|
@ -3724,7 +3727,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||
|
||||
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
|
||||
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
|
@ -3738,9 +3740,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
// if (pAggInfo->current != NULL) {
|
||||
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||
// }
|
||||
|
||||
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -3750,17 +3749,19 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
|||
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
||||
if (pAggInfo->pScalarExprInfo != NULL) {
|
||||
code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
|
||||
pAggInfo->numOfScalarExpr, NULL);
|
||||
pAggInfo->numOfScalarExpr, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
longjmp(pTaskInfo->env, pTaskInfo->code);
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
|
||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true);
|
||||
doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
||||
code = doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
||||
if (code != 0) {
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
#if 0 // test for encode/decode result info
|
||||
if(pOperator->encodeResultRow){
|
||||
|
|
|
@ -260,6 +260,53 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
|
|||
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
|
||||
}
|
||||
|
||||
static void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
|
||||
// currently only the tbname pseudo column
|
||||
if (pTableScanInfo->numOfPseudoExpr == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pTableScanInfo->readHandle.meta, 0);
|
||||
metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
||||
|
||||
for (int32_t j = 0; j < pTableScanInfo->numOfPseudoExpr; ++j) {
|
||||
SExprInfo* pExpr = &pTableScanInfo->pPseudoExpr[j];
|
||||
|
||||
int32_t dstSlotId = pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
|
||||
|
||||
int32_t functionId = pExpr->pExpr->_function.functionId;
|
||||
|
||||
// this is to handle the tbname
|
||||
if (fmIsScanPseudoColumnFunc(functionId)) {
|
||||
struct SScalarFuncExecFuncs fpSet = {0};
|
||||
fmGetScalarFuncExecFuncs(functionId, &fpSet);
|
||||
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
|
||||
infoData.info.bytes = sizeof(uint64_t);
|
||||
colInfoDataEnsureCapacity(&infoData, 0, 1);
|
||||
|
||||
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
|
||||
SScalarParam srcParam = {
|
||||
.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
|
||||
|
||||
SScalarParam param = {.columnData = pColInfoData};
|
||||
fpSet.process(&srcParam, 1, ¶m);
|
||||
} else { // these are tags
|
||||
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
colDataAppend(pColInfoData, i, p, (p == NULL));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
}
|
||||
|
||||
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||
|
@ -285,23 +332,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
|
||||
// currently only the tbname pseudo column
|
||||
if (pTableScanInfo->numOfPseudoExpr > 0) {
|
||||
int32_t dstSlotId = pTableScanInfo->pPseudoExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
|
||||
|
||||
struct SScalarFuncExecFuncs fpSet;
|
||||
fmGetScalarFuncExecFuncs(pTableScanInfo->pPseudoExpr->pExpr->_function.functionId, &fpSet);
|
||||
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
|
||||
infoData.info.bytes = sizeof(uint64_t);
|
||||
colInfoDataEnsureCapacity(&infoData, 0, 1);
|
||||
|
||||
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
|
||||
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
|
||||
|
||||
SScalarParam param = {.columnData = pColInfoData};
|
||||
fpSet.process(&srcParam, 1, ¶m);
|
||||
addTagPseudoColumnData(pTableScanInfo, pBlock);
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
|
|
|
@ -1645,7 +1645,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
|
|||
int32_t type = pCol->info.type;
|
||||
|
||||
SPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
|
||||
if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) {
|
||||
pInfo->stage += 1;
|
||||
|
||||
// all data are null, set it completed
|
||||
|
|
|
@ -37,7 +37,7 @@
|
|||
|
||||
#define GET_TRUE_DATA_TYPE() \
|
||||
int32_t type = 0; \
|
||||
if (pCtx->currentStage == MERGE_STAGE) { \
|
||||
if (pCtx->scanFlag == MERGE_STAGE) { \
|
||||
type = pCtx->resDataInfo.type; \
|
||||
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); \
|
||||
} else { \
|
||||
|
@ -908,7 +908,7 @@ static void avg_func_merge(SqlFunctionCtx *pCtx) {
|
|||
static void avg_finalizer(SqlFunctionCtx *pCtx) {
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
if (pCtx->currentStage == MERGE_STAGE) {
|
||||
if (pCtx->scanFlag == MERGE_STAGE) {
|
||||
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
|
||||
|
||||
if (GET_INT64_VAL(GET_ROWCELL_INTERBUF(pResInfo)) <= 0) {
|
||||
|
@ -1152,7 +1152,7 @@ static void stddev_function(SqlFunctionCtx *pCtx) {
|
|||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) {
|
||||
if (pCtx->scanFlag == REPEAT_SCAN && pStd->stage == 0) {
|
||||
pStd->stage++;
|
||||
avg_finalizer(pCtx);
|
||||
|
||||
|
@ -1814,7 +1814,7 @@ static STopBotInfo *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
|
|||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
// only the first_stage_merge is directly written data into final output buffer
|
||||
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
|
||||
if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) {
|
||||
return (STopBotInfo*) pCtx->pOutput;
|
||||
} else { // during normal table query and super table at the secondary_stage, result is written to intermediate buffer
|
||||
return GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
@ -1956,7 +1956,7 @@ static void top_func_merge(SqlFunctionCtx *pCtx) {
|
|||
for (int32_t i = 0; i < pInput->num; ++i) {
|
||||
int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT)? TSDB_DATA_TYPE_DOUBLE:pCtx->resDataInfo.type;
|
||||
// do_top_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp,
|
||||
// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
|
||||
// type, &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag);
|
||||
}
|
||||
|
||||
SET_VAL(pCtx, pInput->num, pOutput->num);
|
||||
|
@ -2013,7 +2013,7 @@ static void bottom_func_merge(SqlFunctionCtx *pCtx) {
|
|||
for (int32_t i = 0; i < pInput->num; ++i) {
|
||||
int16_t type = (pCtx->resDataInfo.type == TSDB_DATA_TYPE_FLOAT) ? TSDB_DATA_TYPE_DOUBLE : pCtx->resDataInfo.type;
|
||||
// do_bottom_function_add(pOutput, (int32_t)pCtx->param[0].param.i, &pInput->res[i]->v.i, pInput->res[i]->timestamp, type,
|
||||
// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->currentStage);
|
||||
// &pCtx->tagInfo, pInput->res[i]->pTags, pCtx->scanFlag);
|
||||
}
|
||||
|
||||
SET_VAL(pCtx, pInput->num, pOutput->num);
|
||||
|
@ -2073,7 +2073,7 @@ static void percentile_function(SqlFunctionCtx *pCtx) {
|
|||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
|
||||
if (pCtx->scanFlag == REPEAT_SCAN && pInfo->stage == 0) {
|
||||
pInfo->stage += 1;
|
||||
|
||||
// all data are null, set it completed
|
||||
|
@ -2180,7 +2180,7 @@ static SAPercentileInfo *getAPerctInfo(SqlFunctionCtx *pCtx) {
|
|||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SAPercentileInfo* pInfo = NULL;
|
||||
|
||||
if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) {
|
||||
if (pCtx->stableQuery && pCtx->scanFlag != MERGE_STAGE) {
|
||||
pInfo = (SAPercentileInfo*) pCtx->pOutput;
|
||||
} else {
|
||||
pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
@ -2270,7 +2270,7 @@ static void apercentile_finalizer(SqlFunctionCtx *pCtx) {
|
|||
SResultRowEntryInfo * pResInfo = GET_RES_INFO(pCtx);
|
||||
SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pCtx->currentStage == MERGE_STAGE) {
|
||||
if (pCtx->scanFlag == MERGE_STAGE) {
|
||||
// if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null
|
||||
// assert(pOutput->pHisto->numOfElems > 0);
|
||||
//
|
||||
|
@ -2510,7 +2510,7 @@ static void copy_function(SqlFunctionCtx *pCtx);
|
|||
|
||||
static void tag_function(SqlFunctionCtx *pCtx) {
|
||||
SET_VAL(pCtx, 1, 1);
|
||||
if (pCtx->currentStage == MERGE_STAGE) {
|
||||
if (pCtx->scanFlag == MERGE_STAGE) {
|
||||
copy_function(pCtx);
|
||||
} else {
|
||||
taosVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->resDataInfo.type, true);
|
||||
|
@ -2966,7 +2966,7 @@ static bool spread_function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pRe
|
|||
SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
// this is the server-side setup function in client-side, the secondary merge do not need this procedure
|
||||
if (pCtx->currentStage == MERGE_STAGE) {
|
||||
if (pCtx->scanFlag == MERGE_STAGE) {
|
||||
// pCtx->param[0].param.d = DBL_MAX;
|
||||
// pCtx->param[3].param.d = -DBL_MAX;
|
||||
} else {
|
||||
|
@ -3086,7 +3086,7 @@ void spread_function_finalizer(SqlFunctionCtx *pCtx) {
|
|||
*/
|
||||
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
|
||||
if (pCtx->currentStage == MERGE_STAGE) {
|
||||
if (pCtx->scanFlag == MERGE_STAGE) {
|
||||
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
|
||||
|
||||
// if (pResInfo->hasResult != DATA_SET_FLAG) {
|
||||
|
|
Loading…
Reference in New Issue