support udf

This commit is contained in:
dapan1121 2021-03-12 10:21:54 +08:00
parent a1f747a89e
commit 03a6ba6570
6 changed files with 121 additions and 1 deletions

View File

@ -241,6 +241,80 @@ static int32_t handlePassword(SSqlCmd* pCmd, SStrToken* pPwd) {
return TSDB_CODE_SUCCESS;
}
int32_t readFromFile(char *name, uint32_t *len, void **buf) {
struct stat fileStat;
if (stat(name, &fileStat) < 0) {
tscError("stat file %s failed, error:%s", name, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
*len = fileStat.st_size;
*buf = calloc(1, *len);
if (*buf == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int fd = open(name, O_RDONLY);
if (fd < 0) {
tscError("open file %s failed, error:%s", name, strerror(errno));
tfree(*buf);
return TAOS_SYSTEM_ERROR(errno);
}
int64_t s = taosReadImp(fd, *buf, *len);
if (s != *len) {
tscError("read file %s failed, error:%s", name, strerror(errno));
close(fd);
tfree(*buf);
return TSDB_CODE_TSC_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int32_t handleCreateFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
switch (pInfo->type) {
case TSDB_SQL_CREATE_FUNCTION:
SCreateFuncInfo *createInfo = &pInfo->pMiscInfo->funcOpt;
SCreateFuncMsg *pMsg = (SCreateFuncMsg *)pSql->cmd.payload;
int32_t len = 0;
void *buf = NULL;
createInfo->path.z[createInfo->path.n] = 0;
strdequote(createInfo->path.z);
int32_t ret = readFromFile(createInfo->path.z, &len, &buf);
if (ret) {
return ret;
}
//TODO CHECK CODE
if (len + sizeof(SCreateFuncMsg) > pSql->cmd.allocSize) {
ret = tscAllocPayload(&pSql->cmd, len + sizeof(SCreateFuncMsg));
if (ret) {
return ret;
}
}
pMsg->codeLen = htonl(len);
memcpy(pMsg->code, *buf, len);
break;
case TSDB_SQL_DROP_FUNCTION:
default:
return TSDB_CODE_TSC_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (pInfo == NULL || pSql == NULL) {
return TSDB_CODE_TSC_APP_ERROR;
@ -352,6 +426,15 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
break;
}
case TSDB_SQL_CREATE_FUNCTION:
case TSDB_SQL_DROP_FUNCTION: {
if (handleCreateFunc(pSql, pInfo) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
break;
}
case TSDB_SQL_ALTER_DB:
case TSDB_SQL_CREATE_DB: {
const char* msg1 = "invalid db name";

View File

@ -41,8 +41,10 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_FUNCTION, "drop-function" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_ACCT, "create-acct" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_USER, "create-user" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_ACCT, "drop-acct" )

View File

@ -148,6 +148,7 @@ enum _mgmt_table {
TSDB_MGMT_TABLE_STREAMTABLES,
TSDB_MGMT_TABLE_CLUSTER,
TSDB_MGMT_TABLE_TP,
TSDB_MGMT_TABLE_FUNCTION,
TSDB_MGMT_TABLE_MAX,
};
@ -567,6 +568,11 @@ typedef struct {
int8_t reserve[5];
} SCreateDbMsg, SAlterDbMsg;
typedef struct {
int32_t codeLen;
char code[];
} SCreateFuncMsg;
typedef struct {
char db[TSDB_TABLE_FNAME_LEN];
uint8_t ignoreNotExists;

View File

@ -129,6 +129,12 @@ typedef struct SCreateDbInfo {
int16_t partitions;
} SCreateDbInfo;
typedef struct SCreateFuncInfo {
SStrToken name;
SStrToken path;
} SCreateFuncInfo;
typedef struct SCreateAcctInfo {
int32_t maxUsers;
int32_t maxDbs;
@ -163,6 +169,7 @@ typedef struct SMiscInfo {
union {
SCreateDbInfo dbOpt;
SCreateAcctInfo acctOpt;
SCreateFuncInfo funcOpt;
SShowInfo showOpt;
SStrToken id;
};

View File

@ -837,4 +837,3 @@ cmd ::= KILL QUERY INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); s
COUNT SUM AVG MIN MAX FIRST LAST TOP BOTTOM STDDEV PERCENTILE APERCENTILE LEASTSQUARES HISTOGRAM DIFF
SPREAD TWA INTERP LAST_ROW RATE IRATE SUM_RATE SUM_IRATE AVG_RATE AVG_IRATE TBID NOW IPTOKEN SEMI NONE PREV LINEAR IMPORT
METRIC TBNAME JOIN METRICS STABLE NULL INSERT INTO VALUES.

View File

@ -820,6 +820,18 @@ void setDropDbTableInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken, SStrTo
pInfo->pMiscInfo->tableType = tableType;
}
void setDropFuncInfo(SSqlInfo *pInfo, int32_t type, SStrToken* pToken) {
pInfo->type = type;
if (pInfo->pMiscInfo == NULL) {
pInfo->pMiscInfo = (SMiscInfo *)calloc(1, sizeof(SMiscInfo));
pInfo->pMiscInfo->a = taosArrayInit(4, sizeof(SStrToken));
}
taosArrayPush(pInfo->pMiscInfo->a, pToken);
}
void setShowOptions(SSqlInfo *pInfo, int32_t type, SStrToken* prefix, SStrToken* pPatterns) {
if (pInfo->pMiscInfo == NULL) {
pInfo->pMiscInfo = calloc(1, sizeof(SMiscInfo));
@ -854,6 +866,17 @@ void setCreateDbInfo(SSqlInfo *pInfo, int32_t type, SStrToken *pToken, SCreateDb
pInfo->pMiscInfo->dbOpt.ignoreExists = pIgExists->n; // sql.y has: ifnotexists(X) ::= IF NOT EXISTS. {X.n = 1;}
}
void setCreateFuncInfo(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken *pPath) {
pInfo->type = type;
if (pInfo->pMiscInfo == NULL) {
pInfo->pMiscInfo = calloc(1, sizeof(SMiscInfo));
}
pInfo->pMiscInfo->funcOpt.name = *pName;
pInfo->pMiscInfo->funcOpt.path = *pPath;
}
void setCreateAcctSql(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken *pPwd, SCreateAcctInfo *pAcctInfo) {
pInfo->type = type;
if (pInfo->pMiscInfo == NULL) {