add lua exaple
This commit is contained in:
parent
5449d8550a
commit
923ba83b9a
|
@ -320,7 +320,7 @@ int32_t handleUserDefinedFunc(SSqlObj* pSql, struct SSqlInfo* pInfo) {
|
|||
if (ret) {
|
||||
return ret;
|
||||
}
|
||||
if (isValidScript(buf)) {
|
||||
if (!isValidScript(buf, len)) {
|
||||
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
|
||||
|
|
|
@ -46,23 +46,21 @@ typedef struct ScriptCtx {
|
|||
int8_t state;
|
||||
ScriptEnv *pEnv;
|
||||
int8_t isAgg; // agg function or not
|
||||
//void(*callback)(struct ScriptCtx*ctx, char *input, int16_t iType, int16_t iBytes, int32_t numOfInput, int64_t* ts, char* dataOutput,
|
||||
// char *tsOutput, int32_t* numOfOutput, char *interbuf, int16_t oType, int16_t oBytes);
|
||||
|
||||
// init value of udf script
|
||||
int8_t type;
|
||||
union {int64_t i; double d;} initValue;
|
||||
int8_t resType;
|
||||
int16_t resBytes;
|
||||
|
||||
int32_t numOfOutput;
|
||||
int32_t offset;
|
||||
|
||||
} ScriptCtx;
|
||||
|
||||
int taosLoadScriptInit(SUdfInit* pSUdfInit);
|
||||
int taosLoadScriptNormal(char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows,
|
||||
int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *numOfOutput,
|
||||
int16_t oType, int16_t oBytes, SUdfInit *init);
|
||||
int taosLoadScriptFinalize(char *pOutput, int32_t output, SUdfInit *init);
|
||||
int taosLoadScriptDestroy(SUdfInit* pSUdfInit);
|
||||
int taosLoadScriptInit(void *pInit);
|
||||
void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows,
|
||||
int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes);
|
||||
void taosLoadScriptFinalize(void *pInit, char *pOutput, int32_t *output);
|
||||
void taosLoadScriptDestroy(void *pInit);
|
||||
|
||||
typedef struct {
|
||||
SList *scriptEnvs; //
|
||||
|
@ -71,15 +69,11 @@ typedef struct {
|
|||
pthread_mutex_t mutex;
|
||||
} ScriptEnvPool;
|
||||
|
||||
ScriptCtx* createScriptCtx(char *str);
|
||||
ScriptCtx* createScriptCtx(char *str, int8_t resType, int16_t resBytes);
|
||||
void destroyScriptCtx(void *pScriptCtx);
|
||||
|
||||
int32_t scriptEnvPoolInit();
|
||||
void scriptEnvPoolCleanup();
|
||||
bool isValidScript(const char *sript);
|
||||
|
||||
|
||||
//void execUdf(struct ScriptCtx*ctx, char *input, int16_t iType, int16_t iBytes, int32_t numOfInput,
|
||||
// int64_t* ts, char* dataOutput, char *tsOutput, int32_t* numOfOutput, char *interbuf, int16_t oType, int16_t oBytes);
|
||||
bool isValidScript(char *script, int32_t len);
|
||||
|
||||
#endif //TDENGINE_QSCRIPT_H
|
||||
|
|
|
@ -42,13 +42,28 @@ typedef struct SUdfInfo {
|
|||
char *name; // function name
|
||||
void *handle; // handle loaded in mem
|
||||
void *funcs[TSDB_UDF_FUNC_MAX_NUM]; // function ptr
|
||||
|
||||
// for script like lua/javascript only
|
||||
int isScript;
|
||||
void *pScriptCtx;
|
||||
|
||||
SUdfInit init;
|
||||
char *content;
|
||||
char *path;
|
||||
} SUdfInfo;
|
||||
|
||||
//script
|
||||
|
||||
typedef int32_t (*scriptInitFunc)(void *pCtx);
|
||||
typedef void (*scriptNormalFunc)(void *pCtx, char* data, int16_t iType, int16_t iBytes, int32_t numOfRows,
|
||||
int64_t* ts, char* dataOutput, char* tsOutput, int32_t* numOfOutput, int16_t oType, int16_t oBytes);
|
||||
typedef void (*scriptFinalizeFunc)(void *pCtx, char* dataOutput, int32_t* numOfOutput);
|
||||
typedef void (*scriptMergeFunc)(void *pCtx, char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput);
|
||||
typedef void (*scriptDestroyFunc)(void* pCtx);
|
||||
|
||||
// dynamic lib
|
||||
typedef void (*udfNormalFunc)(char* data, int16_t itype, int16_t iBytes, int32_t numOfRows, int64_t* ts, char* dataOutput, char* tsOutput,
|
||||
int32_t* numOfOutput, int16_t oType, int16_t oBytes, SUdfInit* buf);
|
||||
int32_t* numOfOutput, int16_t oType, int16_t oBytes, SUdfInit* buf);
|
||||
typedef int32_t (*udfInitFunc)(SUdfInit* data);
|
||||
typedef void (*udfFinalizeFunc)(char* dataOutput, int32_t* numOfOutput, SUdfInit* buf);
|
||||
typedef void (*udfMergeFunc)(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf);
|
||||
|
|
|
@ -805,9 +805,14 @@ static void doInvokeUdf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int
|
|||
|
||||
if (pUdfInfo && pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]) {
|
||||
qDebug("invoke udf function:%s,%p", pUdfInfo->name, pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL]);
|
||||
|
||||
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput,
|
||||
if (pUdfInfo->isScript) {
|
||||
(*(scriptNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])(pUdfInfo->pScriptCtx,
|
||||
(char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput,
|
||||
(char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes);
|
||||
} else {
|
||||
(*(udfNormalFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL])((char *)pCtx->pInput + idx * pCtx->inputType, pCtx->inputType, pCtx->inputBytes, pCtx->size, pCtx->ptsList, pCtx->pOutput,
|
||||
(char *)pCtx->ptsOutputBuf, &output, pCtx->outputType, pCtx->outputBytes, &pUdfInfo->init);
|
||||
}
|
||||
|
||||
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
|
||||
pCtx->resultInfo->numOfRes = output;
|
||||
|
@ -3313,7 +3318,11 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
|
|||
int32_t output = 0;
|
||||
|
||||
if (pRuntimeEnv->pUdfInfo && pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) {
|
||||
(*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, &output, &pRuntimeEnv->pUdfInfo->init);
|
||||
if (pRuntimeEnv->pUdfInfo->isScript) {
|
||||
(*(scriptFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pRuntimeEnv->pUdfInfo->pScriptCtx, pCtx[j].pOutput, &output);
|
||||
} else {
|
||||
(*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, &output, &pRuntimeEnv->pUdfInfo->init);
|
||||
}
|
||||
}
|
||||
|
||||
// set the output value exist
|
||||
|
@ -3340,7 +3349,11 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
|
|||
int32_t output = 0;
|
||||
|
||||
if (pRuntimeEnv->pUdfInfo && pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE]) {
|
||||
(*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, &output, &pRuntimeEnv->pUdfInfo->init);
|
||||
if (pRuntimeEnv->pUdfInfo->isScript) {
|
||||
(*(scriptFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pRuntimeEnv->pUdfInfo->pScriptCtx, pCtx[j].pOutput, &output);
|
||||
} else {
|
||||
(*(udfFinalizeFunc)pRuntimeEnv->pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE])(pCtx[j].pOutput, &output, &pRuntimeEnv->pUdfInfo->init);
|
||||
}
|
||||
}
|
||||
|
||||
// set the output value exist
|
||||
|
@ -6328,7 +6341,12 @@ void destroyUdfInfo(SUdfInfo* pUdfInfo) {
|
|||
}
|
||||
|
||||
if (pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY]) {
|
||||
(*(udfDestroyFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY])(&pUdfInfo->init);
|
||||
if (pUdfInfo->isScript) {
|
||||
(*(scriptDestroyFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY])(pUdfInfo->pScriptCtx);
|
||||
tfree(pUdfInfo->content);
|
||||
}else{
|
||||
(*(udfDestroyFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY])(&pUdfInfo->init);
|
||||
}
|
||||
}
|
||||
|
||||
tfree(pUdfInfo->name);
|
||||
|
@ -6377,18 +6395,22 @@ int32_t initUdfInfo(SUdfInfo* pUdfInfo) {
|
|||
if (pUdfInfo == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (isValidScript(pUdfInfo->content)) {
|
||||
//refactor(dengyihao)
|
||||
ScriptCtx *pScriptCtx = createScriptCtx(pUdfInfo->content);
|
||||
pUdfInfo->init.script_ctx = pScriptCtx;
|
||||
//qError("script len: %d", pUdfInfo->contLen);
|
||||
if (isValidScript(pUdfInfo->content, pUdfInfo->contLen)) {
|
||||
pUdfInfo->isScript = 1;
|
||||
pUdfInfo->pScriptCtx = createScriptCtx(pUdfInfo->content, pUdfInfo->resType, pUdfInfo->resBytes);
|
||||
if (pUdfInfo->pScriptCtx == NULL) {
|
||||
return TSDB_CODE_QRY_SYS_ERROR;
|
||||
}
|
||||
tfree(pUdfInfo->content);
|
||||
|
||||
pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadScriptInit;
|
||||
if (pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] == NULL
|
||||
|| (*(udfInitFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_INIT])(&pUdfInfo->init) != TSDB_CODE_SUCCESS) {
|
||||
pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] = taosLoadScriptInit;
|
||||
if (pUdfInfo->funcs[TSDB_UDF_FUNC_INIT] == NULL
|
||||
|| (*(scriptInitFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_INIT])(pUdfInfo->pScriptCtx) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_QRY_SYS_ERROR;
|
||||
}
|
||||
|
||||
pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadScriptNormal;
|
||||
pUdfInfo->funcs[TSDB_UDF_FUNC_NORMAL] = taosLoadScriptNormal;
|
||||
|
||||
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
|
||||
pUdfInfo->funcs[TSDB_UDF_FUNC_FINALIZE] = taosLoadScriptFinalize;
|
||||
|
|
|
@ -22,11 +22,6 @@
|
|||
static ScriptEnvPool *pool = NULL;
|
||||
|
||||
|
||||
typedef int(*ScriptInit)(SUdfInit *init);
|
||||
typedef int(*ScriptNormal)(char *pInput, int8_t iType, int32_t size, int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *output, SUdfInit *init);
|
||||
typedef int(*ScriptFinalize)(char *pOutput, int32_t output, SUdfInit *init);
|
||||
typedef int(*ScriptDestroy)(SUdfInit *init);
|
||||
|
||||
#define USER_FUNC_NAME "funcName"
|
||||
#define USER_FUNC_NAME_LIMIT 48
|
||||
|
||||
|
@ -38,7 +33,7 @@ static void destroyLuaEnv(lua_State *state);
|
|||
|
||||
static void destroyScriptEnv(ScriptEnv *pEnv);
|
||||
|
||||
static void luaValueToTaosType(lua_State *lua, int16_t iType, char *interBuf, int16_t oType, int32_t *numOfOutput, int16_t oBytes);
|
||||
static void luaValueToTaosType(lua_State *lua, char *interBuf, int32_t *numOfOutput, int16_t oType, int16_t oBytes);
|
||||
static void taosValueToLuaType(lua_State *lua, int32_t type, char *val);
|
||||
|
||||
static bool hasBaseFuncDefinedInScript(lua_State *lua, const char *funcPrefix, int32_t len);
|
||||
|
@ -71,10 +66,6 @@ static void luaLoadLibraries(lua_State *lua) {
|
|||
luaLoadLib(lua, LUA_STRLIBNAME, luaopen_string);
|
||||
luaLoadLib(lua, LUA_MATHLIBNAME, luaopen_math);
|
||||
luaLoadLib(lua, LUA_DBLIBNAME, luaopen_debug);
|
||||
//luaLoadLib(lua, "cjson", luaopen_cjson);
|
||||
//luaLoadLib(lua, "struct", luaopen_struct);
|
||||
//luaLoadLib(lua, "cmsgpack", luaopen_cmsgpack);
|
||||
//luaLoadLib(lua, "bit", luaopen_bit);
|
||||
}
|
||||
static void luaRemoveUnsupportedFunctions(lua_State *lua) {
|
||||
lua_pushnil(lua);
|
||||
|
@ -101,13 +92,9 @@ void taosValueToLuaType(lua_State *lua, int32_t type, char *val) {
|
|||
} else if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||
}
|
||||
}
|
||||
int taosLoadScriptInit(SUdfInit* pInit) {
|
||||
if (pInit->script_ctx == NULL) { return -1;}
|
||||
|
||||
pInit->destroyCtxFunc = destroyScriptCtx;
|
||||
|
||||
ScriptCtx *pCtx = pInit->script_ctx;
|
||||
char funcName[MAX_FUNC_NAME + 10] = {0};
|
||||
int taosLoadScriptInit(void* pInit) {
|
||||
ScriptCtx *pCtx = pInit;
|
||||
char funcName[MAX_FUNC_NAME] = {0};
|
||||
sprintf(funcName, "%s_init", pCtx->funcName);
|
||||
|
||||
lua_State* lua = pCtx->pEnv->lua_state;
|
||||
|
@ -115,22 +102,15 @@ int taosLoadScriptInit(SUdfInit* pInit) {
|
|||
if (lua_pcall(lua, 0, -1, 0)) {
|
||||
lua_pop(lua, -1);
|
||||
}
|
||||
if (lua_isnumber(lua, -1)) {
|
||||
pCtx->initValue.d = lua_tonumber(lua, -1);
|
||||
} else if (lua_isboolean(lua, -1)){
|
||||
pCtx->initValue.i = lua_tointeger(lua, -1);
|
||||
} else if (lua_istable(lua, -1)) {
|
||||
// TODO(dengyihao) handle more type
|
||||
}
|
||||
lua_setglobal(lua, "global");
|
||||
return 0;
|
||||
}
|
||||
int taosLoadScriptNormal(char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows,
|
||||
int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *numOfOutput,
|
||||
int16_t oType, int16_t oBytes, SUdfInit *pInit) {
|
||||
ScriptCtx* pCtx = pInit->script_ctx;
|
||||
|
||||
char funcName[MAX_FUNC_NAME + 10] = {0};
|
||||
void taosLoadScriptNormal(void *pInit, char *pInput, int16_t iType, int16_t iBytes, int32_t numOfRows,
|
||||
int64_t *ptsList, char* pOutput, char *ptsOutput, int32_t *numOfOutput, int16_t oType, int16_t oBytes) {
|
||||
ScriptCtx* pCtx = pInit;
|
||||
char funcName[MAX_FUNC_NAME] = {0};
|
||||
sprintf(funcName, "%s_add", pCtx->funcName);
|
||||
|
||||
lua_State* lua = pCtx->pEnv->lua_state;
|
||||
lua_getglobal(lua, funcName);
|
||||
|
||||
|
@ -142,34 +122,57 @@ int taosLoadScriptNormal(char *pInput, int16_t iType, int16_t iBytes, int32_t nu
|
|||
lua_rawseti(lua, -2, i+1);
|
||||
offset += iBytes;
|
||||
}
|
||||
|
||||
lua_pushnumber(lua, pCtx->initValue.d);
|
||||
int isGlobalState = false;
|
||||
lua_getglobal(lua, "global");
|
||||
if (lua_istable(lua, -1)) {
|
||||
isGlobalState = true;
|
||||
}
|
||||
// do call lua script
|
||||
if (lua_pcall(lua, 2, 1, 0) != 0) {
|
||||
qError("SCRIPT ERROR: %s", lua_tostring(lua, -1));
|
||||
lua_pop(lua, -1);
|
||||
return -1;
|
||||
return;
|
||||
}
|
||||
|
||||
int tNumOfOutput = 0;
|
||||
luaValueToTaosType(lua, iType, pOutput, oType, &tNumOfOutput, oBytes);
|
||||
pCtx->numOfOutput += tNumOfOutput;
|
||||
*numOfOutput = pCtx->numOfOutput;
|
||||
return 0;
|
||||
}
|
||||
int taosLoadScriptFinalize(char *pOutput, int32_t output, SUdfInit *pInit) {
|
||||
//do not support agg now
|
||||
return 0;
|
||||
}
|
||||
int taosLoadScriptDestroy(SUdfInit* pInit) {
|
||||
pInit->destroyCtxFunc(pInit->script_ctx);
|
||||
return 0;
|
||||
if (isGlobalState == false) {
|
||||
luaValueToTaosType(lua, pOutput, &tNumOfOutput, oType, oBytes);
|
||||
} else {
|
||||
lua_setglobal(lua, "global");
|
||||
}
|
||||
*numOfOutput = tNumOfOutput;
|
||||
}
|
||||
|
||||
ScriptCtx* createScriptCtx(char *script) {
|
||||
//do not support agg now
|
||||
void taosLoadScriptFinalize(void *pInit, char *pOutput, int32_t* numOfOutput) {
|
||||
ScriptCtx *pCtx = pInit;
|
||||
char funcName[MAX_FUNC_NAME] = {0};
|
||||
sprintf(funcName, "%s_finalize", pCtx->funcName);
|
||||
|
||||
lua_State* lua = pCtx->pEnv->lua_state;
|
||||
lua_getglobal(lua, funcName);
|
||||
|
||||
lua_getglobal(lua, "global");
|
||||
|
||||
if (lua_pcall(lua, 1, 1, 0) != 0) {
|
||||
qError("SCRIPT ERROR: %s", lua_tostring(lua, -1));
|
||||
lua_pop(lua, -1);
|
||||
return;
|
||||
}
|
||||
int tNumOfOutput = 0;
|
||||
luaValueToTaosType(lua, pOutput, &tNumOfOutput, pCtx->resType, pCtx->resBytes);
|
||||
*numOfOutput = tNumOfOutput;
|
||||
}
|
||||
|
||||
void taosLoadScriptDestroy(void *pInit) {
|
||||
destroyScriptCtx(pInit);
|
||||
}
|
||||
|
||||
ScriptCtx* createScriptCtx(char *script, int8_t resType, int16_t resBytes) {
|
||||
ScriptCtx *pCtx = (ScriptCtx *)calloc(1, sizeof(ScriptCtx));
|
||||
pCtx->state = SCRIPT_STATE_INIT;
|
||||
pCtx->pEnv = getScriptEnvFromPool(); //
|
||||
pCtx->pEnv = getScriptEnvFromPool(); //
|
||||
pCtx->resType = resType;
|
||||
pCtx->resBytes = resBytes;
|
||||
|
||||
if (pCtx->pEnv == NULL) {
|
||||
destroyScriptCtx(pCtx);
|
||||
|
@ -202,104 +205,31 @@ void destroyScriptCtx(void *pCtx) {
|
|||
free(pCtx);
|
||||
}
|
||||
|
||||
//void XXXX_init(ScriptCtx *pCtx) {
|
||||
//}
|
||||
//
|
||||
//void XXXX_add(ScriptCtx *pCtx, char *input, int16_t iType, int16_t iBytes, int32_t numOfRows,
|
||||
// char *dataOutput, int16_t oType, int32_t *numOfOutput, int16_t oBytes) {
|
||||
// char funcName[MAX_FUNC_NAME] = {0};
|
||||
// sprintf(funcName, "%s_add", pCtx->funcName);
|
||||
//
|
||||
// lua_State* lua = pCtx->pEnv->lua_state;
|
||||
// lua_getglobal(lua, funcName);
|
||||
//
|
||||
// // set first param of XXXX_add;
|
||||
// lua_newtable(lua);
|
||||
// int32_t offset = 0;
|
||||
// for (int32_t i = 0; i < numOfRows; i++) {
|
||||
// taosValueToLuaType(lua, iType, input + offset);
|
||||
// lua_rawseti(lua, -2, i+1);
|
||||
// offset += iBytes;
|
||||
// }
|
||||
//
|
||||
// // set second param of XXXX_add;
|
||||
// lua_pushnumber(lua, pCtx->initValue.d);
|
||||
// // do call lua script
|
||||
// if (lua_pcall(lua, 2, 1, 0) != 0) {
|
||||
// qError("SCRIPT ERROR: %s", lua_tostring(lua, -1));
|
||||
// lua_pop(lua, -1);
|
||||
// return;
|
||||
// }
|
||||
// int tNumOfOutput = 0;
|
||||
// luaValueToTaosType(lua, iType, dataOutput, oType, &tNumOfOutput, oBytes);
|
||||
// pCtx->numOfOutput += tNumOfOutput;
|
||||
// *numOfOutput = pCtx->numOfOutput;
|
||||
//}
|
||||
//void XXXX_merge(ScriptCtx *pCtx) {
|
||||
// char funcName[MAX_FUNC_NAME] = {0};
|
||||
// sprintf(funcName, "%s_merge", pCtx->funcName);
|
||||
//
|
||||
// lua_State* lua = pCtx->pEnv->lua_state;
|
||||
// lua_getglobal(lua, funcName);
|
||||
//
|
||||
// // set first param of XXXX_merge;
|
||||
// //lua_newtable(lua);
|
||||
// //int32_t offset = 0;
|
||||
// //for (int32_t i = 0; i < numOfRows; i++) {
|
||||
// // taosValueToLuaType(lua, iType, input + offset);
|
||||
// // lua_rawseti(lua, -2, i+1);
|
||||
// // offset += iBytes;
|
||||
// //}
|
||||
//
|
||||
// // set second param of XXXX_merge;
|
||||
// //lua_newtable(lua);
|
||||
// //offset = 0;
|
||||
// //for (int32_t i = 0; i < numOfRows; i++) {
|
||||
// // taosValueToLuaType(lua, iType, input + offset);
|
||||
// // lua_rawseti(lua, -2, i+1);
|
||||
// // offset += iBytes;
|
||||
// //}
|
||||
// // push two table
|
||||
// //if (lua_pcall(lua, 2, 1, 0) != 0) {
|
||||
// // qError("SCRIPT ERROR: %s", lua_tostring(lua, -1));
|
||||
// // lua_pop(lua, -1);
|
||||
// // return;
|
||||
// //}
|
||||
// //int tNumOfOutput = 0;
|
||||
// //luaValueToTaosType(lua, iType, dataOutput, oType, &tNumOfOutput, oBytes);
|
||||
// //pCtx->numOfOutput += tNumOfOutput;
|
||||
// //*numOfOutput = pCtx->numOfOutput;
|
||||
//}
|
||||
//
|
||||
//void XXXX_finalize(ScriptCtx *pCtx) {
|
||||
// char funcName[MAX_FUNC_NAME] = {0};
|
||||
// sprintf(funcName, "%s_finalize", pCtx->funcName);
|
||||
//
|
||||
// lua_State* lua = pCtx->pEnv->lua_state;
|
||||
// lua_getglobal(lua, funcName);
|
||||
// // push two paramter
|
||||
// if (lua_pcall(lua, 2, 1, 0) != 0) {
|
||||
// }
|
||||
//}
|
||||
|
||||
|
||||
void luaValueToTaosType(lua_State *lua, int16_t iType, char *interBuf, int16_t oType, int32_t *numOfOutput, int16_t oBytes) {
|
||||
void luaValueToTaosType(lua_State *lua, char *interBuf, int32_t *numOfOutput, int16_t oType, int16_t oBytes) {
|
||||
int t = lua_type(lua,-1);
|
||||
int32_t sz = 0;
|
||||
switch (t) {
|
||||
case LUA_TSTRING:
|
||||
//char *result = lua_tostring(lua, -1);
|
||||
sz = 1;
|
||||
// agg
|
||||
//TODO(yihaodeng): handle str type
|
||||
{
|
||||
const char *v = lua_tostring(lua, -1);
|
||||
memcpy(interBuf, v, strlen(v));
|
||||
sz = 1;
|
||||
}
|
||||
break;
|
||||
case LUA_TBOOLEAN:
|
||||
//int64_t result = lua_tonumber(lua, -1);
|
||||
sz = 1;
|
||||
// agg
|
||||
{
|
||||
double v = lua_tonumber(lua, -1);
|
||||
memcpy(interBuf, (char *)&v, oBytes);
|
||||
sz = 1;
|
||||
}
|
||||
break;
|
||||
case LUA_TNUMBER:
|
||||
//int64_t result = lua_tonumber(lua, -1);
|
||||
sz = 1;
|
||||
{
|
||||
float v = lua_tonumber(lua, -1);
|
||||
memcpy(interBuf, (char *)&v, oBytes);
|
||||
sz = 1;
|
||||
}
|
||||
break;
|
||||
case LUA_TTABLE:
|
||||
{
|
||||
|
@ -321,27 +251,6 @@ void luaValueToTaosType(lua_State *lua, int16_t iType, char *interBuf, int16_t o
|
|||
*numOfOutput = sz;
|
||||
}
|
||||
|
||||
//void execUdf(ScriptCtx *pCtx, char *input, int16_t iType, int16_t iBytes, int32_t numOfInput,
|
||||
// int64_t* ts, char* dataOutput, char *tsOutput, int32_t* numOfOutput, char *interBuf, int16_t oType, int16_t oBytes) {
|
||||
// int8_t state = pCtx->state;
|
||||
// switch (state) {
|
||||
// case SCRIPT_STATE_INIT:
|
||||
// XXXX_init(pCtx);
|
||||
// XXXX_add(pCtx, input, iType, iBytes, numOfInput, dataOutput, oType, numOfOutput, oBytes);
|
||||
// break;
|
||||
// case SCRIPT_STATE_ADD:
|
||||
// XXXX_add(pCtx, input, iType, iBytes, numOfInput, dataOutput, oType, numOfOutput, oBytes);
|
||||
// break;
|
||||
// case SCRIPT_STATE_MERGE:
|
||||
// XXXX_merge(pCtx);
|
||||
// break;
|
||||
// case SCRIPT_STATE_FINALIZE:
|
||||
// XXXX_finalize(pCtx);
|
||||
// break;
|
||||
// default:
|
||||
// return;
|
||||
// }
|
||||
//}
|
||||
|
||||
/*
|
||||
*Initialize the scripting environment.
|
||||
|
@ -438,6 +347,7 @@ void addScriptEnvToPool(ScriptEnv *pEnv) {
|
|||
return;
|
||||
}
|
||||
pthread_mutex_lock(&pool->mutex);
|
||||
lua_settop(pEnv->lua_state, 0);
|
||||
tdListAppend(pool->scriptEnvs, (void *)(&pEnv));
|
||||
pool->cSize++;
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
|
@ -451,23 +361,27 @@ bool hasBaseFuncDefinedInScript(lua_State *lua, const char *funcPrefix, int32_t
|
|||
const char *base[] = {"_init", "_add"};
|
||||
for (int i = 0; (i < sizeof(base)/sizeof(base[0])) && (ret == true); i++) {
|
||||
memcpy(funcName + len, base[i], strlen(base[i]));
|
||||
funcName[len + strlen(base[i])] = 0;
|
||||
|
||||
memset(funcName + len + strlen(base[i]), 0, MAX_FUNC_NAME - len - strlen(base[i]));
|
||||
lua_getglobal(lua, funcName);
|
||||
ret = lua_isfunction(lua, -1); // exsit function or not
|
||||
lua_pop(lua, 1);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
bool isValidScript(const char *script) {
|
||||
|
||||
bool isValidScript(char *script, int32_t len) {
|
||||
ScriptEnv *pEnv = getScriptEnvFromPool(); //
|
||||
if (pEnv == NULL) {
|
||||
return false;
|
||||
}
|
||||
lua_State *lua = pEnv->lua_state;
|
||||
if (len < strlen(script)) {
|
||||
script[len] = 0;
|
||||
}
|
||||
if (luaL_dostring(lua, script)) {
|
||||
lua_pop(lua, 1);
|
||||
addScriptEnvToPool(pEnv);
|
||||
qError("error at %s and %d", script, (int)(strlen(script)));
|
||||
return false;
|
||||
}
|
||||
lua_getglobal(lua, USER_FUNC_NAME);
|
||||
|
@ -475,6 +389,7 @@ bool isValidScript(const char *script) {
|
|||
if (name == NULL || strlen(name) >= USER_FUNC_NAME_LIMIT) {
|
||||
lua_pop(lua, 1);
|
||||
addScriptEnvToPool(pEnv);
|
||||
qError("error at %s name: %s, len = %d", script, name, (int)(strlen(name)));
|
||||
return false;
|
||||
}
|
||||
bool ret = hasBaseFuncDefinedInScript(lua, name, strlen(name));
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
funcName = "test"
|
||||
|
||||
global = {}
|
||||
global["sum"] = 0.0
|
||||
global["num"] = 0
|
||||
|
||||
function test_init()
|
||||
return global
|
||||
end
|
||||
|
||||
function test_add(rows, ans)
|
||||
for i=1, #rows do
|
||||
ans["sum"] = ans["sum"] + rows[i] * rows[i]
|
||||
end
|
||||
ans["num"] = ans["num"] + #rows
|
||||
return ans;
|
||||
end
|
||||
|
||||
function test_finalize(ans)
|
||||
return ans["sum"]/ans["num"];
|
||||
end
|
Loading…
Reference in New Issue