enh: cumulative functions support mixed use
This commit is contained in:
parent
22635992fd
commit
41b8ef7d93
|
@ -157,6 +157,13 @@ typedef enum EFunctionType {
|
|||
FUNCTION_TYPE_UDF = 10000
|
||||
} EFunctionType;
|
||||
|
||||
typedef enum EFuncReturnRows {
|
||||
FUNC_RETURN_ROWS_NORMAL = 1,
|
||||
FUNC_RETURN_ROWS_INDEFINITE,
|
||||
FUNC_RETURN_ROWS_N,
|
||||
FUNC_RETURN_ROWS_N_MINUS_1
|
||||
} EFuncReturnRows;
|
||||
|
||||
struct SqlFunctionCtx;
|
||||
struct SResultRowEntryInfo;
|
||||
struct STimeWindow;
|
||||
|
@ -167,6 +174,8 @@ void fmFuncMgtDestroy();
|
|||
|
||||
int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen);
|
||||
|
||||
EFuncReturnRows fmGetFuncReturnRows(SFunctionNode* pFunc);
|
||||
|
||||
bool fmIsBuiltinFunc(const char* pFunc);
|
||||
|
||||
bool fmIsAggFunc(int32_t funcId);
|
||||
|
@ -198,6 +207,7 @@ bool fmIsImplicitTsFunc(int32_t funcId);
|
|||
bool fmIsClientPseudoColumnFunc(int32_t funcId);
|
||||
bool fmIsMultiRowsFunc(int32_t funcId);
|
||||
bool fmIsKeepOrderFunc(int32_t funcId);
|
||||
bool fmIsCumulativeFunc(int32_t funcId);
|
||||
|
||||
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
|
||||
|
||||
|
|
|
@ -253,6 +253,7 @@ typedef struct SSelectStmt {
|
|||
char stmtName[TSDB_TABLE_NAME_LEN];
|
||||
uint8_t precision;
|
||||
int32_t selectFuncNum;
|
||||
int32_t returnRows; // EFuncReturnRows
|
||||
bool isEmptyResult;
|
||||
bool isTimeLineResult;
|
||||
bool isSubquery;
|
||||
|
|
|
@ -26,6 +26,7 @@ typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
|||
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||
typedef int32_t (*FCreateMergeFuncParameters)(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters);
|
||||
typedef EFuncDataRequired (*FFuncDynDataRequired)(void* pRes, STimeWindow* pTimeWindow);
|
||||
typedef EFuncReturnRows (*FEstimateReturnRows)(SFunctionNode* pFunc);
|
||||
|
||||
typedef struct SBuiltinFuncDefinition {
|
||||
const char* name;
|
||||
|
@ -44,6 +45,7 @@ typedef struct SBuiltinFuncDefinition {
|
|||
const char* pPartialFunc;
|
||||
const char* pMergeFunc;
|
||||
FCreateMergeFuncParameters createMergeParaFuc;
|
||||
FEstimateReturnRows estimateReturnRowsFunc;
|
||||
} SBuiltinFuncDefinition;
|
||||
|
||||
extern const SBuiltinFuncDefinition funcMgtBuiltins[];
|
||||
|
|
|
@ -48,6 +48,7 @@ extern "C" {
|
|||
#define FUNC_MGT_CLIENT_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(19)
|
||||
#define FUNC_MGT_MULTI_ROWS_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(20)
|
||||
#define FUNC_MGT_KEEP_ORDER_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(21)
|
||||
#define FUNC_MGT_CUMULATIVE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(22)
|
||||
|
||||
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
|
||||
|
|
|
@ -1277,6 +1277,8 @@ static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static EFuncReturnRows csumEstReturnRows(SFunctionNode* pFunc) { return FUNC_RETURN_ROWS_N; }
|
||||
|
||||
static int32_t translateMavg(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (2 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -1416,6 +1418,11 @@ static int32_t translateDerivative(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static EFuncReturnRows derivativeEstReturnRows(SFunctionNode* pFunc) {
|
||||
return 1 == ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 2))->datum.i ? FUNC_RETURN_ROWS_INDEFINITE
|
||||
: FUNC_RETURN_ROWS_N_MINUS_1;
|
||||
}
|
||||
|
||||
static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -1551,6 +1558,14 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static EFuncReturnRows diffEstReturnRows(SFunctionNode* pFunc) {
|
||||
if (1 == LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return FUNC_RETURN_ROWS_N_MINUS_1;
|
||||
}
|
||||
return 1 == ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i ? FUNC_RETURN_ROWS_INDEFINITE
|
||||
: FUNC_RETURN_ROWS_N_MINUS_1;
|
||||
}
|
||||
|
||||
static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -2231,13 +2246,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "derivative",
|
||||
.type = FUNCTION_TYPE_DERIVATIVE,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
|
||||
.translateFunc = translateDerivative,
|
||||
.getEnvFunc = getDerivativeFuncEnv,
|
||||
.initFunc = derivativeFuncSetup,
|
||||
.processFunc = derivativeFunction,
|
||||
.sprocessFunc = derivativeScalarFunction,
|
||||
.finalizeFunc = functionFinalize
|
||||
.finalizeFunc = functionFinalize,
|
||||
.estimateReturnRowsFunc = derivativeEstReturnRows
|
||||
},
|
||||
{
|
||||
.name = "irate",
|
||||
|
@ -2436,13 +2452,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "diff",
|
||||
.type = FUNCTION_TYPE_DIFF,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
|
||||
.translateFunc = translateDiff,
|
||||
.getEnvFunc = getDiffFuncEnv,
|
||||
.initFunc = diffFunctionSetup,
|
||||
.processFunc = diffFunction,
|
||||
.sprocessFunc = diffScalarFunction,
|
||||
.finalizeFunc = functionFinalize
|
||||
.finalizeFunc = functionFinalize,
|
||||
.estimateReturnRowsFunc = diffEstReturnRows
|
||||
},
|
||||
{
|
||||
.name = "statecount",
|
||||
|
@ -2469,13 +2486,14 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "csum",
|
||||
.type = FUNCTION_TYPE_CSUM,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
|
||||
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC,
|
||||
.translateFunc = translateCsum,
|
||||
.getEnvFunc = getCsumFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = csumFunction,
|
||||
.sprocessFunc = csumScalarFunction,
|
||||
.finalizeFunc = NULL
|
||||
.finalizeFunc = NULL,
|
||||
.estimateReturnRowsFunc = csumEstReturnRows,
|
||||
},
|
||||
{
|
||||
.name = "mavg",
|
||||
|
|
|
@ -89,6 +89,14 @@ int32_t fmGetFuncInfo(SFunctionNode* pFunc, char* pMsg, int32_t msgLen) {
|
|||
return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION;
|
||||
}
|
||||
|
||||
EFuncReturnRows fmGetFuncReturnRows(SFunctionNode* pFunc) {
|
||||
if (NULL != funcMgtBuiltins[pFunc->funcId].estimateReturnRowsFunc) {
|
||||
return funcMgtBuiltins[pFunc->funcId].estimateReturnRowsFunc(pFunc);
|
||||
}
|
||||
return (fmIsIndefiniteRowsFunc(pFunc->funcId) || fmIsMultiRowsFunc(pFunc->funcId)) ? FUNC_RETURN_ROWS_INDEFINITE
|
||||
: FUNC_RETURN_ROWS_NORMAL;
|
||||
}
|
||||
|
||||
bool fmIsBuiltinFunc(const char* pFunc) {
|
||||
return NULL != taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc, strlen(pFunc));
|
||||
}
|
||||
|
@ -192,6 +200,8 @@ bool fmIsMultiRowsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, F
|
|||
|
||||
bool fmIsKeepOrderFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_KEEP_ORDER_FUNC); }
|
||||
|
||||
bool fmIsCumulativeFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_CUMULATIVE_FUNC); }
|
||||
|
||||
bool fmIsInterpFunc(int32_t funcId) {
|
||||
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||
return false;
|
||||
|
|
|
@ -1110,12 +1110,15 @@ static int32_t translateIndefiniteRowsFunc(STranslateContext* pCxt, SFunctionNod
|
|||
if (!fmIsIndefiniteRowsFunc(pFunc->funcId)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (!isSelectStmt(pCxt->pCurrStmt) || SQL_CLAUSE_SELECT != pCxt->currClause ||
|
||||
((SSelectStmt*)pCxt->pCurrStmt)->hasIndefiniteRowsFunc || ((SSelectStmt*)pCxt->pCurrStmt)->hasAggFuncs ||
|
||||
((SSelectStmt*)pCxt->pCurrStmt)->hasMultiRowsFunc) {
|
||||
if (!isSelectStmt(pCxt->pCurrStmt) || SQL_CLAUSE_SELECT != pCxt->currClause) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
|
||||
}
|
||||
if (NULL != ((SSelectStmt*)pCxt->pCurrStmt)->pWindow || NULL != ((SSelectStmt*)pCxt->pCurrStmt)->pGroupByList) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
|
||||
if (pSelect->hasAggFuncs || pSelect->hasMultiRowsFunc ||
|
||||
(pSelect->hasIndefiniteRowsFunc && pSelect->returnRows != fmGetFuncReturnRows(pFunc))) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
|
||||
}
|
||||
if (NULL != pSelect->pWindow || NULL != pSelect->pGroupByList) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
|
||||
"%s function is not supported in window query or group query", pFunc->functionName);
|
||||
}
|
||||
|
@ -1230,18 +1233,28 @@ static int32_t getMultiResFuncNum(SNodeList* pParameterList) {
|
|||
return LIST_LENGTH(pParameterList);
|
||||
}
|
||||
|
||||
static int32_t calcSelectFuncNum(SFunctionNode* pFunc, int32_t currSelectFuncNum) {
|
||||
if (fmIsCumulativeFunc(pFunc->funcId)) {
|
||||
return currSelectFuncNum > 0 ? currSelectFuncNum : 1;
|
||||
}
|
||||
return currSelectFuncNum + ((fmIsMultiResFunc(pFunc->funcId) && !fmIsLastRowFunc(pFunc->funcId))
|
||||
? getMultiResFuncNum(pFunc->pParameterList)
|
||||
: 1);
|
||||
}
|
||||
|
||||
static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
|
||||
if (NULL != pCurrStmt && QUERY_NODE_SELECT_STMT == nodeType(pCurrStmt)) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pCurrStmt;
|
||||
pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId);
|
||||
pSelect->hasRepeatScanFuncs = pSelect->hasRepeatScanFuncs ? true : fmIsRepeatScanFunc(pFunc->funcId);
|
||||
pSelect->hasIndefiniteRowsFunc = pSelect->hasIndefiniteRowsFunc ? true : fmIsIndefiniteRowsFunc(pFunc->funcId);
|
||||
if (fmIsIndefiniteRowsFunc(pFunc->funcId)) {
|
||||
pSelect->hasIndefiniteRowsFunc = true;
|
||||
pSelect->returnRows = fmGetFuncReturnRows(pFunc);
|
||||
}
|
||||
pSelect->hasMultiRowsFunc = pSelect->hasMultiRowsFunc ? true : fmIsMultiRowsFunc(pFunc->funcId);
|
||||
if (fmIsSelectFunc(pFunc->funcId)) {
|
||||
pSelect->hasSelectFunc = true;
|
||||
pSelect->selectFuncNum += (fmIsMultiResFunc(pFunc->funcId) && !fmIsLastRowFunc(pFunc->funcId))
|
||||
? getMultiResFuncNum(pFunc->pParameterList)
|
||||
: 1;
|
||||
pSelect->selectFuncNum = calcSelectFuncNum(pFunc, pSelect->selectFuncNum);
|
||||
} else if (fmIsVectorFunc(pFunc->funcId)) {
|
||||
pSelect->hasOtherVectorFunc = true;
|
||||
}
|
||||
|
@ -2481,6 +2494,9 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
}
|
||||
|
||||
static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) {
|
||||
if (NULL == pPartitionByList) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
|
||||
return translateExprList(pCxt, pPartitionByList);
|
||||
}
|
||||
|
|
|
@ -175,6 +175,16 @@ TEST_F(PlanBasicTest, pseudoColumn) {
|
|||
"WHERE ts BETWEEN '2017-7-14 18:00:00' AND '2017-7-14 19:00:00' INTERVAL(10S)");
|
||||
}
|
||||
|
||||
TEST_F(PlanBasicTest, indefiniteRowsFunc) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT DIFF(c1) FROM t1");
|
||||
|
||||
run("SELECT DIFF(c1), c2 FROM t1");
|
||||
|
||||
run("SELECT DIFF(c1), DIFF(c3), ts FROM t1");
|
||||
}
|
||||
|
||||
TEST_F(PlanBasicTest, withoutFrom) {
|
||||
useDb("root", "test");
|
||||
|
||||
|
|
Loading…
Reference in New Issue