feat: sql command 'create function'
This commit is contained in:
parent
5dea946bd4
commit
6f377189d0
|
@ -658,6 +658,7 @@ typedef struct {
|
||||||
int8_t outputType;
|
int8_t outputType;
|
||||||
int32_t outputLen;
|
int32_t outputLen;
|
||||||
int32_t bufSize;
|
int32_t bufSize;
|
||||||
|
int32_t codeLen;
|
||||||
int64_t signature;
|
int64_t signature;
|
||||||
char* pComment;
|
char* pComment;
|
||||||
char* pCode;
|
char* pCode;
|
||||||
|
|
|
@ -148,94 +148,95 @@
|
||||||
#define TK_VARIABLES 130
|
#define TK_VARIABLES 130
|
||||||
#define TK_BNODES 131
|
#define TK_BNODES 131
|
||||||
#define TK_SNODES 132
|
#define TK_SNODES 132
|
||||||
#define TK_LIKE 133
|
#define TK_CLUSTER 133
|
||||||
#define TK_INDEX 134
|
#define TK_LIKE 134
|
||||||
#define TK_FULLTEXT 135
|
#define TK_INDEX 135
|
||||||
#define TK_FUNCTION 136
|
#define TK_FULLTEXT 136
|
||||||
#define TK_INTERVAL 137
|
#define TK_FUNCTION 137
|
||||||
#define TK_TOPIC 138
|
#define TK_INTERVAL 138
|
||||||
#define TK_AS 139
|
#define TK_TOPIC 139
|
||||||
#define TK_DESC 140
|
#define TK_AS 140
|
||||||
#define TK_DESCRIBE 141
|
#define TK_DESC 141
|
||||||
#define TK_RESET 142
|
#define TK_DESCRIBE 142
|
||||||
#define TK_QUERY 143
|
#define TK_RESET 143
|
||||||
#define TK_EXPLAIN 144
|
#define TK_QUERY 144
|
||||||
#define TK_ANALYZE 145
|
#define TK_EXPLAIN 145
|
||||||
#define TK_VERBOSE 146
|
#define TK_ANALYZE 146
|
||||||
#define TK_NK_BOOL 147
|
#define TK_VERBOSE 147
|
||||||
#define TK_RATIO 148
|
#define TK_NK_BOOL 148
|
||||||
#define TK_COMPACT 149
|
#define TK_RATIO 149
|
||||||
#define TK_VNODES 150
|
#define TK_COMPACT 150
|
||||||
#define TK_IN 151
|
#define TK_VNODES 151
|
||||||
#define TK_OUTPUTTYPE 152
|
#define TK_IN 152
|
||||||
#define TK_AGGREGATE 153
|
#define TK_OUTPUTTYPE 153
|
||||||
#define TK_BUFSIZE 154
|
#define TK_AGGREGATE 154
|
||||||
#define TK_STREAM 155
|
#define TK_BUFSIZE 155
|
||||||
#define TK_INTO 156
|
#define TK_STREAM 156
|
||||||
#define TK_TRIGGER 157
|
#define TK_INTO 157
|
||||||
#define TK_AT_ONCE 158
|
#define TK_TRIGGER 158
|
||||||
#define TK_WINDOW_CLOSE 159
|
#define TK_AT_ONCE 159
|
||||||
#define TK_WATERMARK 160
|
#define TK_WINDOW_CLOSE 160
|
||||||
#define TK_KILL 161
|
#define TK_WATERMARK 161
|
||||||
#define TK_CONNECTION 162
|
#define TK_KILL 162
|
||||||
#define TK_MERGE 163
|
#define TK_CONNECTION 163
|
||||||
#define TK_VGROUP 164
|
#define TK_MERGE 164
|
||||||
#define TK_REDISTRIBUTE 165
|
#define TK_VGROUP 165
|
||||||
#define TK_SPLIT 166
|
#define TK_REDISTRIBUTE 166
|
||||||
#define TK_SYNCDB 167
|
#define TK_SPLIT 167
|
||||||
#define TK_NULL 168
|
#define TK_SYNCDB 168
|
||||||
#define TK_NK_QUESTION 169
|
#define TK_NULL 169
|
||||||
#define TK_NK_ARROW 170
|
#define TK_NK_QUESTION 170
|
||||||
#define TK_ROWTS 171
|
#define TK_NK_ARROW 171
|
||||||
#define TK_TBNAME 172
|
#define TK_ROWTS 172
|
||||||
#define TK_QSTARTTS 173
|
#define TK_TBNAME 173
|
||||||
#define TK_QENDTS 174
|
#define TK_QSTARTTS 174
|
||||||
#define TK_WSTARTTS 175
|
#define TK_QENDTS 175
|
||||||
#define TK_WENDTS 176
|
#define TK_WSTARTTS 176
|
||||||
#define TK_WDURATION 177
|
#define TK_WENDTS 177
|
||||||
#define TK_CAST 178
|
#define TK_WDURATION 178
|
||||||
#define TK_NOW 179
|
#define TK_CAST 179
|
||||||
#define TK_TODAY 180
|
#define TK_NOW 180
|
||||||
#define TK_TIMEZONE 181
|
#define TK_TODAY 181
|
||||||
#define TK_COUNT 182
|
#define TK_TIMEZONE 182
|
||||||
#define TK_FIRST 183
|
#define TK_COUNT 183
|
||||||
#define TK_LAST 184
|
#define TK_FIRST 184
|
||||||
#define TK_LAST_ROW 185
|
#define TK_LAST 185
|
||||||
#define TK_BETWEEN 186
|
#define TK_LAST_ROW 186
|
||||||
#define TK_IS 187
|
#define TK_BETWEEN 187
|
||||||
#define TK_NK_LT 188
|
#define TK_IS 188
|
||||||
#define TK_NK_GT 189
|
#define TK_NK_LT 189
|
||||||
#define TK_NK_LE 190
|
#define TK_NK_GT 190
|
||||||
#define TK_NK_GE 191
|
#define TK_NK_LE 191
|
||||||
#define TK_NK_NE 192
|
#define TK_NK_GE 192
|
||||||
#define TK_MATCH 193
|
#define TK_NK_NE 193
|
||||||
#define TK_NMATCH 194
|
#define TK_MATCH 194
|
||||||
#define TK_CONTAINS 195
|
#define TK_NMATCH 195
|
||||||
#define TK_JOIN 196
|
#define TK_CONTAINS 196
|
||||||
#define TK_INNER 197
|
#define TK_JOIN 197
|
||||||
#define TK_SELECT 198
|
#define TK_INNER 198
|
||||||
#define TK_DISTINCT 199
|
#define TK_SELECT 199
|
||||||
#define TK_WHERE 200
|
#define TK_DISTINCT 200
|
||||||
#define TK_PARTITION 201
|
#define TK_WHERE 201
|
||||||
#define TK_BY 202
|
#define TK_PARTITION 202
|
||||||
#define TK_SESSION 203
|
#define TK_BY 203
|
||||||
#define TK_STATE_WINDOW 204
|
#define TK_SESSION 204
|
||||||
#define TK_SLIDING 205
|
#define TK_STATE_WINDOW 205
|
||||||
#define TK_FILL 206
|
#define TK_SLIDING 206
|
||||||
#define TK_VALUE 207
|
#define TK_FILL 207
|
||||||
#define TK_NONE 208
|
#define TK_VALUE 208
|
||||||
#define TK_PREV 209
|
#define TK_NONE 209
|
||||||
#define TK_LINEAR 210
|
#define TK_PREV 210
|
||||||
#define TK_NEXT 211
|
#define TK_LINEAR 211
|
||||||
#define TK_GROUP 212
|
#define TK_NEXT 212
|
||||||
#define TK_HAVING 213
|
#define TK_GROUP 213
|
||||||
#define TK_ORDER 214
|
#define TK_HAVING 214
|
||||||
#define TK_SLIMIT 215
|
#define TK_ORDER 215
|
||||||
#define TK_SOFFSET 216
|
#define TK_SLIMIT 216
|
||||||
#define TK_LIMIT 217
|
#define TK_SOFFSET 217
|
||||||
#define TK_OFFSET 218
|
#define TK_LIMIT 218
|
||||||
#define TK_ASC 219
|
#define TK_OFFSET 219
|
||||||
#define TK_NULLS 220
|
#define TK_ASC 220
|
||||||
|
#define TK_NULLS 221
|
||||||
|
|
||||||
#define TK_NK_SPACE 300
|
#define TK_NK_SPACE 300
|
||||||
#define TK_NK_COMMENT 301
|
#define TK_NK_COMMENT 301
|
||||||
|
|
|
@ -110,7 +110,10 @@ typedef enum EFunctionType {
|
||||||
FUNCTION_TYPE_QENDTS,
|
FUNCTION_TYPE_QENDTS,
|
||||||
FUNCTION_TYPE_WSTARTTS,
|
FUNCTION_TYPE_WSTARTTS,
|
||||||
FUNCTION_TYPE_WENDTS,
|
FUNCTION_TYPE_WENDTS,
|
||||||
FUNCTION_TYPE_WDURATION
|
FUNCTION_TYPE_WDURATION,
|
||||||
|
|
||||||
|
// user defined funcion
|
||||||
|
FUNCTION_TYPE_UDF = 10000
|
||||||
} EFunctionType;
|
} EFunctionType;
|
||||||
|
|
||||||
struct SqlFunctionCtx;
|
struct SqlFunctionCtx;
|
||||||
|
@ -138,6 +141,7 @@ bool fmIsWindowClauseFunc(int32_t funcId);
|
||||||
bool fmIsSpecialDataRequiredFunc(int32_t funcId);
|
bool fmIsSpecialDataRequiredFunc(int32_t funcId);
|
||||||
bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
|
bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
|
||||||
bool fmIsMultiResFunc(int32_t funcId);
|
bool fmIsMultiResFunc(int32_t funcId);
|
||||||
|
bool fmIsUserDefinedFunc(int32_t funcId);
|
||||||
|
|
||||||
typedef enum EFuncDataRequired {
|
typedef enum EFuncDataRequired {
|
||||||
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
|
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
|
||||||
|
|
|
@ -295,6 +295,16 @@ typedef struct SDropStreamStmt {
|
||||||
bool ignoreNotExists;
|
bool ignoreNotExists;
|
||||||
} SDropStreamStmt;
|
} SDropStreamStmt;
|
||||||
|
|
||||||
|
typedef struct SCreateFunctionStmt {
|
||||||
|
ENodeType type;
|
||||||
|
bool ignoreExists;
|
||||||
|
char funcName[TSDB_FUNC_NAME_LEN];
|
||||||
|
bool isAgg;
|
||||||
|
char libraryPath[PATH_MAX];
|
||||||
|
SDataType outputDt;
|
||||||
|
int32_t bufSize;
|
||||||
|
} SCreateFunctionStmt;
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -134,6 +134,7 @@ typedef enum ENodeType {
|
||||||
QUERY_NODE_SHOW_QNODES_STMT,
|
QUERY_NODE_SHOW_QNODES_STMT,
|
||||||
QUERY_NODE_SHOW_SNODES_STMT,
|
QUERY_NODE_SHOW_SNODES_STMT,
|
||||||
QUERY_NODE_SHOW_BNODES_STMT,
|
QUERY_NODE_SHOW_BNODES_STMT,
|
||||||
|
QUERY_NODE_SHOW_CLUSTER_STMT,
|
||||||
QUERY_NODE_SHOW_DATABASES_STMT,
|
QUERY_NODE_SHOW_DATABASES_STMT,
|
||||||
QUERY_NODE_SHOW_FUNCTIONS_STMT,
|
QUERY_NODE_SHOW_FUNCTIONS_STMT,
|
||||||
QUERY_NODE_SHOW_INDEXES_STMT,
|
QUERY_NODE_SHOW_INDEXES_STMT,
|
||||||
|
|
|
@ -239,6 +239,8 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_FUNC_BUF_SIZE 512
|
#define TSDB_FUNC_BUF_SIZE 512
|
||||||
#define TSDB_FUNC_TYPE_SCALAR 1
|
#define TSDB_FUNC_TYPE_SCALAR 1
|
||||||
#define TSDB_FUNC_TYPE_AGGREGATE 2
|
#define TSDB_FUNC_TYPE_AGGREGATE 2
|
||||||
|
#define TSDB_FUNC_SCRIPT_BIN_LIB 0
|
||||||
|
#define TSDB_FUNC_SCRIPT_LUA 1
|
||||||
#define TSDB_FUNC_MAX_RETRIEVE 1024
|
#define TSDB_FUNC_MAX_RETRIEVE 1024
|
||||||
|
|
||||||
#define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0'
|
#define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0'
|
||||||
|
|
|
@ -1511,6 +1511,7 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq
|
||||||
if (tEncodeI8(&encoder, pReq->outputType) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->outputType) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->outputLen) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->outputLen) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->bufSize) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->bufSize) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->codeLen) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->signature) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->signature) < 0) return -1;
|
||||||
|
|
||||||
int32_t codeSize = 0;
|
int32_t codeSize = 0;
|
||||||
|
@ -1550,6 +1551,7 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR
|
||||||
if (tDecodeI8(&decoder, &pReq->outputType) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->outputType) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->outputLen) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->outputLen) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->bufSize) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->bufSize) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->codeLen) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->signature) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->signature) < 0) return -1;
|
||||||
|
|
||||||
int32_t codeSize = 0;
|
int32_t codeSize = 0;
|
||||||
|
|
|
@ -77,7 +77,9 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) {
|
||||||
SDB_SET_INT64(pRaw, dataPos, pFunc->signature, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pFunc->signature, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, _OVER)
|
||||||
|
if (pFunc->commentSize > 0) {
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
|
||||||
|
}
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER);
|
SDB_SET_DATALEN(pRaw, dataPos, _OVER);
|
||||||
|
@ -125,13 +127,18 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, _OVER)
|
||||||
|
|
||||||
|
if (pFunc->commentSize > 0) {
|
||||||
pFunc->pComment = taosMemoryCalloc(1, pFunc->commentSize);
|
pFunc->pComment = taosMemoryCalloc(1, pFunc->commentSize);
|
||||||
pFunc->pCode = taosMemoryCalloc(1, pFunc->codeSize);
|
if (pFunc->pComment == NULL) {
|
||||||
if (pFunc->pComment == NULL || pFunc->pCode == NULL) {
|
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->pCode = taosMemoryCalloc(1, pFunc->codeSize);
|
||||||
|
if (pFunc->pCode == NULL) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
|
SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
|
||||||
|
|
||||||
|
@ -192,16 +199,20 @@ static int32_t mndCreateFunc(SMnode *pMnode, SNodeMsg *pReq, SCreateFuncReq *pCr
|
||||||
func.outputLen = pCreate->outputLen;
|
func.outputLen = pCreate->outputLen;
|
||||||
func.bufSize = pCreate->bufSize;
|
func.bufSize = pCreate->bufSize;
|
||||||
func.signature = pCreate->signature;
|
func.signature = pCreate->signature;
|
||||||
|
if (NULL != pCreate->pComment) {
|
||||||
func.commentSize = strlen(pCreate->pComment) + 1;
|
func.commentSize = strlen(pCreate->pComment) + 1;
|
||||||
func.codeSize = strlen(pCreate->pCode) + 1;
|
|
||||||
func.pComment = taosMemoryMalloc(func.commentSize);
|
func.pComment = taosMemoryMalloc(func.commentSize);
|
||||||
|
}
|
||||||
|
func.codeSize = pCreate->codeLen;
|
||||||
func.pCode = taosMemoryMalloc(func.codeSize);
|
func.pCode = taosMemoryMalloc(func.codeSize);
|
||||||
if (func.pCode == NULL || func.pCode == NULL) {
|
if (func.pCode == NULL || func.pCode == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (func.commentSize > 0) {
|
||||||
memcpy(func.pComment, pCreate->pComment, func.commentSize);
|
memcpy(func.pComment, pCreate->pComment, func.commentSize);
|
||||||
|
}
|
||||||
memcpy(func.pCode, pCreate->pCode, func.codeSize);
|
memcpy(func.pCode, pCreate->pCode, func.codeSize);
|
||||||
|
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, &pReq->rpcMsg);
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, &pReq->rpcMsg);
|
||||||
|
@ -293,16 +304,6 @@ static int32_t mndProcessCreateFuncReq(SNodeMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (createReq.pComment == NULL) {
|
|
||||||
terrno = TSDB_CODE_MND_INVALID_FUNC_COMMENT;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (createReq.pComment[0] == 0) {
|
|
||||||
terrno = TSDB_CODE_MND_INVALID_FUNC_COMMENT;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (createReq.pCode == NULL) {
|
if (createReq.pCode == NULL) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
|
terrno = TSDB_CODE_MND_INVALID_FUNC_CODE;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -441,13 +442,19 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) {
|
||||||
funcInfo.commentSize = pFunc->commentSize;
|
funcInfo.commentSize = pFunc->commentSize;
|
||||||
funcInfo.codeSize = pFunc->codeSize;
|
funcInfo.codeSize = pFunc->codeSize;
|
||||||
funcInfo.pCode = taosMemoryCalloc(1, sizeof(funcInfo.codeSize));
|
funcInfo.pCode = taosMemoryCalloc(1, sizeof(funcInfo.codeSize));
|
||||||
|
if (funcInfo.pCode == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto RETRIEVE_FUNC_OVER;
|
||||||
|
}
|
||||||
|
memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize);
|
||||||
|
if (funcInfo.commentSize > 0) {
|
||||||
funcInfo.pComment = taosMemoryCalloc(1, sizeof(funcInfo.commentSize));
|
funcInfo.pComment = taosMemoryCalloc(1, sizeof(funcInfo.commentSize));
|
||||||
if (funcInfo.pCode == NULL || funcInfo.pComment == NULL) {
|
if (funcInfo.pComment == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto RETRIEVE_FUNC_OVER;
|
goto RETRIEVE_FUNC_OVER;
|
||||||
}
|
}
|
||||||
memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
|
memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
|
||||||
memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize);
|
}
|
||||||
taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo);
|
taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo);
|
||||||
mndReleaseFunc(pMnode, pFunc);
|
mndReleaseFunc(pMnode, pFunc);
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ target_include_directories(
|
||||||
|
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
function
|
function
|
||||||
PRIVATE os util common nodes scalar
|
PRIVATE os util common nodes scalar catalog qcom transport
|
||||||
PUBLIC uv_a
|
PUBLIC uv_a
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -20,26 +20,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "functionMgt.h"
|
#include "functionMgtInt.h"
|
||||||
|
|
||||||
#define FUNCTION_NAME_MAX_LENGTH 32
|
|
||||||
|
|
||||||
#define FUNC_MGT_FUNC_CLASSIFICATION_MASK(n) (1 << n)
|
|
||||||
|
|
||||||
#define FUNC_MGT_AGG_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(0)
|
|
||||||
#define FUNC_MGT_SCALAR_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(1)
|
|
||||||
#define FUNC_MGT_NONSTANDARD_SQL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2)
|
|
||||||
#define FUNC_MGT_STRING_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(3)
|
|
||||||
#define FUNC_MGT_DATETIME_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(4)
|
|
||||||
#define FUNC_MGT_TIMELINE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(5)
|
|
||||||
#define FUNC_MGT_TIMEORDER_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(6)
|
|
||||||
#define FUNC_MGT_PSEUDO_COLUMN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(7)
|
|
||||||
#define FUNC_MGT_WINDOW_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(8)
|
|
||||||
#define FUNC_MGT_SPECIAL_DATA_REQUIRED FUNC_MGT_FUNC_CLASSIFICATION_MASK(9)
|
|
||||||
#define FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED FUNC_MGT_FUNC_CLASSIFICATION_MASK(10)
|
|
||||||
#define FUNC_MGT_MULTI_RES_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(11)
|
|
||||||
|
|
||||||
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
|
||||||
|
|
||||||
typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len);
|
typedef int32_t (*FTranslateFunc)(SFunctionNode* pFunc, char* pErrBuf, int32_t len);
|
||||||
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
typedef EFuncDataRequired (*FFuncDataRequired)(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||||
|
|
|
@ -21,9 +21,29 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
// #include "builtins.h"
|
|
||||||
|
|
||||||
|
#define FUNCTION_NAME_MAX_LENGTH 32
|
||||||
|
|
||||||
|
#define FUNC_MGT_FUNC_CLASSIFICATION_MASK(n) (1 << n)
|
||||||
|
|
||||||
|
#define FUNC_MGT_AGG_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(0)
|
||||||
|
#define FUNC_MGT_SCALAR_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(1)
|
||||||
|
#define FUNC_MGT_NONSTANDARD_SQL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2)
|
||||||
|
#define FUNC_MGT_STRING_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(3)
|
||||||
|
#define FUNC_MGT_DATETIME_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(4)
|
||||||
|
#define FUNC_MGT_TIMELINE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(5)
|
||||||
|
#define FUNC_MGT_TIMEORDER_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(6)
|
||||||
|
#define FUNC_MGT_PSEUDO_COLUMN_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(7)
|
||||||
|
#define FUNC_MGT_WINDOW_PC_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(8)
|
||||||
|
#define FUNC_MGT_SPECIAL_DATA_REQUIRED FUNC_MGT_FUNC_CLASSIFICATION_MASK(9)
|
||||||
|
#define FUNC_MGT_DYNAMIC_SCAN_OPTIMIZED FUNC_MGT_FUNC_CLASSIFICATION_MASK(10)
|
||||||
|
#define FUNC_MGT_MULTI_RES_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(11)
|
||||||
|
|
||||||
|
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||||
|
|
||||||
|
#define FUNC_UDF_ID_START_OFFSET_VAL 5000
|
||||||
|
|
||||||
|
extern const int funcMgtUdfNum;
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,16 +20,22 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "builtins.h"
|
#include "builtins.h"
|
||||||
|
#include "catalog.h"
|
||||||
|
|
||||||
typedef struct SFuncMgtService {
|
typedef struct SFuncMgtService {
|
||||||
SHashObj* pFuncNameHashTable;
|
SHashObj* pFuncNameHashTable;
|
||||||
|
SArray* pUdfTable; // SUdfInfo
|
||||||
} SFuncMgtService;
|
} SFuncMgtService;
|
||||||
|
|
||||||
|
typedef struct SUdfInfo {
|
||||||
|
SDataType outputDt;
|
||||||
|
} SUdfInfo;
|
||||||
|
|
||||||
static SFuncMgtService gFunMgtService;
|
static SFuncMgtService gFunMgtService;
|
||||||
static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT;
|
static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT;
|
||||||
static int32_t initFunctionCode = 0;
|
static int32_t initFunctionCode = 0;
|
||||||
|
|
||||||
static void doInitFunctionHashTable() {
|
static void doInitFunctionTable() {
|
||||||
gFunMgtService.pFuncNameHashTable = taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
gFunMgtService.pFuncNameHashTable = taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
if (NULL == gFunMgtService.pFuncNameHashTable) {
|
if (NULL == gFunMgtService.pFuncNameHashTable) {
|
||||||
initFunctionCode = TSDB_CODE_FAILED;
|
initFunctionCode = TSDB_CODE_FAILED;
|
||||||
|
@ -42,6 +48,8 @@ static void doInitFunctionHashTable() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
gFunMgtService.pUdfTable = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
|
static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
|
||||||
|
@ -51,25 +59,56 @@ static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) {
|
||||||
return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
|
return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t getUdfId(const char* pFuncName) {
|
||||||
|
// todo: udf by call catalog
|
||||||
|
if (1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (NULL == gFunMgtService.pUdfTable) {
|
||||||
|
gFunMgtService.pUdfTable = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SUdfInfo));
|
||||||
|
}
|
||||||
|
SUdfInfo info = {0}; //todo
|
||||||
|
taosArrayPush(gFunMgtService.pUdfTable, &info);
|
||||||
|
return taosArrayGetSize(gFunMgtService.pUdfTable) + FUNC_UDF_ID_START_OFFSET_VAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t getFuncId(const char* pFuncName) {
|
||||||
|
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFuncName, strlen(pFuncName));
|
||||||
|
if (NULL == pVal) {
|
||||||
|
return getUdfId(pFuncName);
|
||||||
|
}
|
||||||
|
return *(int32_t*)pVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t getUdfResultType(SFunctionNode* pFunc) {
|
||||||
|
SUdfInfo* pUdf = taosArrayGet(gFunMgtService.pUdfTable, pFunc->funcId - FUNC_UDF_ID_START_OFFSET_VAL - 1);
|
||||||
|
pFunc->node.resType = pUdf->outputDt;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t fmFuncMgtInit() {
|
int32_t fmFuncMgtInit() {
|
||||||
taosThreadOnce(&functionHashTableInit, doInitFunctionHashTable);
|
taosThreadOnce(&functionHashTableInit, doInitFunctionTable);
|
||||||
return initFunctionCode;
|
return initFunctionCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) {
|
int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) {
|
||||||
void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFuncName, strlen(pFuncName));
|
*pFuncId = getFuncId(pFuncName);
|
||||||
if (NULL == pVal) {
|
if (*pFuncId < 0) {
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
*pFuncId = *(int32_t*)pVal;
|
|
||||||
if (*pFuncId < 0 || *pFuncId >= funcMgtBuiltinsNum) {
|
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
if (fmIsUserDefinedFunc(*pFuncId)) {
|
||||||
|
*pFuncType = FUNCTION_TYPE_UDF;
|
||||||
|
} else {
|
||||||
*pFuncType = funcMgtBuiltins[*pFuncId].type;
|
*pFuncType = funcMgtBuiltins[*pFuncId].type;
|
||||||
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
if (fmIsUserDefinedFunc(pFunc->funcId)) {
|
||||||
|
return getUdfResultType(pFunc);
|
||||||
|
}
|
||||||
|
|
||||||
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
@ -77,7 +116,7 @@ int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
}
|
}
|
||||||
|
|
||||||
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||||
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
if (fmIsUserDefinedFunc(pFunc->funcId) || pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
||||||
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
}
|
}
|
||||||
if (NULL == funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) {
|
if (NULL == funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) {
|
||||||
|
@ -87,7 +126,7 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
||||||
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc;
|
pFpSet->getEnv = funcMgtBuiltins[funcId].getEnvFunc;
|
||||||
|
@ -98,7 +137,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
|
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
|
||||||
if (funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
pFpSet->process = funcMgtBuiltins[funcId].sprocessFunc;
|
pFpSet->process = funcMgtBuiltins[funcId].sprocessFunc;
|
||||||
|
@ -142,6 +181,10 @@ bool fmIsMultiResFunc(int32_t funcId) {
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_RES_FUNC);
|
return isSpecificClassifyFunc(funcId, FUNC_MGT_MULTI_RES_FUNC);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool fmIsUserDefinedFunc(int32_t funcId) {
|
||||||
|
return funcId > FUNC_UDF_ID_START_OFFSET_VAL;
|
||||||
|
}
|
||||||
|
|
||||||
void fmFuncMgtDestroy() {
|
void fmFuncMgtDestroy() {
|
||||||
void* m = gFunMgtService.pFuncNameHashTable;
|
void* m = gFunMgtService.pFuncNameHashTable;
|
||||||
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
|
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
|
||||||
|
|
|
@ -149,7 +149,9 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
||||||
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
|
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
|
||||||
return makeNode(type, sizeof(SNode));
|
return makeNode(type, sizeof(SNode));
|
||||||
case QUERY_NODE_COMPACT_STMT:
|
case QUERY_NODE_COMPACT_STMT:
|
||||||
|
break;
|
||||||
case QUERY_NODE_CREATE_FUNCTION_STMT:
|
case QUERY_NODE_CREATE_FUNCTION_STMT:
|
||||||
|
return makeNode(type, sizeof(SCreateFunctionStmt));
|
||||||
case QUERY_NODE_DROP_FUNCTION_STMT:
|
case QUERY_NODE_DROP_FUNCTION_STMT:
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_CREATE_STREAM_STMT:
|
case QUERY_NODE_CREATE_STREAM_STMT:
|
||||||
|
@ -167,6 +169,7 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
||||||
case QUERY_NODE_SHOW_QNODES_STMT:
|
case QUERY_NODE_SHOW_QNODES_STMT:
|
||||||
case QUERY_NODE_SHOW_SNODES_STMT:
|
case QUERY_NODE_SHOW_SNODES_STMT:
|
||||||
case QUERY_NODE_SHOW_BNODES_STMT:
|
case QUERY_NODE_SHOW_BNODES_STMT:
|
||||||
|
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||||
case QUERY_NODE_SHOW_FUNCTIONS_STMT:
|
case QUERY_NODE_SHOW_FUNCTIONS_STMT:
|
||||||
case QUERY_NODE_SHOW_INDEXES_STMT:
|
case QUERY_NODE_SHOW_INDEXES_STMT:
|
||||||
|
|
|
@ -167,7 +167,7 @@ SNode* createExplainStmt(SAstCreateContext* pCxt, bool analyze, SNode* pOptions,
|
||||||
SNode* createDescribeStmt(SAstCreateContext* pCxt, SNode* pRealTable);
|
SNode* createDescribeStmt(SAstCreateContext* pCxt, SNode* pRealTable);
|
||||||
SNode* createResetQueryCacheStmt(SAstCreateContext* pCxt);
|
SNode* createResetQueryCacheStmt(SAstCreateContext* pCxt);
|
||||||
SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups);
|
SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups);
|
||||||
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize);
|
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize);
|
||||||
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName);
|
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName);
|
||||||
SNode* createStreamOptions(SAstCreateContext* pCxt);
|
SNode* createStreamOptions(SAstCreateContext* pCxt);
|
||||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable, SNode* pOptions, SNode* pQuery);
|
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pStreamName, SNode* pRealTable, SNode* pOptions, SNode* pQuery);
|
||||||
|
|
|
@ -346,6 +346,7 @@ cmd ::= SHOW TOPICS.
|
||||||
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLE_STMT, NULL, NULL); }
|
cmd ::= SHOW VARIABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VARIABLE_STMT, NULL, NULL); }
|
||||||
cmd ::= SHOW BNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_BNODES_STMT, NULL, NULL); }
|
cmd ::= SHOW BNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_BNODES_STMT, NULL, NULL); }
|
||||||
cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT, NULL, NULL); }
|
cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT, NULL, NULL); }
|
||||||
|
cmd ::= SHOW CLUSTER. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CLUSTER_STMT, NULL, NULL); }
|
||||||
|
|
||||||
db_name_cond_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); }
|
db_name_cond_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); }
|
||||||
db_name_cond_opt(A) ::= db_name(B) NK_DOT. { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
|
db_name_cond_opt(A) ::= db_name(B) NK_DOT. { A = createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B); }
|
||||||
|
@ -413,8 +414,8 @@ explain_options(A) ::= explain_options(B) RATIO NK_FLOAT(C).
|
||||||
cmd ::= COMPACT VNODES IN NK_LP integer_list(A) NK_RP. { pCxt->pRootNode = createCompactStmt(pCxt, A); }
|
cmd ::= COMPACT VNODES IN NK_LP integer_list(A) NK_RP. { pCxt->pRootNode = createCompactStmt(pCxt, A); }
|
||||||
|
|
||||||
/************************************************ create/drop function ************************************************/
|
/************************************************ create/drop function ************************************************/
|
||||||
cmd ::= CREATE agg_func_opt(A) FUNCTION function_name(B)
|
cmd ::= CREATE agg_func_opt(A) FUNCTION not_exists_opt(F) function_name(B)
|
||||||
AS NK_STRING(C) OUTPUTTYPE type_name(D) bufsize_opt(E). { pCxt->pRootNode = createCreateFunctionStmt(pCxt, A, &B, &C, D, E); }
|
AS NK_STRING(C) OUTPUTTYPE type_name(D) bufsize_opt(E). { pCxt->pRootNode = createCreateFunctionStmt(pCxt, F, A, &B, &C, D, E); }
|
||||||
cmd ::= DROP FUNCTION function_name(A). { pCxt->pRootNode = createDropFunctionStmt(pCxt, &A); }
|
cmd ::= DROP FUNCTION function_name(A). { pCxt->pRootNode = createDropFunctionStmt(pCxt, &A); }
|
||||||
|
|
||||||
%type agg_func_opt { bool }
|
%type agg_func_opt { bool }
|
||||||
|
|
|
@ -1184,10 +1184,21 @@ SNode* createCompactStmt(SAstCreateContext* pCxt, SNodeList* pVgroups) {
|
||||||
return pStmt;
|
return pStmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize) {
|
SNode* createCreateFunctionStmt(SAstCreateContext* pCxt,
|
||||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_CREATE_FUNCTION_STMT);
|
bool ignoreExists, bool aggFunc, const SToken* pFuncName, const SToken* pLibPath, SDataType dataType, int32_t bufSize) {
|
||||||
|
if (pLibPath->n <= 2) {
|
||||||
|
pCxt->valid = false;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
SCreateFunctionStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_FUNCTION_STMT);
|
||||||
CHECK_OUT_OF_MEM(pStmt);
|
CHECK_OUT_OF_MEM(pStmt);
|
||||||
return pStmt;
|
pStmt->ignoreExists = ignoreExists;
|
||||||
|
strncpy(pStmt->funcName, pFuncName->z, pFuncName->n);
|
||||||
|
pStmt->isAgg = aggFunc;
|
||||||
|
strncpy(pStmt->libraryPath, pLibPath->z + 1, pLibPath->n - 2);
|
||||||
|
pStmt->outputDt = dataType;
|
||||||
|
pStmt->bufSize = bufSize;
|
||||||
|
return (SNode*)pStmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName) {
|
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, const SToken* pFuncName) {
|
||||||
|
|
|
@ -52,6 +52,7 @@ static SKeyword keywordTable[] = {
|
||||||
{"CACHE", TK_CACHE},
|
{"CACHE", TK_CACHE},
|
||||||
{"CACHELAST", TK_CACHELAST},
|
{"CACHELAST", TK_CACHELAST},
|
||||||
{"CAST", TK_CAST},
|
{"CAST", TK_CAST},
|
||||||
|
{"CLUSTER", TK_CLUSTER},
|
||||||
{"COLUMN", TK_COLUMN},
|
{"COLUMN", TK_COLUMN},
|
||||||
{"COMMENT", TK_COMMENT},
|
{"COMMENT", TK_COMMENT},
|
||||||
{"COMP", TK_COMP},
|
{"COMP", TK_COMP},
|
||||||
|
@ -248,7 +249,6 @@ static SKeyword keywordTable[] = {
|
||||||
// {"BEFORE", TK_BEFORE},
|
// {"BEFORE", TK_BEFORE},
|
||||||
// {"BEGIN", TK_BEGIN},
|
// {"BEGIN", TK_BEGIN},
|
||||||
// {"CASCADE", TK_CASCADE},
|
// {"CASCADE", TK_CASCADE},
|
||||||
// {"CLUSTER", TK_CLUSTER},
|
|
||||||
// {"CONFLICT", TK_CONFLICT},
|
// {"CONFLICT", TK_CONFLICT},
|
||||||
// {"COPY", TK_COPY},
|
// {"COPY", TK_COPY},
|
||||||
// {"DEFERRED", TK_DEFERRED},
|
// {"DEFERRED", TK_DEFERRED},
|
||||||
|
|
|
@ -2595,6 +2595,55 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t readFromFile(char* pName, int32_t *len, char **buf) {
|
||||||
|
int64_t filesize = 0;
|
||||||
|
if (taosStatFile(pName, &filesize, NULL) < 0) {
|
||||||
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
}
|
||||||
|
|
||||||
|
*len = filesize;
|
||||||
|
|
||||||
|
if (*len <= 0) {
|
||||||
|
return TSDB_CODE_TSC_FILE_EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
*buf = taosMemoryCalloc(1, *len);
|
||||||
|
if (*buf == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
TdFilePtr tfile = taosOpenFile(pName, O_RDONLY | O_BINARY);
|
||||||
|
if (NULL == tfile) {
|
||||||
|
taosMemoryFreeClear(*buf);
|
||||||
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t s = taosReadFile(tfile, *buf, *len);
|
||||||
|
if (s != *len) {
|
||||||
|
taosCloseFile(&tfile);
|
||||||
|
taosMemoryFreeClear(*buf);
|
||||||
|
return TSDB_CODE_TSC_APP_ERROR;
|
||||||
|
}
|
||||||
|
taosCloseFile(&tfile);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateCreateFunction(STranslateContext* pCxt, SCreateFunctionStmt* pStmt) {
|
||||||
|
SCreateFuncReq req = {0};
|
||||||
|
strcpy(req.name, pStmt->funcName);
|
||||||
|
req.igExists = pStmt->ignoreExists;
|
||||||
|
req.funcType = pStmt->isAgg ? TSDB_FUNC_TYPE_AGGREGATE : TSDB_FUNC_TYPE_SCALAR;
|
||||||
|
req.scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
|
||||||
|
req.outputType = pStmt->outputDt.type;
|
||||||
|
req.outputLen = pStmt->outputDt.bytes;
|
||||||
|
req.bufSize = pStmt->bufSize;
|
||||||
|
int32_t code = readFromFile(pStmt->libraryPath, &req.codeLen, &req.pCode);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = buildCmdMsg(pCxt, TDMT_MND_CREATE_FUNC, (FSerializeFunc)tSerializeSCreateFuncReq, &req);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
switch (nodeType(pNode)) {
|
switch (nodeType(pNode)) {
|
||||||
|
@ -2693,6 +2742,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
||||||
case QUERY_NODE_DROP_STREAM_STMT:
|
case QUERY_NODE_DROP_STREAM_STMT:
|
||||||
code = translateDropStream(pCxt, (SDropStreamStmt*)pNode);
|
code = translateDropStream(pCxt, (SDropStreamStmt*)pNode);
|
||||||
break;
|
break;
|
||||||
|
case QUERY_NODE_CREATE_FUNCTION_STMT:
|
||||||
|
code = translateCreateFunction(pCxt, (SCreateFunctionStmt*)pNode);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2807,6 +2859,7 @@ static const char* getSysDbName(ENodeType type) {
|
||||||
case QUERY_NODE_SHOW_BNODES_STMT:
|
case QUERY_NODE_SHOW_BNODES_STMT:
|
||||||
case QUERY_NODE_SHOW_SNODES_STMT:
|
case QUERY_NODE_SHOW_SNODES_STMT:
|
||||||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||||
|
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||||
return TSDB_INFORMATION_SCHEMA_DB;
|
return TSDB_INFORMATION_SCHEMA_DB;
|
||||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||||
|
@ -2849,6 +2902,8 @@ static const char* getSysTableName(ENodeType type) {
|
||||||
return TSDB_INS_TABLE_SNODES;
|
return TSDB_INS_TABLE_SNODES;
|
||||||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||||
return TSDB_INS_TABLE_LICENCES;
|
return TSDB_INS_TABLE_LICENCES;
|
||||||
|
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||||
|
return TSDB_INS_TABLE_CLUSTER;
|
||||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||||
return TSDB_PERFS_TABLE_CONNECTIONS;
|
return TSDB_PERFS_TABLE_CONNECTIONS;
|
||||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||||
|
@ -3365,6 +3420,7 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
case QUERY_NODE_SHOW_SNODES_STMT:
|
case QUERY_NODE_SHOW_SNODES_STMT:
|
||||||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||||
|
case QUERY_NODE_SHOW_CLUSTER_STMT:
|
||||||
code = rewriteShow(pCxt, pQuery);
|
code = rewriteShow(pCxt, pQuery);
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue