[TD-4270]<feature>: the first round rest udf implementation
This commit is contained in:
parent
5fd5e775a6
commit
0d738dc6f8
|
@ -17,7 +17,7 @@
|
|||
#define HTTP_PARSER_H
|
||||
#include "httpGzip.h"
|
||||
|
||||
#define HTTP_MAX_URL 5 // http url stack size
|
||||
#define HTTP_MAX_URL 6 // http url stack size
|
||||
|
||||
typedef enum HTTP_PARSER_STATE {
|
||||
HTTP_PARSER_BEGIN,
|
||||
|
|
|
@ -119,6 +119,90 @@ bool restProcessSqlRequest(HttpContext* pContext, int32_t timestampFmt) {
|
|||
return true;
|
||||
}
|
||||
|
||||
#define REST_FUNC_URL_POS 2
|
||||
#define REST_OUTP_URL_POS 3
|
||||
#define REST_AGGR_URL_POS 4
|
||||
#define REST_BUFF_URL_POS 5
|
||||
|
||||
#define HTTP_FUNC_LEN 32
|
||||
#define HTTP_OUTP_LEN 16
|
||||
#define HTTP_AGGR_LEN 2
|
||||
#define HTTP_BUFF_LEN 32
|
||||
|
||||
static int udfSaveFile(const char *fname, const char *content, long len) {
|
||||
int fd = open(fname, O_WRONLY | O_CREAT | O_EXCL | O_BINARY, 0755);
|
||||
if (fd < 0)
|
||||
return -1;
|
||||
if (taosWrite(fd, (void *)content, len) < 0)
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static bool restProcessUdfRequest(HttpContext* pContext) {
|
||||
HttpParser* pParser = pContext->parser;
|
||||
if (pParser->path[REST_FUNC_URL_POS].pos >= HTTP_FUNC_LEN || pParser->path[REST_FUNC_URL_POS].pos <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pParser->path[REST_OUTP_URL_POS].pos >= HTTP_OUTP_LEN || pParser->path[REST_OUTP_URL_POS].pos <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pParser->path[REST_AGGR_URL_POS].pos >= HTTP_AGGR_LEN || pParser->path[REST_AGGR_URL_POS].pos <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pParser->path[REST_BUFF_URL_POS].pos >= HTTP_BUFF_LEN || pParser->path[REST_BUFF_URL_POS].pos <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
char* sql = pContext->parser->body.str;
|
||||
int len = pContext->parser->body.size;
|
||||
if (sql == NULL) {
|
||||
httpSendErrorResp(pContext, TSDB_CODE_HTTP_NO_SQL_INPUT);
|
||||
return false;
|
||||
}
|
||||
|
||||
char udfDir[256] = {0};
|
||||
char buf[64] = "udf-";
|
||||
char funcName[64] = {0};
|
||||
int aggr = 0;
|
||||
char outputType[16] = {0};
|
||||
char buffSize[32] = {0};
|
||||
|
||||
tstrncpy(funcName, pParser->path[REST_FUNC_URL_POS].str, HTTP_FUNC_LEN);
|
||||
tstrncpy(buf + 4, funcName, HTTP_FUNC_LEN);
|
||||
|
||||
if (pParser->path[REST_AGGR_URL_POS].str[0] != '0') {
|
||||
aggr = 1;
|
||||
}
|
||||
|
||||
tstrncpy(outputType, pParser->path[REST_OUTP_URL_POS].str, HTTP_OUTP_LEN);
|
||||
tstrncpy(buffSize, pParser->path[REST_BUFF_URL_POS].str, HTTP_BUFF_LEN);
|
||||
|
||||
taosGetTmpfilePath(funcName, udfDir);
|
||||
|
||||
udfSaveFile(udfDir, sql, len);
|
||||
|
||||
tfree(sql);
|
||||
pContext->parser->body.str = malloc(1024);
|
||||
sql = pContext->parser->body.str;
|
||||
int sqlLen = sprintf(sql, "create %s function %s as \"%s\" outputtype %s bufsize %s",
|
||||
aggr == 1 ? "aggregate" : " ", funcName, udfDir, outputType, buffSize);
|
||||
|
||||
pContext->parser->body.pos = sqlLen;
|
||||
pContext->parser->body.size = sqlLen + 1;
|
||||
|
||||
HttpSqlCmd* cmd = &(pContext->singleCmd);
|
||||
cmd->nativSql = sql;
|
||||
|
||||
pContext->reqType = HTTP_REQTYPE_SINGLE_SQL;
|
||||
pContext->encodeMethod = &restEncodeSqlLocalTimeStringMethod;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool restProcessRequest(struct HttpContext* pContext) {
|
||||
if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) {
|
||||
restGetUserFromUrl(pContext);
|
||||
|
@ -138,6 +222,8 @@ bool restProcessRequest(struct HttpContext* pContext) {
|
|||
return restProcessSqlRequest(pContext, REST_TIMESTAMP_FMT_UTC_STRING);
|
||||
} else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "login")) {
|
||||
return restProcessLoginRequest(pContext);
|
||||
} else if (httpUrlMatch(pContext, REST_ACTION_URL_POS, "udf")) {
|
||||
return restProcessUdfRequest(pContext);
|
||||
} else {
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue