support udf and add case
This commit is contained in:
parent
3e890bac8f
commit
a591f8d067
|
@ -81,7 +81,7 @@ static int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSe
|
|||
|
||||
static bool validateIpAddress(const char* ip, size_t size);
|
||||
static bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
|
||||
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery);
|
||||
static bool functionCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery);
|
||||
|
||||
static int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd);
|
||||
|
||||
|
@ -1740,6 +1740,9 @@ void genUdfList(SArray* pUdfInfo, tSqlExpr *pNode) {
|
|||
if (pNode->functionId < 0) { // extract all possible user defined function
|
||||
struct SUdfInfo info = {0};
|
||||
info.name = strndup(pNode->operand.z, pNode->operand.n);
|
||||
int32_t functionId = taosArrayGetSize(pUdfInfo) * (-1) - 1;
|
||||
info.functionId = functionId;
|
||||
|
||||
taosArrayPush(pUdfInfo, &info);
|
||||
}
|
||||
}
|
||||
|
@ -1815,6 +1818,8 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelectLis
|
|||
if (pUdfInfo == NULL) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
|
||||
}
|
||||
|
||||
pItem->pNode->functionId = pUdfInfo->functionId;
|
||||
}
|
||||
|
||||
// sql function in selection clause, append sql function info in pSqlCmd structure sequentially
|
||||
|
@ -1854,7 +1859,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelectLis
|
|||
addPrimaryTsColIntoResult(pQueryInfo);
|
||||
}
|
||||
|
||||
if (!functionCompatibleCheck(pQueryInfo, joinQuery, timeWindowQuery)) {
|
||||
if (!functionCompatibleCheck(pCmd, pQueryInfo, joinQuery, timeWindowQuery)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
||||
|
@ -3131,47 +3136,57 @@ static bool groupbyTagsOrNull(SQueryInfo* pQueryInfo) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery) {
|
||||
static bool functionCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, bool joinQuery, bool twQuery) {
|
||||
int32_t startIdx = 0;
|
||||
|
||||
int32_t aggUdf = 0;
|
||||
int32_t scalarUdf = 0;
|
||||
int32_t prjNum = 0;
|
||||
int32_t aggNum = 0;
|
||||
|
||||
size_t numOfExpr = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
assert(numOfExpr > 0);
|
||||
|
||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, startIdx);
|
||||
|
||||
// ts function can be simultaneously used with any other functions.
|
||||
int32_t functionID = pExpr->functionId;
|
||||
if (functionID == TSDB_FUNC_TS || functionID == TSDB_FUNC_TS_DUMMY) {
|
||||
startIdx++;
|
||||
}
|
||||
|
||||
int32_t factor = functionCompatList[tscSqlExprGet(pQueryInfo, startIdx)->functionId];
|
||||
|
||||
if (tscSqlExprGet(pQueryInfo, 0)->functionId == TSDB_FUNC_LAST_ROW && (joinQuery || twQuery || !groupbyTagsOrNull(pQueryInfo))) {
|
||||
return false;
|
||||
}
|
||||
int32_t factor = INT32_MAX;
|
||||
|
||||
// diff function cannot be executed with other function
|
||||
// arithmetic function can be executed with other arithmetic functions
|
||||
size_t size = tscSqlExprNumOfExprs(pQueryInfo);
|
||||
|
||||
for (int32_t i = startIdx + 1; i < size; ++i) {
|
||||
for (int32_t i = startIdx; i < size; ++i) {
|
||||
SSqlExpr* pExpr1 = tscSqlExprGet(pQueryInfo, i);
|
||||
|
||||
int16_t functionId = pExpr1->functionId;
|
||||
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS) {
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
|
||||
pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE ? ++aggUdf : ++scalarUdf;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY) {
|
||||
++prjNum;
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_PRJ) {
|
||||
++prjNum;
|
||||
}
|
||||
|
||||
if (functionId == TSDB_FUNC_PRJ && (pExpr1->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX || TSDB_COL_IS_UD_COL(pExpr1->colInfo.flag))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionCompatList[functionId] != factor) {
|
||||
return false;
|
||||
if (factor == INT32_MAX) {
|
||||
factor = functionCompatList[functionId];
|
||||
} else {
|
||||
if (factor == -1) { // two functions with the same -1 flag
|
||||
if (functionCompatList[functionId] != factor) {
|
||||
return false;
|
||||
} else {
|
||||
if (factor == -1) { // two functions with the same -1 flag
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3180,6 +3195,18 @@ static bool functionCompatibleCheck(SQueryInfo* pQueryInfo, bool joinQuery, bool
|
|||
}
|
||||
}
|
||||
|
||||
aggNum = size - prjNum - aggUdf - scalarUdf;
|
||||
|
||||
assert(aggNum >= 0);
|
||||
|
||||
if (aggUdf > 0 && (prjNum > 0 || aggNum > 0 || scalarUdf > 0)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (scalarUdf > 0 && aggNum > 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -5276,7 +5303,7 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
|||
|
||||
int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
||||
bool isProjectionFunction = false;
|
||||
const char* msg1 = "column projection is not compatible with interval";
|
||||
const char* msg1 = "column projection or function not compatible with interval";
|
||||
|
||||
// multi-output set/ todo refactor
|
||||
size_t size = taosArrayGetSize(pQueryInfo->exprList);
|
||||
|
@ -5284,6 +5311,16 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu
|
|||
for (int32_t k = 0; k < size; ++k) {
|
||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, k);
|
||||
|
||||
if (pExpr->functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * pExpr->functionId - 1);
|
||||
if (pUdfInfo->funcType == TSDB_UDF_TYPE_SCALAR) {
|
||||
isProjectionFunction = true;
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// projection query on primary timestamp, the selectivity function needs to be present.
|
||||
if (pExpr->functionId == TSDB_FUNC_PRJ && pExpr->colInfo.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
bool hasSelectivity = false;
|
||||
|
@ -5303,6 +5340,7 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu
|
|||
if ((pExpr->functionId == TSDB_FUNC_PRJ && pExpr->numOfParams == 0) || pExpr->functionId == TSDB_FUNC_DIFF ||
|
||||
pExpr->functionId == TSDB_FUNC_ARITHM) {
|
||||
isProjectionFunction = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5910,6 +5948,15 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
|
|||
continue;
|
||||
}
|
||||
|
||||
if (functionId < 0) {
|
||||
SUdfInfo* pUdfInfo = taosArrayGet(pCmd->pUdfInfo, -1 * functionId - 1);
|
||||
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
|
||||
++numOfAggregation;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if ((aAggs[functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
|
||||
numOfSelectivity++;
|
||||
} else {
|
||||
|
@ -6129,6 +6176,10 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
if (functId < 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (IS_MULTIOUTPUT(aAggs[functId].status) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM &&
|
||||
functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
|
@ -6896,10 +6947,6 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i
|
|||
}
|
||||
|
||||
pQuerySqlNode->pWhere = NULL;
|
||||
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->window.skey = pQueryInfo->window.skey / 1000;
|
||||
pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000;
|
||||
}
|
||||
} else { // set the time rang
|
||||
if (taosArrayGetSize(pQuerySqlNode->from->tableList) > 1) { // it is a join query, no where clause is not allowed.
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "condition missing for join query ");
|
||||
|
@ -6916,6 +6963,11 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i
|
|||
return code;
|
||||
}
|
||||
|
||||
if (tinfo.precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
pQueryInfo->window.skey = pQueryInfo->window.skey / 1000;
|
||||
pQueryInfo->window.ekey = pQueryInfo->window.ekey / 1000;
|
||||
}
|
||||
|
||||
if (parseSelectClause(pCmd, index, pQuerySqlNode->pSelectList, isSTable, joinQuery, timeWindowQuery) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_SQL;
|
||||
}
|
||||
|
|
|
@ -1076,6 +1076,10 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
STR_TO_VARSTR(pMsg, pUdfInfo->name);
|
||||
|
||||
pMsg += varDataTLen(pMsg);
|
||||
|
||||
*(int32_t*) pMsg = htonl(pUdfInfo->funcType);
|
||||
pMsg += sizeof(pUdfInfo->funcType);
|
||||
|
||||
pQueryMsg->udfContentLen = htonl(pUdfInfo->contLen);
|
||||
memcpy(pMsg, pUdfInfo->content, pUdfInfo->contLen);
|
||||
|
||||
|
@ -1844,8 +1848,8 @@ int tscBuildRetrieveFuncMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pMsg += sizeof(SRetrieveFuncMsg);
|
||||
for(int32_t i = 0; i < numOfFuncs; ++i) {
|
||||
SUdfInfo* pUdf = taosArrayGet(pCmd->pUdfInfo, i);
|
||||
STR_TO_VARSTR(pMsg, pUdf->name);
|
||||
pMsg += varDataTLen(pMsg);
|
||||
STR_TO_NET_VARSTR(pMsg, pUdf->name);
|
||||
pMsg += varDataNetTLen(pMsg);
|
||||
}
|
||||
|
||||
pCmd->msgType = TSDB_MSG_TYPE_CM_RETRIEVE_FUNC;
|
||||
|
@ -2131,9 +2135,13 @@ int tscProcessRetrieveFuncRsp(SSqlObj* pSql) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pUdfInfo->content) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pUdfInfo->resBytes = htons(pFunc->resBytes);
|
||||
pUdfInfo->resType = pFunc->resType;
|
||||
pUdfInfo->funcType = TSDB_UDF_TYPE_SCALAR;
|
||||
pUdfInfo->funcType = htonl(pFunc->funcType);
|
||||
pUdfInfo->contLen = htonl(pFunc->len);
|
||||
|
||||
pUdfInfo->content = malloc(pUdfInfo->contLen);
|
||||
|
@ -2790,4 +2798,3 @@ void tscInitMsgsFp() {
|
|||
tscKeepConn[TSDB_SQL_FETCH] = 1;
|
||||
tscKeepConn[TSDB_SQL_HB] = 1;
|
||||
}
|
||||
|