Merge pull request #12607 from taosdata/feature/3.0_glzhao
feat(query): add elapsed function
This commit is contained in:
commit
2f128fa3c9
|
@ -85,6 +85,11 @@ bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo)
|
||||||
int32_t spreadFunction(SqlFunctionCtx* pCtx);
|
int32_t spreadFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
int32_t elapsedFunction(SqlFunctionCtx* pCtx);
|
||||||
|
int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t histogramFunction(SqlFunctionCtx* pCtx);
|
int32_t histogramFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
|
@ -226,6 +226,27 @@ static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
int32_t paraNum = LIST_LENGTH(pFunc->pParameterList);
|
||||||
|
if (1 != paraNum && 2 != paraNum) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
|
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||||
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
|
"The input parameter of ELAPSED function can only be column");
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
if (TSDB_DATA_TYPE_TIMESTAMP != paraType) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateLeastSQR(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateLeastSQR(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||||
if (3 != numOfParams) {
|
if (3 != numOfParams) {
|
||||||
|
@ -794,6 +815,17 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = spreadFunction,
|
.processFunc = spreadFunction,
|
||||||
.finalizeFunc = spreadFinalize
|
.finalizeFunc = spreadFinalize
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
.name = "elapsed",
|
||||||
|
.type = FUNCTION_TYPE_ELAPSED,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.dataRequiredFunc = statisDataRequired,
|
||||||
|
.translateFunc = translateElapsed,
|
||||||
|
.getEnvFunc = getElapsedFuncEnv,
|
||||||
|
.initFunc = elapsedFunctionSetup,
|
||||||
|
.processFunc = elapsedFunction,
|
||||||
|
.finalizeFunc = elapsedFinalize
|
||||||
|
},
|
||||||
{
|
{
|
||||||
.name = "last_row",
|
.name = "last_row",
|
||||||
.type = FUNCTION_TYPE_LAST_ROW,
|
.type = FUNCTION_TYPE_LAST_ROW,
|
||||||
|
|
|
@ -112,6 +112,13 @@ typedef struct SSpreadInfo {
|
||||||
double max;
|
double max;
|
||||||
} SSpreadInfo;
|
} SSpreadInfo;
|
||||||
|
|
||||||
|
typedef struct SElapsedInfo {
|
||||||
|
double result;
|
||||||
|
TSKEY min;
|
||||||
|
TSKEY max;
|
||||||
|
int64_t timeUnit;
|
||||||
|
} SElapsedInfo;
|
||||||
|
|
||||||
typedef struct SHistoFuncBin {
|
typedef struct SHistoFuncBin {
|
||||||
double lower;
|
double lower;
|
||||||
double upper;
|
double upper;
|
||||||
|
@ -2494,6 +2501,116 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getElapsedFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
|
pEnv->calcMemSize = sizeof(SElapsedInfo);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
|
pInfo->result = 0;
|
||||||
|
pInfo->min = MAX_TS_KEY;
|
||||||
|
pInfo->max = 0;
|
||||||
|
|
||||||
|
if (pCtx->numOfParams == 3) {
|
||||||
|
pInfo->timeUnit = pCtx->param[1].param.i;
|
||||||
|
} else {
|
||||||
|
pInfo->timeUnit = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
|
||||||
|
int32_t numOfElems = 0;
|
||||||
|
|
||||||
|
// Only the pre-computing information loaded and actual data does not loaded
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnDataAgg *pAgg = pInput->pColumnDataAgg[0];
|
||||||
|
|
||||||
|
SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
|
numOfElems = pInput->numOfRows; //since this is the primary timestamp, no need to exclude NULL values
|
||||||
|
if (numOfElems == 0) {
|
||||||
|
goto _elapsed_over;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInput->colDataAggIsSet) {
|
||||||
|
|
||||||
|
if (pInfo->min == MAX_TS_KEY) {
|
||||||
|
pInfo->min = GET_INT64_VAL(&pAgg->min);
|
||||||
|
pInfo->max = GET_INT64_VAL(&pAgg->max);
|
||||||
|
} else {
|
||||||
|
if (pCtx->order == TSDB_ORDER_ASC) {
|
||||||
|
pInfo->max = GET_INT64_VAL(&pAgg->max);
|
||||||
|
} else {
|
||||||
|
pInfo->min = GET_INT64_VAL(&pAgg->min);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else { // computing based on the true data block
|
||||||
|
if (0 == pCtx->size) {
|
||||||
|
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||||
|
if (pCtx->end.key != INT64_MIN) {
|
||||||
|
pInfo->min = pCtx->end.key;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pCtx->end.key != INT64_MIN) {
|
||||||
|
pInfo->max = pCtx->end.key + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
goto _elapsed_over;
|
||||||
|
}
|
||||||
|
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start);
|
||||||
|
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||||
|
if (pCtx->start.key == INT64_MIN) {
|
||||||
|
pInfo->max = (pInfo->max < ptsList[pCtx->size - 1]) ? ptsList[pCtx->size - 1] : pInfo->max;
|
||||||
|
} else {
|
||||||
|
pInfo->max = pCtx->start.key + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCtx->end.key != INT64_MIN) {
|
||||||
|
pInfo->min = pCtx->end.key;
|
||||||
|
} else {
|
||||||
|
pInfo->min = ptsList[0];
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pCtx->start.key == INT64_MIN) {
|
||||||
|
pInfo->min = (pInfo->min > ptsList[0]) ? ptsList[0] : pInfo->min;
|
||||||
|
} else {
|
||||||
|
pInfo->min = pCtx->start.key;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCtx->end.key != INT64_MIN) {
|
||||||
|
pInfo->max = pCtx->end.key + 1;
|
||||||
|
} else {
|
||||||
|
pInfo->max = ptsList[pCtx->size - 1];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_elapsed_over:
|
||||||
|
// data in the check operation are all null, not output
|
||||||
|
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
double result = (double)pInfo->max - (double)pInfo->min;
|
||||||
|
result = (result >= 0) ? result : -result;
|
||||||
|
pInfo->result = result / pInfo->timeUnit;
|
||||||
|
return functionFinalize(pCtx, pBlock);
|
||||||
|
}
|
||||||
|
|
||||||
bool getHistogramFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getHistogramFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin);
|
pEnv->calcMemSize = sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin);
|
||||||
return true;
|
return true;
|
||||||
|
|
Loading…
Reference in New Issue