diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 1b1339340b..65d3a98654 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -259,6 +259,39 @@ typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interB typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf); typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData); +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TODO: capacity for path and +// define macro for name and path len or use dynamic allocation/shared with SUdf. + +typedef struct SUdfInfo { + char name[65]; + + int8_t funcType; + int8_t scriptType; + int8_t outputType; + int32_t outputLen; + int32_t bufSize; + + char path[512]; +} SUdfInfo; + +// TODO: deprecate SUdfInterBuf.numOfResult or add isInitial to SUdfInterBuf + +typedef int32_t (*TScriptUdfScalarProcFunc)(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx); + +typedef int32_t (*TScriptUdfAggStartFunc)(SUdfInterBuf *buf, void *udfCtx); +typedef int32_t (*TScriptUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, + void *udfCtx); +typedef int32_t (*TScriptUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, + void *udfCtx); +typedef int32_t (*TScriptUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx); +typedef int32_t (*TScriptUdfInitFunc)(SUdfInfo *info, void **pUdfCtx); +typedef int32_t (*TScriptUdfDestoryFunc)(void *udfCtx); + +// the following function is for open/close script plugin. +typedef int32_t (*TScriptOpenFunc)(void *scriptCtx); +typedef int32_t (*TScriptCloseFunc)(void *scriptCtx); + #ifdef __cplusplus } #endif diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 52d8a75ee0..7bcf3201da 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -695,6 +695,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_UDF_NO_FUNC_HANDLE TAOS_DEF_ERROR_CODE(0, 0x2908) #define TSDB_CODE_UDF_INVALID_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x2909) #define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x290A) +#define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x290B) // sml #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) diff --git a/include/util/tdef.h b/include/util/tdef.h index 9036befc02..32791c9282 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -205,7 +205,7 @@ typedef enum ELogicConditionType { #define TSDB_FUNC_TYPE_SCALAR 1 #define TSDB_FUNC_TYPE_AGGREGATE 2 #define TSDB_FUNC_SCRIPT_BIN_LIB 0 -#define TSDB_FUNC_SCRIPT_LUA 1 +#define TSDB_FUNC_SCRIPT_PYTHON 1 #define TSDB_FUNC_MAX_RETRIEVE 1024 #define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0' diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index fe6ed7d785..f50ce71b50 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -30,6 +30,153 @@ #include "tmisce.h" // clang-format on +#define MAX_NUM_SCRIPT_PLUGINS 64 +#define MAX_NUM_PLUGIN_FUNCS 9 + + +typedef struct SUdfCPluginCtx { + uv_lib_t lib; + + TUdfScalarProcFunc scalarProcFunc; + + TUdfAggStartFunc aggStartFunc; + TUdfAggProcessFunc aggProcFunc; + TUdfAggFinishFunc aggFinishFunc; + TUdfAggMergeFunc aggMergeFunc; + + TUdfInitFunc initFunc; + TUdfDestroyFunc destroyFunc; +} SUdfCPluginCtx; + +int32_t udfdCPluginOpen(void *scriptCtx) { return 0; } + +int32_t udfdCPluginClose(void *scriptCtx) { return 0; } + +int32_t udfdCPluginUdfInit(SUdfInfo *udf, void **pUdfCtx) { + int32_t err = 0; + SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx)); + err = uv_dlopen(udf->path, &udfCtx->lib); + if (err != 0) { + fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); + return TSDB_CODE_UDF_LOAD_UDF_FAILURE; + } + const char *udfName = udf->name; + char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; + char *initSuffix = "_init"; + strcpy(initFuncName, udfName); + strncat(initFuncName, initSuffix, strlen(initSuffix)); + uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc)); + + char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; + char *destroySuffix = "_destroy"; + strcpy(destroyFuncName, udfName); + strncat(destroyFuncName, destroySuffix, strlen(destroySuffix)); + uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc)); + + if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) { + char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; + strcpy(processFuncName, udfName); + uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)); + } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { + char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; + strcpy(processFuncName, udfName); + uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)); + char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *startSuffix = "_start"; + strncpy(startFuncName, processFuncName, sizeof(startFuncName)); + strncat(startFuncName, startSuffix, strlen(startSuffix)); + uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)); + char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; + char *finishSuffix = "_finish"; + strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); + strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); + uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)); + char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *mergeSuffix = "_merge"; + strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName)); + strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix)); + uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)); + } + if (udfCtx->initFunc) { + (udfCtx->initFunc)(); + } + *pUdfCtx = udfCtx; + return 0; +} + +int32_t udfdCPluginUdfDestroy(void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->destroyFunc) { + (ctx->destroyFunc)(); + } + uv_dlclose(&ctx->lib); + taosMemoryFree(ctx); + return 0; +} + +int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->scalarProcFunc) { + ctx->scalarProcFunc(block, resultCol); + } + return 0; +} + +int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->aggStartFunc) { + ctx->aggStartFunc(buf); + } + return 0; +} + +int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->aggProcFunc) { + ctx->aggProcFunc(block, interBuf, newInterBuf); + } + return 0; +} + +int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, + void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->aggMergeFunc) { + ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); + } + return 0; +} + +int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->aggFinishFunc) { + ctx->aggFinishFunc(buf, resultData); + } + return 0; +} + +// for c, the function pointer are filled directly and libloaded = true; +// for others, dlopen/dlsym to find function pointers +typedef struct SUdfScriptPlugin { + int8_t scriptType; + + char libPath[PATH_MAX]; + bool libLoaded; + uv_lib_t lib; + + TScriptUdfScalarProcFunc udfScalarProcFunc; + TScriptUdfAggStartFunc udfAggStartFunc; + TScriptUdfAggProcessFunc udfAggProcFunc; + TScriptUdfAggMergeFunc udfAggMergeFunc; + TScriptUdfAggFinishFunc udfAggFinishFunc; + + TScriptUdfInitFunc udfInitFunc; + TScriptUdfDestoryFunc udfDestroyFunc; + + TScriptOpenFunc openFunc; + TScriptCloseFunc closeFunc; +} SUdfScriptPlugin; + typedef struct SUdfdContext { uv_loop_t *loop; uv_pipe_t ctrlPipe; @@ -37,11 +184,15 @@ typedef struct SUdfdContext { char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; uv_pipe_t listeningPipe; - void *clientRpc; - SCorEpSet mgmtEp; + void *clientRpc; + SCorEpSet mgmtEp; + uv_mutex_t udfsMutex; SHashObj *udfsHash; + uv_mutex_t scriptPluginsMutex; + SUdfScriptPlugin *scriptPlugins[MAX_NUM_SCRIPT_PLUGINS]; + SArray *residentFuncs; bool printVersion; @@ -73,13 +224,8 @@ typedef struct SUvUdfWork { typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState; typedef struct SUdf { - int32_t refCount; - EUdfState state; - uv_mutex_t lock; - uv_cond_t condReady; - bool resident; + char name[TSDB_FUNC_NAME_LEN + 1]; - char name[TSDB_FUNC_NAME_LEN + 1]; int8_t funcType; int8_t scriptType; int8_t outputType; @@ -88,17 +234,14 @@ typedef struct SUdf { char path[PATH_MAX]; - uv_lib_t lib; + int32_t refCount; + EUdfState state; + uv_mutex_t lock; + uv_cond_t condReady; + bool resident; - TUdfScalarProcFunc scalarProcFunc; - - TUdfAggStartFunc aggStartFunc; - TUdfAggProcessFunc aggProcFunc; - TUdfAggFinishFunc aggFinishFunc; - TUdfAggMergeFunc aggMergeFunc; - - TUdfInitFunc initFunc; - TUdfDestroyFunc destroyFunc; + SUdfScriptPlugin *scriptPlugin; + void *scriptUdfCtx; } SUdf; // TODO: add private udf structure. @@ -121,7 +264,6 @@ typedef struct SUdfdRpcSendRecvInfo { static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf); static int32_t udfdConnectToMnode(); -static int32_t udfdLoadUdf(char *udfName, SUdf *udf); static bool udfdRpcRfp(int32_t code, tmsg_t msgType); static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t udfdOpenClientRpc(); @@ -155,6 +297,71 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); static int32_t udfdRun(); static void udfdConnectMnodeThreadFunc(void *args); +void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { + plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; + plugin->openFunc = udfdCPluginOpen; + plugin->closeFunc = udfdCPluginClose; + plugin->udfInitFunc = udfdCPluginUdfInit; + plugin->udfDestroyFunc = udfdCPluginUdfDestroy; + plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc; + plugin->udfAggStartFunc = udfdCPluginUdfAggStart; + plugin->udfAggProcFunc = udfdCPluginUdfAggProc; + plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge; + plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish; + return; +} + +int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) { + int err = uv_dlopen(libPath, pLib); + if (err != 0) { + fnError("can not load library %s. error: %s", libPath, uv_strerror(err)); + return TSDB_CODE_UDF_LOAD_UDF_FAILURE; + } + + for (int i = 0; i < numOfFuncs; ++i) { + err = uv_dlsym(pLib, funcName[i], func[i]); + if (err != 0) { + fnError("load library function failed. lib %s function %s", libPath, funcName[i]); + } + } + return 0; +} + +void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { + plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; + sprintf("%s", plugin->libPath, "libtaosudf_py.so"); + plugin->libLoaded = false; + const char *funcName[MAX_NUM_PLUGIN_FUNCS] = {"open", "close", "udfInit", + "udfDestroy", "udfScalarProc", "udfAggStart", + "udfAggFinish", "udfAggProc", "udfAggMerge"}; + void **funcs[MAX_NUM_PLUGIN_FUNCS] = { + (void **)&plugin->openFunc, (void **)&plugin->closeFunc, (void **)&plugin->udfInitFunc, + (void **)&plugin->udfDestroyFunc, (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc, + (void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc, (void **)&plugin->udfAggMergeFunc}; + int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, MAX_NUM_PLUGIN_FUNCS); + if (err != 0) { + fnError("can not load python plugin. lib path %s", plugin->libPath); + return; + } + plugin->libLoaded = true; + return; +} + +void udfdInitScriptPlugins() { + SUdfScriptPlugin *plugins = taosMemoryCalloc(2, sizeof(SUdfScriptPlugin)); + // Initialize c language plugin + udfdInitializeCPlugin(plugins + 0); + // Initialize python plugin + udfdInitializePythonPlugin(plugins + 1); + return; +} + +void udfdDeinitScriptPlugins() { + + return; +} + + void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); SUdfRequest request = {0}; @@ -180,14 +387,43 @@ void udfdProcessRequest(uv_work_t *req) { } } -void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { - // TODO: tracable id from client. connect, setup, call, teardown - fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName); - SUdfSetupRequest *setup = &request->setup; - int32_t code = TSDB_CODE_SUCCESS; - SUdf *udf = NULL; +void convertUdf2UdfInfo(SUdf *udf, SUdfInfo *udfInfo) { + udfInfo->bufSize = udf->bufSize; + udfInfo->funcType = udf->funcType; + strncpy(udfInfo->name, udf->name, strlen(udf->name)); + udfInfo->outputLen = udf->outputLen; + udfInfo->outputType = udf->outputType; + strncpy(udfInfo->path, udf->path, strlen(udf->path)); + udfInfo->scriptType = udf->scriptType; +} + +int32_t udfdInitUdf(char *udfName, SUdf *udf) { + int32_t err = 0; + err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf); + if (err != 0) { + fnError("can not retrieve udf from mnode. udf name %s", udfName); + return TSDB_CODE_UDF_LOAD_UDF_FAILURE; + } + //TODO: remove script plugins mutex + uv_mutex_lock(&global.scriptPluginsMutex); + SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType]; + if (scriptPlugin == NULL) { + fnError("udf name %s script type %d not supported", udfName, udf->scriptType); + uv_mutex_unlock(&global.scriptPluginsMutex); + return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; + } + uv_mutex_unlock(&global.scriptPluginsMutex); + udf->scriptPlugin = scriptPlugin; + SUdfInfo info = {0}; + convertUdf2UdfInfo(udf, &info); + udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx); + return 0; +} + +SUdf *udfdGetOrCreateUdf(const char *udfName) { + SUdf *udf = NULL; uv_mutex_lock(&global.udfsMutex); - SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName)); + SUdf **udfInHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); if (udfInHash) { ++(*udfInHash)->refCount; udf = *udfInHash; @@ -195,23 +431,35 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } else { SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); udfNew->refCount = 1; + strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); + udfNew->state = UDF_STATE_INIT; uv_mutex_init(&udfNew->lock); uv_cond_init(&udfNew->condReady); udf = udfNew; SUdf **pUdf = &udf; - taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), pUdf, POINTER_BYTES); + taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); uv_mutex_unlock(&global.udfsMutex); } + return udf; +} + +void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { + // TODO: tracable id from client. connect, setup, call, teardown + fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName); + + SUdfSetupRequest *setup = &request->setup; + int32_t code = TSDB_CODE_SUCCESS; + SUdf *udf = NULL; + + udf = udfdGetOrCreateUdf(setup->udfName); uv_mutex_lock(&udf->lock); if (udf->state == UDF_STATE_INIT) { udf->state = UDF_STATE_LOADING; - code = udfdLoadUdf(setup->udfName, udf); - if (udf->initFunc) { - udf->initFunc(); - } + code = udfdInitUdf(setup->udfName, udf); + udf->resident = false; for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { char *funcName = taosArrayGet(global.residentFuncs, i); @@ -270,7 +518,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); - code = udf->scalarProcFunc(&input, &output); + code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); freeUdfDataDataBlock(&input); convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); freeUdfColumn(&output); @@ -278,7 +526,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } case TSDB_UDF_CALL_AGG_INIT: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - udf->aggStartFunc(&outBuf); + code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx); subRsp->resultBuf = outBuf; break; } @@ -286,7 +534,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->aggProcFunc(&input, &call->interBuf, &outBuf); + code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx); freeUdfInterBuf(&call->interBuf); freeUdfDataDataBlock(&input); subRsp->resultBuf = outBuf; @@ -295,7 +543,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } case TSDB_UDF_CALL_AGG_MERGE: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->aggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf); + code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx); freeUdfInterBuf(&call->interBuf); freeUdfInterBuf(&call->interBuf2); subRsp->resultBuf = outBuf; @@ -304,7 +552,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } case TSDB_UDF_CALL_AGG_FIN: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->aggFinishFunc(&call->interBuf, &outBuf); + code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx); freeUdfInterBuf(&call->interBuf); subRsp->resultBuf = outBuf; break; @@ -374,11 +622,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { if (unloadUdf) { uv_cond_destroy(&udf->condReady); uv_mutex_destroy(&udf->lock); - if (udf->destroyFunc) { - (udf->destroyFunc)(); - } - uv_dlclose(&udf->lib); - taosMemoryFree(udf); + udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); } taosMemoryFree(handle); @@ -440,7 +684,8 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { goto _return; } SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - SUdf *udf = msgInfo->param; + // SUdf *udf = msgInfo->param; + SUdf *udf = msgInfo->param; udf->funcType = pFuncInfo->funcType; udf->scriptType = pFuncInfo->scriptType; udf->outputType = pFuncInfo->outputType; @@ -455,12 +700,11 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { } char path[PATH_MAX] = {0}; -#ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s.dll", tsTempDir, pFuncInfo->name); -#else - snprintf(path, sizeof(path), "%s/lib%s.so", tsTempDir, pFuncInfo->name); -#endif - + #ifdef WINDOWS + snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name); + #else + snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name); + #endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); @@ -550,65 +794,10 @@ int32_t udfdConnectToMnode() { return code; } -int32_t udfdLoadUdf(char *udfName, SUdf *udf) { - strncpy(udf->name, udfName, TSDB_FUNC_NAME_LEN); - int32_t err = 0; - - err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf); - if (err != 0) { - fnError("can not retrieve udf from mnode. udf name %s", udfName); - return TSDB_CODE_UDF_LOAD_UDF_FAILURE; - } - - err = uv_dlopen(udf->path, &udf->lib); - if (err != 0) { - fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); - return TSDB_CODE_UDF_LOAD_UDF_FAILURE; - } - - char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; - char *initSuffix = "_init"; - strcpy(initFuncName, udfName); - strncat(initFuncName, initSuffix, strlen(initSuffix)); - uv_dlsym(&udf->lib, initFuncName, (void **)(&udf->initFunc)); - - char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; - char *destroySuffix = "_destroy"; - strcpy(destroyFuncName, udfName); - strncat(destroyFuncName, destroySuffix, strlen(destroySuffix)); - uv_dlsym(&udf->lib, destroyFuncName, (void **)(&udf->destroyFunc)); - - if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) { - char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strcpy(processFuncName, udfName); - uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc)); - } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { - char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strcpy(processFuncName, udfName); - uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc)); - char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; - char *startSuffix = "_start"; - strncpy(startFuncName, processFuncName, sizeof(startFuncName)); - strncat(startFuncName, startSuffix, strlen(startSuffix)); - uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc)); - char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; - char *finishSuffix = "_finish"; - strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); - strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); - uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); - char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; - char *mergeSuffix = "_merge"; - strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName)); - strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix)); - uv_dlsym(&udf->lib, mergeFuncName, (void **)(&udf->aggMergeFunc)); - } - return 0; -} static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER || - code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || - code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || - code == TSDB_CODE_APP_IS_STOPPING) { + code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING || + code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { return false; @@ -765,7 +954,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { } void udfdHandleRequest(SUdfdUvConn *conn) { - char *inputBuf = conn->inputBuf; + char *inputBuf = conn->inputBuf; int32_t inputLen = conn->inputLen; uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t)); @@ -784,7 +973,7 @@ void udfdHandleRequest(SUdfdUvConn *conn) { void udfdPipeCloseCb(uv_handle_t *pipe) { SUdfdUvConn *conn = pipe->data; - SUvUdfWork* pWork = conn->pWorkList; + SUvUdfWork *pWork = conn->pWorkList; while (pWork != NULL) { pWork->conn = NULL; pWork = pWork->pWorkNext; @@ -960,6 +1149,8 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) { } static int32_t udfdRun() { + uv_mutex_init(&global.scriptPluginsMutex); + global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); uv_mutex_init(&global.udfsMutex); @@ -1017,11 +1208,7 @@ int32_t udfdDeinitResidentFuncs() { SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); if (udfInHash) { SUdf *udf = *udfInHash; - if (udf->destroyFunc) { - (udf->destroyFunc)(); - } - uv_dlclose(&udf->lib); - taosMemoryFree(udf); + udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); taosHashRemove(global.udfsHash, funcName, strlen(funcName)); } } @@ -1072,6 +1259,8 @@ int main(int argc, char *argv[]) { return -5; } + udfdInitScriptPlugins(); + udfdInitResidentFuncs(); uv_thread_t mnodeConnectThread; @@ -1083,6 +1272,9 @@ int main(int argc, char *argv[]) { udfdCloseClientRpc(); udfdDeinitResidentFuncs(); + + udfdDeinitScriptPlugins(); + udfdCleanup(); return 0; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index bab3edc870..6621192291 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -573,6 +573,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid functio TAOS_DEFINE_ERROR(TSDB_CODE_UDF_NO_FUNC_HANDLE, "udf no function handle") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_BUFSIZE, "udf invalid bufsize") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED, "udf program language not supported") //schemaless TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")