Merge pull request #28285 from taosdata/feat/anode312
feat: analysis operator
This commit is contained in:
commit
21c0162c5e
|
@ -16,9 +16,10 @@
|
||||||
#include "builtins.h"
|
#include "builtins.h"
|
||||||
#include "builtinsimpl.h"
|
#include "builtinsimpl.h"
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
|
#include "geomFunc.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
#include "scalar.h"
|
#include "scalar.h"
|
||||||
#include "geomFunc.h"
|
#include "tanal.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
|
|
||||||
|
@ -2078,6 +2079,47 @@ static int32_t translateMode(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
return translateUniqueMode(pFunc, pErrBuf, len, false);
|
return translateUniqueMode(pFunc, pErrBuf, len, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateForecast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||||
|
if (2 != numOfParams && 1 != numOfParams) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, "FORECAST require 1 or 2 parameters");
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t valType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type;
|
||||||
|
if (!IS_MATHABLE_TYPE(valType)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, "FORECAST only support mathable column");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numOfParams == 2) {
|
||||||
|
uint8_t optionType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 1))->type;
|
||||||
|
if (TSDB_DATA_TYPE_BINARY != optionType) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, "FORECAST option should be varchar");
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pOption = nodesListGetNode(pFunc->pParameterList, 1);
|
||||||
|
if (QUERY_NODE_VALUE != nodeType(pOption)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, "FORECAST option should be value");
|
||||||
|
}
|
||||||
|
|
||||||
|
SValueNode* pValue = (SValueNode*)pOption;
|
||||||
|
if (!taosAnalGetOptStr(pValue->literal, "algo", NULL, 0) != 0) {
|
||||||
|
return invaildFuncParaValueErrMsg(pErrBuf, len, "FORECAST option should include algo field");
|
||||||
|
}
|
||||||
|
|
||||||
|
pValue->notReserved = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[valType].bytes, .type = valType};
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateForecastConf(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_FLOAT].bytes, .type = TSDB_DATA_TYPE_FLOAT};
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static EFuncReturnRows forecastEstReturnRows(SFunctionNode* pFunc) { return FUNC_RETURN_ROWS_N; }
|
||||||
|
|
||||||
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||||
if (numOfParams > 2) {
|
if (numOfParams > 2) {
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
|
#include "tanal.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tdigest.h"
|
#include "tdigest.h"
|
||||||
|
@ -3578,6 +3579,11 @@ bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool f
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool getForecastConfEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
|
pEnv->calcMemSize = sizeof(float);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t diffResultIsNull(SqlFunctionCtx* pCtx, SFuncInputRow* pRow){
|
int32_t diffResultIsNull(SqlFunctionCtx* pCtx, SFuncInputRow* pRow){
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
|
@ -232,6 +232,15 @@ bool fmIsInterpFunc(int32_t funcId) {
|
||||||
|
|
||||||
bool fmIsInterpPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INTERP_PC_FUNC); }
|
bool fmIsInterpPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INTERP_PC_FUNC); }
|
||||||
|
|
||||||
|
bool fmIsForecastFunc(int32_t funcId) {
|
||||||
|
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return FUNCTION_TYPE_FORECAST == funcMgtBuiltins[funcId].type;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool fmIsForecastPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORECAST_PC_FUNC); }
|
||||||
|
|
||||||
bool fmIsLastRowFunc(int32_t funcId) {
|
bool fmIsLastRowFunc(int32_t funcId) {
|
||||||
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -1367,6 +1367,25 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SNode* createAnomalyWindowNode(SAstCreateContext* pCxt, SNode* pExpr, const SToken* pFuncOpt) {
|
||||||
|
SAnomalyWindowNode* pAnomaly = NULL;
|
||||||
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
|
pCxt->errCode = nodesMakeNode(QUERY_NODE_ANOMALY_WINDOW, (SNode**)&pAnomaly);
|
||||||
|
CHECK_MAKE_NODE(pAnomaly);
|
||||||
|
pAnomaly->pCol = createPrimaryKeyCol(pCxt, NULL);
|
||||||
|
CHECK_MAKE_NODE(pAnomaly->pCol);
|
||||||
|
pAnomaly->pExpr = pExpr;
|
||||||
|
if (pFuncOpt == NULL) {
|
||||||
|
tstrncpy(pAnomaly->anomalyOpt, "algo=iqr", TSDB_ANAL_ALGO_OPTION_LEN);
|
||||||
|
} else {
|
||||||
|
(void)trimString(pFuncOpt->z, pFuncOpt->n, pAnomaly->anomalyOpt, sizeof(pAnomaly->anomalyOpt));
|
||||||
|
}
|
||||||
|
return (SNode*)pAnomaly;
|
||||||
|
_err:
|
||||||
|
nodesDestroyNode((SNode*)pAnomaly);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,
|
SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding,
|
||||||
SNode* pFill) {
|
SNode* pFill) {
|
||||||
SIntervalWindowNode* interval = NULL;
|
SIntervalWindowNode* interval = NULL;
|
||||||
|
@ -2997,6 +3016,47 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SNode* createCreateAnodeStmt(SAstCreateContext* pCxt, const SToken* pUrl) {
|
||||||
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
|
SCreateAnodeStmt* pStmt = NULL;
|
||||||
|
pCxt->errCode = nodesMakeNode(QUERY_NODE_CREATE_ANODE_STMT, (SNode**)&pStmt);
|
||||||
|
CHECK_MAKE_NODE(pStmt);
|
||||||
|
(void)trimString(pUrl->z, pUrl->n, pStmt->url, sizeof(pStmt->url));
|
||||||
|
return (SNode*)pStmt;
|
||||||
|
_err:
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* createDropAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode) {
|
||||||
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
|
SUpdateAnodeStmt* pStmt = NULL;
|
||||||
|
pCxt->errCode = nodesMakeNode(QUERY_NODE_DROP_ANODE_STMT, (SNode**)&pStmt);
|
||||||
|
CHECK_MAKE_NODE(pStmt);
|
||||||
|
if (NULL != pAnode) {
|
||||||
|
pStmt->anodeId = taosStr2Int32(pAnode->z, NULL, 10);
|
||||||
|
} else {
|
||||||
|
pStmt->anodeId = -1;
|
||||||
|
}
|
||||||
|
return (SNode*)pStmt;
|
||||||
|
_err:
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* createUpdateAnodeStmt(SAstCreateContext* pCxt, const SToken* pAnode, bool updateAll) {
|
||||||
|
CHECK_PARSER_STATUS(pCxt);
|
||||||
|
SUpdateAnodeStmt* pStmt = NULL;
|
||||||
|
pCxt->errCode = nodesMakeNode(QUERY_NODE_UPDATE_ANODE_STMT, (SNode**)&pStmt);
|
||||||
|
CHECK_MAKE_NODE(pStmt);
|
||||||
|
if (NULL != pAnode) {
|
||||||
|
pStmt->anodeId = taosStr2Int32(pAnode->z, NULL, 10);
|
||||||
|
} else {
|
||||||
|
pStmt->anodeId = -1;
|
||||||
|
}
|
||||||
|
return (SNode*)pStmt;
|
||||||
|
_err:
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
SNode* createEncryptKeyStmt(SAstCreateContext* pCxt, const SToken* pValue) {
|
SNode* createEncryptKeyStmt(SAstCreateContext* pCxt, const SToken* pValue) {
|
||||||
SToken config;
|
SToken config;
|
||||||
config.type = TK_NK_STRING;
|
config.type = TK_NK_STRING;
|
||||||
|
|
|
@ -185,6 +185,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
||||||
return "%s is not supported in system table query";
|
return "%s is not supported in system table query";
|
||||||
case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE:
|
case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE:
|
||||||
return "Invalid usage of RANGE clause, EVERY clause or FILL clause";
|
return "Invalid usage of RANGE clause, EVERY clause or FILL clause";
|
||||||
|
case TSDB_CODE_PAR_INVALID_FORECAST_CLAUSE:
|
||||||
|
return "Invalid usage of forecast clause";
|
||||||
case TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN:
|
case TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN:
|
||||||
return "No valid function in window query";
|
return "No valid function in window query";
|
||||||
case TSDB_CODE_PAR_INVALID_OPTR_USAGE:
|
case TSDB_CODE_PAR_INVALID_OPTR_USAGE:
|
||||||
|
|
|
@ -2380,6 +2380,8 @@ static bool sortPriKeyOptHasUnsupportedPkFunc(SLogicNode* pLogicNode, EOrder sor
|
||||||
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
|
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
|
||||||
pFuncList = ((SInterpFuncLogicNode*)pLogicNode)->pFuncs;
|
pFuncList = ((SInterpFuncLogicNode*)pLogicNode)->pFuncs;
|
||||||
break;
|
break;
|
||||||
|
case QUERY_NODE_LOGIC_PLAN_FORECAST_FUNC:
|
||||||
|
pFuncList = ((SForecastFuncLogicNode*)pLogicNode)->pFuncs;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue