From 52264734057601bb8347359cffbea623dcd3fdc2 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 6 Feb 2023 18:33:54 +0800 Subject: [PATCH 01/33] feature: udf dispatch first by script type then by udf name --- include/libs/function/taosudf.h | 33 +++ include/util/taoserror.h | 1 + include/util/tdef.h | 2 +- source/libs/function/src/udfd.c | 416 +++++++++++++++++++++++--------- source/util/src/terror.c | 1 + 5 files changed, 340 insertions(+), 113 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 1b1339340b..65d3a98654 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -259,6 +259,39 @@ typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interB typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf); typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData); +////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TODO: capacity for path and +// define macro for name and path len or use dynamic allocation/shared with SUdf. + +typedef struct SUdfInfo { + char name[65]; + + int8_t funcType; + int8_t scriptType; + int8_t outputType; + int32_t outputLen; + int32_t bufSize; + + char path[512]; +} SUdfInfo; + +// TODO: deprecate SUdfInterBuf.numOfResult or add isInitial to SUdfInterBuf + +typedef int32_t (*TScriptUdfScalarProcFunc)(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx); + +typedef int32_t (*TScriptUdfAggStartFunc)(SUdfInterBuf *buf, void *udfCtx); +typedef int32_t (*TScriptUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, + void *udfCtx); +typedef int32_t (*TScriptUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, + void *udfCtx); +typedef int32_t (*TScriptUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx); +typedef int32_t (*TScriptUdfInitFunc)(SUdfInfo *info, void **pUdfCtx); +typedef int32_t (*TScriptUdfDestoryFunc)(void *udfCtx); + +// the following function is for open/close script plugin. +typedef int32_t (*TScriptOpenFunc)(void *scriptCtx); +typedef int32_t (*TScriptCloseFunc)(void *scriptCtx); + #ifdef __cplusplus } #endif diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 52d8a75ee0..7bcf3201da 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -695,6 +695,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_UDF_NO_FUNC_HANDLE TAOS_DEF_ERROR_CODE(0, 0x2908) #define TSDB_CODE_UDF_INVALID_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x2909) #define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x290A) +#define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x290B) // sml #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) diff --git a/include/util/tdef.h b/include/util/tdef.h index 9036befc02..32791c9282 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -205,7 +205,7 @@ typedef enum ELogicConditionType { #define TSDB_FUNC_TYPE_SCALAR 1 #define TSDB_FUNC_TYPE_AGGREGATE 2 #define TSDB_FUNC_SCRIPT_BIN_LIB 0 -#define TSDB_FUNC_SCRIPT_LUA 1 +#define TSDB_FUNC_SCRIPT_PYTHON 1 #define TSDB_FUNC_MAX_RETRIEVE 1024 #define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0' diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index fe6ed7d785..f50ce71b50 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -30,6 +30,153 @@ #include "tmisce.h" // clang-format on +#define MAX_NUM_SCRIPT_PLUGINS 64 +#define MAX_NUM_PLUGIN_FUNCS 9 + + +typedef struct SUdfCPluginCtx { + uv_lib_t lib; + + TUdfScalarProcFunc scalarProcFunc; + + TUdfAggStartFunc aggStartFunc; + TUdfAggProcessFunc aggProcFunc; + TUdfAggFinishFunc aggFinishFunc; + TUdfAggMergeFunc aggMergeFunc; + + TUdfInitFunc initFunc; + TUdfDestroyFunc destroyFunc; +} SUdfCPluginCtx; + +int32_t udfdCPluginOpen(void *scriptCtx) { return 0; } + +int32_t udfdCPluginClose(void *scriptCtx) { return 0; } + +int32_t udfdCPluginUdfInit(SUdfInfo *udf, void **pUdfCtx) { + int32_t err = 0; + SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx)); + err = uv_dlopen(udf->path, &udfCtx->lib); + if (err != 0) { + fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); + return TSDB_CODE_UDF_LOAD_UDF_FAILURE; + } + const char *udfName = udf->name; + char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; + char *initSuffix = "_init"; + strcpy(initFuncName, udfName); + strncat(initFuncName, initSuffix, strlen(initSuffix)); + uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc)); + + char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; + char *destroySuffix = "_destroy"; + strcpy(destroyFuncName, udfName); + strncat(destroyFuncName, destroySuffix, strlen(destroySuffix)); + uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc)); + + if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) { + char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; + strcpy(processFuncName, udfName); + uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)); + } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { + char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; + strcpy(processFuncName, udfName); + uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)); + char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *startSuffix = "_start"; + strncpy(startFuncName, processFuncName, sizeof(startFuncName)); + strncat(startFuncName, startSuffix, strlen(startSuffix)); + uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)); + char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; + char *finishSuffix = "_finish"; + strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); + strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); + uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)); + char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *mergeSuffix = "_merge"; + strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName)); + strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix)); + uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)); + } + if (udfCtx->initFunc) { + (udfCtx->initFunc)(); + } + *pUdfCtx = udfCtx; + return 0; +} + +int32_t udfdCPluginUdfDestroy(void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->destroyFunc) { + (ctx->destroyFunc)(); + } + uv_dlclose(&ctx->lib); + taosMemoryFree(ctx); + return 0; +} + +int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->scalarProcFunc) { + ctx->scalarProcFunc(block, resultCol); + } + return 0; +} + +int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->aggStartFunc) { + ctx->aggStartFunc(buf); + } + return 0; +} + +int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->aggProcFunc) { + ctx->aggProcFunc(block, interBuf, newInterBuf); + } + return 0; +} + +int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, + void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->aggMergeFunc) { + ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); + } + return 0; +} + +int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) { + SUdfCPluginCtx *ctx = udfCtx; + if (ctx->aggFinishFunc) { + ctx->aggFinishFunc(buf, resultData); + } + return 0; +} + +// for c, the function pointer are filled directly and libloaded = true; +// for others, dlopen/dlsym to find function pointers +typedef struct SUdfScriptPlugin { + int8_t scriptType; + + char libPath[PATH_MAX]; + bool libLoaded; + uv_lib_t lib; + + TScriptUdfScalarProcFunc udfScalarProcFunc; + TScriptUdfAggStartFunc udfAggStartFunc; + TScriptUdfAggProcessFunc udfAggProcFunc; + TScriptUdfAggMergeFunc udfAggMergeFunc; + TScriptUdfAggFinishFunc udfAggFinishFunc; + + TScriptUdfInitFunc udfInitFunc; + TScriptUdfDestoryFunc udfDestroyFunc; + + TScriptOpenFunc openFunc; + TScriptCloseFunc closeFunc; +} SUdfScriptPlugin; + typedef struct SUdfdContext { uv_loop_t *loop; uv_pipe_t ctrlPipe; @@ -37,11 +184,15 @@ typedef struct SUdfdContext { char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; uv_pipe_t listeningPipe; - void *clientRpc; - SCorEpSet mgmtEp; + void *clientRpc; + SCorEpSet mgmtEp; + uv_mutex_t udfsMutex; SHashObj *udfsHash; + uv_mutex_t scriptPluginsMutex; + SUdfScriptPlugin *scriptPlugins[MAX_NUM_SCRIPT_PLUGINS]; + SArray *residentFuncs; bool printVersion; @@ -73,13 +224,8 @@ typedef struct SUvUdfWork { typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState; typedef struct SUdf { - int32_t refCount; - EUdfState state; - uv_mutex_t lock; - uv_cond_t condReady; - bool resident; + char name[TSDB_FUNC_NAME_LEN + 1]; - char name[TSDB_FUNC_NAME_LEN + 1]; int8_t funcType; int8_t scriptType; int8_t outputType; @@ -88,17 +234,14 @@ typedef struct SUdf { char path[PATH_MAX]; - uv_lib_t lib; + int32_t refCount; + EUdfState state; + uv_mutex_t lock; + uv_cond_t condReady; + bool resident; - TUdfScalarProcFunc scalarProcFunc; - - TUdfAggStartFunc aggStartFunc; - TUdfAggProcessFunc aggProcFunc; - TUdfAggFinishFunc aggFinishFunc; - TUdfAggMergeFunc aggMergeFunc; - - TUdfInitFunc initFunc; - TUdfDestroyFunc destroyFunc; + SUdfScriptPlugin *scriptPlugin; + void *scriptUdfCtx; } SUdf; // TODO: add private udf structure. @@ -121,7 +264,6 @@ typedef struct SUdfdRpcSendRecvInfo { static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf); static int32_t udfdConnectToMnode(); -static int32_t udfdLoadUdf(char *udfName, SUdf *udf); static bool udfdRpcRfp(int32_t code, tmsg_t msgType); static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t udfdOpenClientRpc(); @@ -155,6 +297,71 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); static int32_t udfdRun(); static void udfdConnectMnodeThreadFunc(void *args); +void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { + plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; + plugin->openFunc = udfdCPluginOpen; + plugin->closeFunc = udfdCPluginClose; + plugin->udfInitFunc = udfdCPluginUdfInit; + plugin->udfDestroyFunc = udfdCPluginUdfDestroy; + plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc; + plugin->udfAggStartFunc = udfdCPluginUdfAggStart; + plugin->udfAggProcFunc = udfdCPluginUdfAggProc; + plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge; + plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish; + return; +} + +int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) { + int err = uv_dlopen(libPath, pLib); + if (err != 0) { + fnError("can not load library %s. error: %s", libPath, uv_strerror(err)); + return TSDB_CODE_UDF_LOAD_UDF_FAILURE; + } + + for (int i = 0; i < numOfFuncs; ++i) { + err = uv_dlsym(pLib, funcName[i], func[i]); + if (err != 0) { + fnError("load library function failed. lib %s function %s", libPath, funcName[i]); + } + } + return 0; +} + +void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { + plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; + sprintf("%s", plugin->libPath, "libtaosudf_py.so"); + plugin->libLoaded = false; + const char *funcName[MAX_NUM_PLUGIN_FUNCS] = {"open", "close", "udfInit", + "udfDestroy", "udfScalarProc", "udfAggStart", + "udfAggFinish", "udfAggProc", "udfAggMerge"}; + void **funcs[MAX_NUM_PLUGIN_FUNCS] = { + (void **)&plugin->openFunc, (void **)&plugin->closeFunc, (void **)&plugin->udfInitFunc, + (void **)&plugin->udfDestroyFunc, (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc, + (void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc, (void **)&plugin->udfAggMergeFunc}; + int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, MAX_NUM_PLUGIN_FUNCS); + if (err != 0) { + fnError("can not load python plugin. lib path %s", plugin->libPath); + return; + } + plugin->libLoaded = true; + return; +} + +void udfdInitScriptPlugins() { + SUdfScriptPlugin *plugins = taosMemoryCalloc(2, sizeof(SUdfScriptPlugin)); + // Initialize c language plugin + udfdInitializeCPlugin(plugins + 0); + // Initialize python plugin + udfdInitializePythonPlugin(plugins + 1); + return; +} + +void udfdDeinitScriptPlugins() { + + return; +} + + void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); SUdfRequest request = {0}; @@ -180,14 +387,43 @@ void udfdProcessRequest(uv_work_t *req) { } } -void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { - // TODO: tracable id from client. connect, setup, call, teardown - fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName); - SUdfSetupRequest *setup = &request->setup; - int32_t code = TSDB_CODE_SUCCESS; - SUdf *udf = NULL; +void convertUdf2UdfInfo(SUdf *udf, SUdfInfo *udfInfo) { + udfInfo->bufSize = udf->bufSize; + udfInfo->funcType = udf->funcType; + strncpy(udfInfo->name, udf->name, strlen(udf->name)); + udfInfo->outputLen = udf->outputLen; + udfInfo->outputType = udf->outputType; + strncpy(udfInfo->path, udf->path, strlen(udf->path)); + udfInfo->scriptType = udf->scriptType; +} + +int32_t udfdInitUdf(char *udfName, SUdf *udf) { + int32_t err = 0; + err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf); + if (err != 0) { + fnError("can not retrieve udf from mnode. udf name %s", udfName); + return TSDB_CODE_UDF_LOAD_UDF_FAILURE; + } + //TODO: remove script plugins mutex + uv_mutex_lock(&global.scriptPluginsMutex); + SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType]; + if (scriptPlugin == NULL) { + fnError("udf name %s script type %d not supported", udfName, udf->scriptType); + uv_mutex_unlock(&global.scriptPluginsMutex); + return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; + } + uv_mutex_unlock(&global.scriptPluginsMutex); + udf->scriptPlugin = scriptPlugin; + SUdfInfo info = {0}; + convertUdf2UdfInfo(udf, &info); + udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx); + return 0; +} + +SUdf *udfdGetOrCreateUdf(const char *udfName) { + SUdf *udf = NULL; uv_mutex_lock(&global.udfsMutex); - SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName)); + SUdf **udfInHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); if (udfInHash) { ++(*udfInHash)->refCount; udf = *udfInHash; @@ -195,23 +431,35 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } else { SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); udfNew->refCount = 1; + strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); + udfNew->state = UDF_STATE_INIT; uv_mutex_init(&udfNew->lock); uv_cond_init(&udfNew->condReady); udf = udfNew; SUdf **pUdf = &udf; - taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), pUdf, POINTER_BYTES); + taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); uv_mutex_unlock(&global.udfsMutex); } + return udf; +} + +void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { + // TODO: tracable id from client. connect, setup, call, teardown + fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName); + + SUdfSetupRequest *setup = &request->setup; + int32_t code = TSDB_CODE_SUCCESS; + SUdf *udf = NULL; + + udf = udfdGetOrCreateUdf(setup->udfName); uv_mutex_lock(&udf->lock); if (udf->state == UDF_STATE_INIT) { udf->state = UDF_STATE_LOADING; - code = udfdLoadUdf(setup->udfName, udf); - if (udf->initFunc) { - udf->initFunc(); - } + code = udfdInitUdf(setup->udfName, udf); + udf->resident = false; for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { char *funcName = taosArrayGet(global.residentFuncs, i); @@ -270,7 +518,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); - code = udf->scalarProcFunc(&input, &output); + code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); freeUdfDataDataBlock(&input); convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); freeUdfColumn(&output); @@ -278,7 +526,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } case TSDB_UDF_CALL_AGG_INIT: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - udf->aggStartFunc(&outBuf); + code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx); subRsp->resultBuf = outBuf; break; } @@ -286,7 +534,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->aggProcFunc(&input, &call->interBuf, &outBuf); + code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx); freeUdfInterBuf(&call->interBuf); freeUdfDataDataBlock(&input); subRsp->resultBuf = outBuf; @@ -295,7 +543,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } case TSDB_UDF_CALL_AGG_MERGE: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->aggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf); + code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx); freeUdfInterBuf(&call->interBuf); freeUdfInterBuf(&call->interBuf2); subRsp->resultBuf = outBuf; @@ -304,7 +552,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } case TSDB_UDF_CALL_AGG_FIN: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->aggFinishFunc(&call->interBuf, &outBuf); + code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx); freeUdfInterBuf(&call->interBuf); subRsp->resultBuf = outBuf; break; @@ -374,11 +622,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { if (unloadUdf) { uv_cond_destroy(&udf->condReady); uv_mutex_destroy(&udf->lock); - if (udf->destroyFunc) { - (udf->destroyFunc)(); - } - uv_dlclose(&udf->lib); - taosMemoryFree(udf); + udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); } taosMemoryFree(handle); @@ -440,7 +684,8 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { goto _return; } SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - SUdf *udf = msgInfo->param; + // SUdf *udf = msgInfo->param; + SUdf *udf = msgInfo->param; udf->funcType = pFuncInfo->funcType; udf->scriptType = pFuncInfo->scriptType; udf->outputType = pFuncInfo->outputType; @@ -455,12 +700,11 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { } char path[PATH_MAX] = {0}; -#ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s.dll", tsTempDir, pFuncInfo->name); -#else - snprintf(path, sizeof(path), "%s/lib%s.so", tsTempDir, pFuncInfo->name); -#endif - + #ifdef WINDOWS + snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name); + #else + snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name); + #endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); @@ -550,65 +794,10 @@ int32_t udfdConnectToMnode() { return code; } -int32_t udfdLoadUdf(char *udfName, SUdf *udf) { - strncpy(udf->name, udfName, TSDB_FUNC_NAME_LEN); - int32_t err = 0; - - err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf); - if (err != 0) { - fnError("can not retrieve udf from mnode. udf name %s", udfName); - return TSDB_CODE_UDF_LOAD_UDF_FAILURE; - } - - err = uv_dlopen(udf->path, &udf->lib); - if (err != 0) { - fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); - return TSDB_CODE_UDF_LOAD_UDF_FAILURE; - } - - char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; - char *initSuffix = "_init"; - strcpy(initFuncName, udfName); - strncat(initFuncName, initSuffix, strlen(initSuffix)); - uv_dlsym(&udf->lib, initFuncName, (void **)(&udf->initFunc)); - - char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; - char *destroySuffix = "_destroy"; - strcpy(destroyFuncName, udfName); - strncat(destroyFuncName, destroySuffix, strlen(destroySuffix)); - uv_dlsym(&udf->lib, destroyFuncName, (void **)(&udf->destroyFunc)); - - if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) { - char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strcpy(processFuncName, udfName); - uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc)); - } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { - char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strcpy(processFuncName, udfName); - uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc)); - char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; - char *startSuffix = "_start"; - strncpy(startFuncName, processFuncName, sizeof(startFuncName)); - strncat(startFuncName, startSuffix, strlen(startSuffix)); - uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc)); - char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; - char *finishSuffix = "_finish"; - strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); - strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); - uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); - char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; - char *mergeSuffix = "_merge"; - strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName)); - strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix)); - uv_dlsym(&udf->lib, mergeFuncName, (void **)(&udf->aggMergeFunc)); - } - return 0; -} static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER || - code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || - code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || - code == TSDB_CODE_APP_IS_STOPPING) { + code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING || + code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { return false; @@ -765,7 +954,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { } void udfdHandleRequest(SUdfdUvConn *conn) { - char *inputBuf = conn->inputBuf; + char *inputBuf = conn->inputBuf; int32_t inputLen = conn->inputLen; uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t)); @@ -784,7 +973,7 @@ void udfdHandleRequest(SUdfdUvConn *conn) { void udfdPipeCloseCb(uv_handle_t *pipe) { SUdfdUvConn *conn = pipe->data; - SUvUdfWork* pWork = conn->pWorkList; + SUvUdfWork *pWork = conn->pWorkList; while (pWork != NULL) { pWork->conn = NULL; pWork = pWork->pWorkNext; @@ -960,6 +1149,8 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) { } static int32_t udfdRun() { + uv_mutex_init(&global.scriptPluginsMutex); + global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); uv_mutex_init(&global.udfsMutex); @@ -1017,11 +1208,7 @@ int32_t udfdDeinitResidentFuncs() { SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); if (udfInHash) { SUdf *udf = *udfInHash; - if (udf->destroyFunc) { - (udf->destroyFunc)(); - } - uv_dlclose(&udf->lib); - taosMemoryFree(udf); + udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); taosHashRemove(global.udfsHash, funcName, strlen(funcName)); } } @@ -1072,6 +1259,8 @@ int main(int argc, char *argv[]) { return -5; } + udfdInitScriptPlugins(); + udfdInitResidentFuncs(); uv_thread_t mnodeConnectThread; @@ -1083,6 +1272,9 @@ int main(int argc, char *argv[]) { udfdCloseClientRpc(); udfdDeinitResidentFuncs(); + + udfdDeinitScriptPlugins(); + udfdCleanup(); return 0; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index bab3edc870..6621192291 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -573,6 +573,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid functio TAOS_DEFINE_ERROR(TSDB_CODE_UDF_NO_FUNC_HANDLE, "udf no function handle") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_BUFSIZE, "udf invalid bufsize") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED, "udf program language not supported") //schemaless TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type") From 2e266771b15b0ea962e934596789ed9e0884199c Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 14 Feb 2023 13:08:32 +0800 Subject: [PATCH 02/33] fix: dispatch to udf by language type and udf name --- source/libs/function/src/udfd.c | 34 ++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index f50ce71b50..1f6f7902a4 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -329,7 +329,7 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; - sprintf("%s", plugin->libPath, "libtaosudf_py.so"); + sprintf(plugin->libPath, "%s", "libtaosudf_py.so"); plugin->libLoaded = false; const char *funcName[MAX_NUM_PLUGIN_FUNCS] = {"open", "close", "udfInit", "udfDestroy", "udfScalarProc", "udfAggStart", @@ -347,17 +347,41 @@ void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { return; } +void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { + return; +} + +void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { + if (plugin->libLoaded) { + uv_dlclose(&plugin->lib); + plugin->libLoaded = false; + } +} + void udfdInitScriptPlugins() { - SUdfScriptPlugin *plugins = taosMemoryCalloc(2, sizeof(SUdfScriptPlugin)); - // Initialize c language plugin - udfdInitializeCPlugin(plugins + 0); + SUdfScriptPlugin *plugin = NULL; + + // Initialize c plugin + plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); + udfdInitializeCPlugin(plugin); + global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB] = plugin; + // Initialize python plugin - udfdInitializePythonPlugin(plugins + 1); + plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); + udfdInitializePythonPlugin(plugin); + global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON] = plugin; return; } void udfdDeinitScriptPlugins() { + SUdfScriptPlugin *plugin = NULL; + plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON]; + udfdDeinitPythonPlugin(plugin); + taosMemoryFree(plugin); + plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB]; + udfdDeinitCPlugin(plugin); + taosMemoryFree(plugin); return; } From 6b09b72a63e3f1f3e7106c18eb85e096976e31dd Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 14 Feb 2023 17:14:41 +0800 Subject: [PATCH 03/33] fix: make udfinfo temporary --- include/libs/function/taosudf.h | 9 +++------ source/libs/function/src/udfd.c | 4 ++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 65d3a98654..eb229c24fc 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -73,6 +73,7 @@ typedef struct SUdfDataBlock { SUdfColumn **udfCols; } SUdfDataBlock; +// TODO: deprecate SUdfInterBuf.numOfResult or add isInitial to SUdfInterBuf typedef struct SUdfInterBuf { int32_t bufLen; char *buf; @@ -260,11 +261,9 @@ typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *input typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData); ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// TODO: capacity for path and -// define macro for name and path len or use dynamic allocation/shared with SUdf. typedef struct SUdfInfo { - char name[65]; + char *name; int8_t funcType; int8_t scriptType; @@ -272,11 +271,9 @@ typedef struct SUdfInfo { int32_t outputLen; int32_t bufSize; - char path[512]; + char *path; } SUdfInfo; -// TODO: deprecate SUdfInterBuf.numOfResult or add isInitial to SUdfInterBuf - typedef int32_t (*TScriptUdfScalarProcFunc)(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx); typedef int32_t (*TScriptUdfAggStartFunc)(SUdfInterBuf *buf, void *udfCtx); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 1f6f7902a4..b42ff1564d 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -414,10 +414,10 @@ void udfdProcessRequest(uv_work_t *req) { void convertUdf2UdfInfo(SUdf *udf, SUdfInfo *udfInfo) { udfInfo->bufSize = udf->bufSize; udfInfo->funcType = udf->funcType; - strncpy(udfInfo->name, udf->name, strlen(udf->name)); + udfInfo->name = udf->name; udfInfo->outputLen = udf->outputLen; udfInfo->outputType = udf->outputType; - strncpy(udfInfo->path, udf->path, strlen(udf->path)); + udfInfo->path = udf->path; udfInfo->scriptType = udf->scriptType; } From 634046cc97ceff9881e4f879c27e0b2460099473 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 15 Feb 2023 09:25:30 +0800 Subject: [PATCH 04/33] fix: uv_dlopen asan error --- source/libs/function/src/udfd.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index b42ff1564d..398f4f636c 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -352,8 +352,8 @@ void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { } void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { + uv_dlclose(&plugin->lib); if (plugin->libLoaded) { - uv_dlclose(&plugin->lib); plugin->libLoaded = false; } } From 89db7bf60010ddd3ed09547d6919bd76bdda5f5a Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 15 Feb 2023 12:22:54 +0800 Subject: [PATCH 05/33] fix: memory leak error --- source/libs/function/src/udfd.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 398f4f636c..9cd04dc353 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -647,6 +647,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { uv_cond_destroy(&udf->condReady); uv_mutex_destroy(&udf->lock); udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); + taosMemoryFree(udf); } taosMemoryFree(handle); From b4d8a327eaa8154e11cb79482ad59070535f610c Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 20 Feb 2023 16:52:25 +0800 Subject: [PATCH 06/33] fix: change python plugin function name --- source/libs/function/src/udfd.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 9cd04dc353..d6c48fb979 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -331,9 +331,9 @@ void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; sprintf(plugin->libPath, "%s", "libtaosudf_py.so"); plugin->libLoaded = false; - const char *funcName[MAX_NUM_PLUGIN_FUNCS] = {"open", "close", "udfInit", - "udfDestroy", "udfScalarProc", "udfAggStart", - "udfAggFinish", "udfAggProc", "udfAggMerge"}; + const char *funcName[MAX_NUM_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit", + "pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart", + "pyUdfAggFinish", "pyUdfAggProc", "pyUdfAggMerge"}; void **funcs[MAX_NUM_PLUGIN_FUNCS] = { (void **)&plugin->openFunc, (void **)&plugin->closeFunc, (void **)&plugin->udfInitFunc, (void **)&plugin->udfDestroyFunc, (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc, From 8e5dae3970723f0e86b070ed58760e653bf4a3a9 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 21 Feb 2023 17:09:26 +0800 Subject: [PATCH 07/33] fix: change script udf info structure --- include/libs/function/taosudf.h | 13 +++++++++---- source/libs/function/src/udfd.c | 16 ++++++++++------ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index eb229c24fc..4e33bf633c 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -262,17 +262,22 @@ typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -typedef struct SUdfInfo { +typedef enum EUdfFuncType { + UDF_FUNC_TYPE_SCALAR = 1, + UDF_FUNC_TYPE_AGG = 2 +} EUdfFuncType; + +typedef struct SScriptUdfInfo { char *name; - int8_t funcType; + EUdfFuncType funcType; int8_t scriptType; int8_t outputType; int32_t outputLen; int32_t bufSize; char *path; -} SUdfInfo; +} SScriptUdfInfo; typedef int32_t (*TScriptUdfScalarProcFunc)(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx); @@ -282,7 +287,7 @@ typedef int32_t (*TScriptUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf * typedef int32_t (*TScriptUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, void *udfCtx); typedef int32_t (*TScriptUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx); -typedef int32_t (*TScriptUdfInitFunc)(SUdfInfo *info, void **pUdfCtx); +typedef int32_t (*TScriptUdfInitFunc)(SScriptUdfInfo *info, void **pUdfCtx); typedef int32_t (*TScriptUdfDestoryFunc)(void *udfCtx); // the following function is for open/close script plugin. diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 9cd04dc353..783f6f1170 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -52,7 +52,7 @@ int32_t udfdCPluginOpen(void *scriptCtx) { return 0; } int32_t udfdCPluginClose(void *scriptCtx) { return 0; } -int32_t udfdCPluginUdfInit(SUdfInfo *udf, void **pUdfCtx) { +int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { int32_t err = 0; SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx)); err = uv_dlopen(udf->path, &udfCtx->lib); @@ -73,11 +73,11 @@ int32_t udfdCPluginUdfInit(SUdfInfo *udf, void **pUdfCtx) { strncat(destroyFuncName, destroySuffix, strlen(destroySuffix)); uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc)); - if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) { + if (udf->funcType == UDF_FUNC_TYPE_SCALAR) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; strcpy(processFuncName, udfName); uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)); - } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { + } else if (udf->funcType == UDF_FUNC_TYPE_AGG) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; strcpy(processFuncName, udfName); uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)); @@ -411,9 +411,13 @@ void udfdProcessRequest(uv_work_t *req) { } } -void convertUdf2UdfInfo(SUdf *udf, SUdfInfo *udfInfo) { +void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { udfInfo->bufSize = udf->bufSize; - udfInfo->funcType = udf->funcType; + if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { + udfInfo->funcType = UDF_FUNC_TYPE_AGG; + } else if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) { + udfInfo->funcType = UDF_FUNC_TYPE_SCALAR; + } udfInfo->name = udf->name; udfInfo->outputLen = udf->outputLen; udfInfo->outputType = udf->outputType; @@ -438,7 +442,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { } uv_mutex_unlock(&global.scriptPluginsMutex); udf->scriptPlugin = scriptPlugin; - SUdfInfo info = {0}; + SScriptUdfInfo info = {0}; convertUdf2UdfInfo(udf, &info); udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx); return 0; From 898bd0ed332332886e3f99f9f592a5a568f23b48 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 21 Feb 2023 21:24:09 +0800 Subject: [PATCH 08/33] feat: script open function accept env item --- include/libs/function/taosudf.h | 8 ++++++-- source/libs/function/src/udfd.c | 33 +++++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 4e33bf633c..1ef4f59b2c 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -261,6 +261,10 @@ typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *input typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData); ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +typedef struct SScriptUdfEnvItem{ + char *name; + char *value; +} SScriptUdfEnvItem; typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, @@ -291,8 +295,8 @@ typedef int32_t (*TScriptUdfInitFunc)(SScriptUdfInfo *info, void **pUdfCtx); typedef int32_t (*TScriptUdfDestoryFunc)(void *udfCtx); // the following function is for open/close script plugin. -typedef int32_t (*TScriptOpenFunc)(void *scriptCtx); -typedef int32_t (*TScriptCloseFunc)(void *scriptCtx); +typedef int32_t (*TScriptOpenFunc)(SScriptUdfEnvItem* items, int numItems); +typedef int32_t (*TScriptCloseFunc)(); #ifdef __cplusplus } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index d2a61d74e2..ed5f87647f 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -48,9 +48,9 @@ typedef struct SUdfCPluginCtx { TUdfDestroyFunc destroyFunc; } SUdfCPluginCtx; -int32_t udfdCPluginOpen(void *scriptCtx) { return 0; } +int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; } -int32_t udfdCPluginClose(void *scriptCtx) { return 0; } +int32_t udfdCPluginClose() { return 0; } int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { int32_t err = 0; @@ -308,6 +308,9 @@ void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->udfAggProcFunc = udfdCPluginUdfAggProc; plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge; plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish; + + SScriptUdfEnvItem items[0]; + plugin->openFunc(items, 0); return; } @@ -343,19 +346,45 @@ void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { fnError("can not load python plugin. lib path %s", plugin->libPath); return; } + if (plugin->openFunc) { + SScriptUdfEnvItem items[] ={{"PYTHONPATH", tsUdfdLdLibPath}}; + plugin->openFunc(items, 1); + } plugin->libLoaded = true; return; } void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { + if (plugin->closeFunc) { + plugin->closeFunc(); + } + plugin->openFunc = NULL; + plugin->closeFunc = NULL; + plugin->udfInitFunc = NULL; + plugin->udfDestroyFunc = NULL; + plugin->udfScalarProcFunc = NULL; + plugin->udfAggStartFunc = NULL; + plugin->udfAggProcFunc = NULL; + plugin->udfAggMergeFunc = NULL; + plugin->udfAggFinishFunc = NULL; return; } void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { + plugin->closeFunc(); uv_dlclose(&plugin->lib); if (plugin->libLoaded) { plugin->libLoaded = false; } + plugin->openFunc = NULL; + plugin->closeFunc = NULL; + plugin->udfInitFunc = NULL; + plugin->udfDestroyFunc = NULL; + plugin->udfScalarProcFunc = NULL; + plugin->udfAggStartFunc = NULL; + plugin->udfAggProcFunc = NULL; + plugin->udfAggMergeFunc = NULL; + plugin->udfAggFinishFunc = NULL; } void udfdInitScriptPlugins() { From 21cb3d0821b0963adc02ca6491840bac0eafc819 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 22 Feb 2023 12:11:57 +0800 Subject: [PATCH 09/33] fix: improve pythonpath env variable to include tsTempDir --- source/libs/function/src/udfd.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index ed5f87647f..0e3c6a4def 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -347,8 +347,16 @@ void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { return; } if (plugin->openFunc) { - SScriptUdfEnvItem items[] ={{"PYTHONPATH", tsUdfdLdLibPath}}; + int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(tsTempDir) + 1 + 1; //tsTempDir:tsUdfdLdLibPath + char* pythonPath= taosMemoryMalloc(lenPythonPath); + #ifdef WINDOWS + snprintf(pythonPath, lenPythonPath, "%s;%s", tsTempDir, tsUdfdLdLibPath); + #else + snprintf(pythonPath, lenPythonPath, "%s:%s", tsTempDir, tsUdfdLdLibPath); + #endif + SScriptUdfEnvItem items[] ={{"PYTHONPATH", pythonPath}}; plugin->openFunc(items, 1); + taosMemoryFree(pythonPath); } plugin->libLoaded = true; return; From a7888257d704400cd47c4c72f859dd7f6d39588b Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 23 Feb 2023 14:56:52 +0800 Subject: [PATCH 10/33] fix: call closeFunc when it is not nullptr --- source/libs/function/src/udfd.c | 40 ++++++++++++++++----------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 0e3c6a4def..ec49c39c0e 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -33,7 +33,6 @@ #define MAX_NUM_SCRIPT_PLUGINS 64 #define MAX_NUM_PLUGIN_FUNCS 9 - typedef struct SUdfCPluginCtx { uv_lib_t lib; @@ -347,14 +346,14 @@ void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { return; } if (plugin->openFunc) { - int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(tsTempDir) + 1 + 1; //tsTempDir:tsUdfdLdLibPath - char* pythonPath= taosMemoryMalloc(lenPythonPath); - #ifdef WINDOWS - snprintf(pythonPath, lenPythonPath, "%s;%s", tsTempDir, tsUdfdLdLibPath); - #else - snprintf(pythonPath, lenPythonPath, "%s:%s", tsTempDir, tsUdfdLdLibPath); - #endif - SScriptUdfEnvItem items[] ={{"PYTHONPATH", pythonPath}}; + int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(tsTempDir) + 1 + 1; // tsTempDir:tsUdfdLdLibPath + char *pythonPath = taosMemoryMalloc(lenPythonPath); +#ifdef WINDOWS + snprintf(pythonPath, lenPythonPath, "%s;%s", tsTempDir, tsUdfdLdLibPath); +#else + snprintf(pythonPath, lenPythonPath, "%s:%s", tsTempDir, tsUdfdLdLibPath); +#endif + SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}}; plugin->openFunc(items, 1); taosMemoryFree(pythonPath); } @@ -374,12 +373,14 @@ void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { plugin->udfAggStartFunc = NULL; plugin->udfAggProcFunc = NULL; plugin->udfAggMergeFunc = NULL; - plugin->udfAggFinishFunc = NULL; + plugin->udfAggFinishFunc = NULL; return; } void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { - plugin->closeFunc(); + if (plugin->closeFunc) { + plugin->closeFunc(); + } uv_dlclose(&plugin->lib); if (plugin->libLoaded) { plugin->libLoaded = false; @@ -418,11 +419,10 @@ void udfdDeinitScriptPlugins() { plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB]; udfdDeinitCPlugin(plugin); - taosMemoryFree(plugin); + taosMemoryFree(plugin); return; } - void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); SUdfRequest request = {0}; @@ -469,7 +469,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { fnError("can not retrieve udf from mnode. udf name %s", udfName); return TSDB_CODE_UDF_LOAD_UDF_FAILURE; } - //TODO: remove script plugins mutex + // TODO: remove script plugins mutex uv_mutex_lock(&global.scriptPluginsMutex); SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType]; if (scriptPlugin == NULL) { @@ -516,7 +516,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfSetupRequest *setup = &request->setup; int32_t code = TSDB_CODE_SUCCESS; - SUdf *udf = NULL; + SUdf *udf = NULL; udf = udfdGetOrCreateUdf(setup->udfName); @@ -766,11 +766,11 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { } char path[PATH_MAX] = {0}; - #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name); - #else - snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name); - #endif +#ifdef WINDOWS + snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name); +#else + snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name); +#endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); From 4ef6dd9e7415879365443888d78d3e0e4eb52fd6 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 23 Feb 2023 21:10:52 +0800 Subject: [PATCH 11/33] fix:will deprecated SUdfInterBuf.numOfResult --- include/libs/function/taosudf.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 1ef4f59b2c..5be9c15900 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -73,7 +73,7 @@ typedef struct SUdfDataBlock { SUdfColumn **udfCols; } SUdfDataBlock; -// TODO: deprecate SUdfInterBuf.numOfResult or add isInitial to SUdfInterBuf +// TODO: deprecate SUdfInterBuf.numOfResult typedef struct SUdfInterBuf { int32_t bufLen; char *buf; From df5b0babc44c4be12058ef766716920a73cc0065 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 24 Feb 2023 12:20:55 +0800 Subject: [PATCH 12/33] fix: array[0] not supported in windows --- include/libs/function/taosudf.h | 1 - source/libs/function/src/udfd.c | 4 ++-- source/libs/function/test/udf2.c | 7 +++++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 5be9c15900..ee3d85b433 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -73,7 +73,6 @@ typedef struct SUdfDataBlock { SUdfColumn **udfCols; } SUdfDataBlock; -// TODO: deprecate SUdfInterBuf.numOfResult typedef struct SUdfInterBuf { int32_t bufLen; char *buf; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index ec49c39c0e..4e16d5e708 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -308,8 +308,8 @@ void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge; plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish; - SScriptUdfEnvItem items[0]; - plugin->openFunc(items, 0); + SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}}; + plugin->openFunc(items, 1); return; } diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index e24789d0fb..faf4daa4e5 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -12,12 +12,15 @@ DLL_EXPORT int32_t udf2_destroy() { return 0; } DLL_EXPORT int32_t udf2_start(SUdfInterBuf* buf) { *(int64_t*)(buf->buf) = 0; buf->bufLen = sizeof(double); - buf->numOfResult = 0; + buf->numOfResult = 1; return 0; } DLL_EXPORT int32_t udf2(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) { - double sumSquares = *(double*)interBuf->buf; + double sumSquares = 0; + if (interBuf->numOfResult == 1) { + sumSquares = *(double*)interBuf->buf; + } int8_t numNotNull = 0; for (int32_t i = 0; i < block->numOfCols; ++i) { SUdfColumn* col = block->udfCols[i]; From c93cbc96f4dab736d35a2cbee9df705c393d4efb Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 27 Feb 2023 15:03:30 +0800 Subject: [PATCH 13/33] fix: change script env item key and value to const char* --- include/libs/function/taosudf.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index ee3d85b433..8277ff2c50 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -261,8 +261,8 @@ typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// typedef struct SScriptUdfEnvItem{ - char *name; - char *value; + const char *name; + const char *value; } SScriptUdfEnvItem; typedef enum EUdfFuncType { From 029444e1c23705e6821a5e0015154d8dce9ba76c Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 28 Feb 2023 10:39:08 +0800 Subject: [PATCH 14/33] fix: use const for name and path in SScriptUdfInfo --- include/libs/function/taosudf.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 8277ff2c50..3e796da236 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -271,7 +271,7 @@ typedef enum EUdfFuncType { } EUdfFuncType; typedef struct SScriptUdfInfo { - char *name; + const char *name; EUdfFuncType funcType; int8_t scriptType; @@ -279,7 +279,7 @@ typedef struct SScriptUdfInfo { int32_t outputLen; int32_t bufSize; - char *path; + const char *path; } SScriptUdfInfo; typedef int32_t (*TScriptUdfScalarProcFunc)(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx); From 844a0830330b8475e77efbc7d29f87843a65620b Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 28 Feb 2023 20:25:23 +0800 Subject: [PATCH 15/33] fix: set udf column meta in udfd instead of udf --- source/libs/function/src/udfd.c | 5 ++++- source/libs/function/test/udf1.c | 5 ----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 4e16d5e708..0f761d3d7d 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -580,7 +580,10 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { switch (call->callType) { case TSDB_UDF_CALL_SCALA_PROC: { SUdfColumn output = {0}; - + output.colMeta.bytes = udf->outputLen; + output.colMeta.type = udf->outputType; + output.colMeta.precision = 0; + output.colMeta.scale = 0; SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index 71d30b6755..a395fc99a9 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -14,11 +14,6 @@ DLL_EXPORT int32_t udf1_init() { return 0; } DLL_EXPORT int32_t udf1_destroy() { return 0; } DLL_EXPORT int32_t udf1(SUdfDataBlock *block, SUdfColumn *resultCol) { - SUdfColumnMeta *meta = &resultCol->colMeta; - meta->bytes = 4; - meta->type = TSDB_DATA_TYPE_INT; - meta->scale = 0; - meta->precision = 0; SUdfColumnData *resultData = &resultCol->colData; resultData->numOfRows = block->numOfRows; From 2b566dabc77352a7e136d038081f2a2f568a5b15 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 1 Mar 2023 15:37:08 +0800 Subject: [PATCH 16/33] fix: add function not implemented error --- include/libs/function/taosudf.h | 8 +++---- include/util/taoserror.h | 3 ++- source/libs/function/src/udfd.c | 37 +++++++++++++++++++++++---------- source/util/src/terror.c | 1 + 4 files changed, 33 insertions(+), 16 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 3e796da236..7e5b8406bb 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -80,10 +80,6 @@ typedef struct SUdfInterBuf { } SUdfInterBuf; typedef void *UdfcFuncHandle; -// dynamic lib init and destroy -typedef int32_t (*TUdfInitFunc)(); -typedef int32_t (*TUdfDestroyFunc)(); - #define UDF_MEMORY_EXP_GROWTH 1.5 #define NBIT (3u) #define BitPos(_n) ((_n) & ((1 << NBIT) - 1)) @@ -252,6 +248,10 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentR return 0; } +// dynamic lib init and destroy for C UDF +typedef int32_t (*TUdfInitFunc)(); +typedef int32_t (*TUdfDestroyFunc)(); + typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock *block, SUdfColumn *resultCol); typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index bdaa68dfb7..29f36f3d0c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -711,7 +711,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_UDF_NO_FUNC_HANDLE TAOS_DEF_ERROR_CODE(0, 0x2908) #define TSDB_CODE_UDF_INVALID_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x2909) #define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x290A) -#define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x290B) +#define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x290B) +#define TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED TAOS_DEF_ERROR_CODE(0, 0x290C) // sml #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 0f761d3d7d..ca474e7cc8 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -96,8 +96,15 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix)); uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)); } + int32_t code = 0; if (udfCtx->initFunc) { - (udfCtx->initFunc)(); + // TODO: handle init call return error + code = (udfCtx->initFunc)(); + if (code != 0) { + uv_dlclose(&udfCtx->lib); + taosMemoryFree(udfCtx); + return code; + } } *pUdfCtx = udfCtx; return 0; @@ -105,26 +112,30 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { int32_t udfdCPluginUdfDestroy(void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; + int32_t code = 0; if (ctx->destroyFunc) { - (ctx->destroyFunc)(); + code = (ctx->destroyFunc)(); } uv_dlclose(&ctx->lib); taosMemoryFree(ctx); - return 0; + return code; } int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; if (ctx->scalarProcFunc) { - ctx->scalarProcFunc(block, resultCol); + return ctx->scalarProcFunc(block, resultCol); + } else { + return TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED; } - return 0; } int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggStartFunc) { - ctx->aggStartFunc(buf); + return ctx->aggStartFunc(buf); + } else { + return TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED; } return 0; } @@ -132,24 +143,28 @@ int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) { int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggProcFunc) { - ctx->aggProcFunc(block, interBuf, newInterBuf); + return ctx->aggProcFunc(block, interBuf, newInterBuf); + } else { + return TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED; } - return 0; } int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggMergeFunc) { - ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); + return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); + } else { + return TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED; } - return 0; } int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) { SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggFinishFunc) { - ctx->aggFinishFunc(buf, resultData); + return ctx->aggFinishFunc(buf, resultData); + } else { + return TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED; } return 0; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 3310d0a4b5..07c0a20439 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -583,6 +583,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_NO_FUNC_HANDLE, "udf no function han TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_BUFSIZE, "udf invalid bufsize") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED, "udf program language not supported") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED, "udf function not implemented") //schemaless TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type") From 2491821278378e56cd4555e2c34d461bfe792f21 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 4 Mar 2023 15:23:13 +0800 Subject: [PATCH 17/33] fix: change taopyudf library name --- source/libs/function/src/udfd.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 4123afc829..366e890bd7 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -346,7 +346,8 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; - sprintf(plugin->libPath, "%s", "libtaosudf_py.so"); + //todo: windows support + sprintf(plugin->libPath, "%s", "libtaospyudf.so"); plugin->libLoaded = false; const char *funcName[MAX_NUM_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit", "pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart", From 6c4400c3ba55a822e6a035349efe56438c51eb32 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 6 Mar 2023 09:20:55 +0800 Subject: [PATCH 18/33] fix: the buf returend by the udf has a smaller size than the function bufsize --- source/libs/function/src/tudf.c | 17 ++++++++++------- source/libs/function/src/udfd.c | 9 ++++++--- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 8f5cd070dc..90baabce2c 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -899,9 +899,11 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { typedef struct SUdfAggRes { int8_t finalResNum; int8_t interResNum; + int32_t interResBufLen; char *finalResBuf; char *interResBuf; } SUdfAggRes; + void onUdfcPipeClose(uv_handle_t *handle); int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask); void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf); @@ -1096,9 +1098,10 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult releaseUdfFuncHandle(pCtx->udfName); return false; } - udfRes->interResNum = buf.numOfResult; if (buf.bufLen <= session->bufSize) { memcpy(udfRes->interResBuf, buf.buf, buf.bufLen); + udfRes->interResBufLen = buf.bufLen; + udfRes->interResNum = buf.numOfResult; } else { fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); releaseUdfFuncHandle(pCtx->udfName); @@ -1136,7 +1139,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows); - SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum}; + SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState); @@ -1144,17 +1147,17 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { fnError("udfAggProcess error. code: %d", udfCode); newState.numOfResult = 0; } else { - udfRes->interResNum = newState.numOfResult; if (newState.bufLen <= session->bufSize) { memcpy(udfRes->interResBuf, newState.buf, newState.bufLen); + udfRes->interResBufLen = newState.bufLen; + udfRes->interResNum = newState.numOfResult; } else { fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize); udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE; } } - if (newState.numOfResult == 1 || state.numOfResult == 1) { - GET_RES_INFO(pCtx)->numOfRes = 1; - } + + GET_RES_INFO(pCtx)->numOfRes = udfRes->interResNum; blockDataDestroy(inputBlock); @@ -1180,7 +1183,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen; SUdfInterBuf resultBuf = {0}; - SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum}; + SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; int32_t udfCallCode = 0; udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf); if (udfCallCode != 0) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 366e890bd7..8db9b386df 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -173,6 +173,7 @@ int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, voi // for others, dlopen/dlsym to find function pointers typedef struct SUdfScriptPlugin { int8_t scriptType; + const char* scriptSuffix; char libPath[PATH_MAX]; bool libLoaded; @@ -313,6 +314,7 @@ static void udfdConnectMnodeThreadFunc(void *args); void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; + plugin->scriptSuffix = '.so'; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; plugin->udfInitFunc = udfdCPluginUdfInit; @@ -346,6 +348,7 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; + plugin->scriptSuffix = "py"; //todo: windows support sprintf(plugin->libPath, "%s", "libtaospyudf.so"); plugin->libLoaded = false; @@ -776,7 +779,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; - + char* suffix = global.scriptPlugins[udf->scriptType]->scriptSuffix; if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; msgInfo->code = terrno; @@ -786,9 +789,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { char path[PATH_MAX] = {0}; #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name); + snprintf(path, sizeof(path), "%s%s.%s", tsTempDir, pFuncInfo->name, suffix); #else - snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name); + snprintf(path, sizeof(path), "%s/%s.%s", tsTempDir, pFuncInfo->name, suffix); #endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { From d8c7eb0cdd03c567b5862c8997151dfa433ad12f Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 6 Mar 2023 09:26:21 +0800 Subject: [PATCH 19/33] fix: fix minior errors of previous commit --- source/libs/function/src/udfd.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 8db9b386df..9d555b33da 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -314,7 +314,7 @@ static void udfdConnectMnodeThreadFunc(void *args); void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; - plugin->scriptSuffix = '.so'; + plugin->scriptSuffix = "so"; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; plugin->udfInitFunc = udfdCPluginUdfInit; @@ -779,7 +779,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; - char* suffix = global.scriptPlugins[udf->scriptType]->scriptSuffix; + const char* suffix = global.scriptPlugins[udf->scriptType]->scriptSuffix; if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; msgInfo->code = terrno; From 916a4b2d8122d15d489c82319f765b386605c939 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 7 Mar 2023 09:57:07 +0800 Subject: [PATCH 20/33] feat: pass logdir to python plugin --- source/libs/function/src/udfd.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 9d555b33da..ec8cde58e1 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -372,7 +372,7 @@ void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { #else snprintf(pythonPath, lenPythonPath, "%s:%s", tsTempDir, tsUdfdLdLibPath); #endif - SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}}; + SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}}; plugin->openFunc(items, 1); taosMemoryFree(pythonPath); } From 6dae414e1a23fcfe9845bef752588d1e892305cb Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 7 Mar 2023 12:01:26 +0800 Subject: [PATCH 21/33] enhance: delay loading of python plugin --- source/libs/function/src/udfd.c | 104 ++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 37 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index ec8cde58e1..9401398838 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -30,8 +30,9 @@ #include "tmisce.h" // clang-format on -#define MAX_NUM_SCRIPT_PLUGINS 64 -#define MAX_NUM_PLUGIN_FUNCS 9 +#define UDFD_MAX_SCRIPT_PLUGINS 64 +#define UDFD_MAX_SCRIPT_TYPE 1 +#define UDFD_MAX_PLUGIN_FUNCS 9 typedef struct SUdfCPluginCtx { uv_lib_t lib; @@ -173,7 +174,6 @@ int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, voi // for others, dlopen/dlsym to find function pointers typedef struct SUdfScriptPlugin { int8_t scriptType; - const char* scriptSuffix; char libPath[PATH_MAX]; bool libLoaded; @@ -206,7 +206,7 @@ typedef struct SUdfdContext { SHashObj *udfsHash; uv_mutex_t scriptPluginsMutex; - SUdfScriptPlugin *scriptPlugins[MAX_NUM_SCRIPT_PLUGINS]; + SUdfScriptPlugin *scriptPlugins[UDFD_MAX_SCRIPT_PLUGINS]; SArray *residentFuncs; @@ -314,7 +314,6 @@ static void udfdConnectMnodeThreadFunc(void *args); void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; - plugin->scriptSuffix = "so"; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; plugin->udfInitFunc = udfdCPluginUdfInit; @@ -348,18 +347,17 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; - plugin->scriptSuffix = "py"; - //todo: windows support + // todo: windows support sprintf(plugin->libPath, "%s", "libtaospyudf.so"); plugin->libLoaded = false; - const char *funcName[MAX_NUM_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit", - "pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart", - "pyUdfAggFinish", "pyUdfAggProc", "pyUdfAggMerge"}; - void **funcs[MAX_NUM_PLUGIN_FUNCS] = { + const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit", + "pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart", + "pyUdfAggFinish", "pyUdfAggProc", "pyUdfAggMerge"}; + void **funcs[UDFD_MAX_PLUGIN_FUNCS] = { (void **)&plugin->openFunc, (void **)&plugin->closeFunc, (void **)&plugin->udfInitFunc, (void **)&plugin->udfDestroyFunc, (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc, (void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc, (void **)&plugin->udfAggMergeFunc}; - int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, MAX_NUM_PLUGIN_FUNCS); + int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, UDFD_MAX_PLUGIN_FUNCS); if (err != 0) { fnError("can not load python plugin. lib path %s", plugin->libPath); return; @@ -415,30 +413,39 @@ void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { plugin->udfAggFinishFunc = NULL; } -void udfdInitScriptPlugins() { - SUdfScriptPlugin *plugin = NULL; +int32_t udfdInitScriptPlugin(int8_t scriptType) { + SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); - // Initialize c plugin - plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); - udfdInitializeCPlugin(plugin); - global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB] = plugin; + switch (scriptType) { + case TSDB_FUNC_SCRIPT_BIN_LIB: + udfdInitializeCPlugin(plugin); + break; + case TSDB_FUNC_SCRIPT_PYTHON: + udfdInitializePythonPlugin(plugin); + break; + default: + fnError("udf script type %d not supported", scriptType); + taosMemoryFree(plugin); + return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; + } - // Initialize python plugin - plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); - udfdInitializePythonPlugin(plugin); - global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON] = plugin; - return; + global.scriptPlugins[scriptType] = plugin; + return TSDB_CODE_SUCCESS; } void udfdDeinitScriptPlugins() { SUdfScriptPlugin *plugin = NULL; plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON]; - udfdDeinitPythonPlugin(plugin); - taosMemoryFree(plugin); + if (plugin != NULL) { + udfdDeinitPythonPlugin(plugin); + taosMemoryFree(plugin); + } plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB]; - udfdDeinitCPlugin(plugin); - taosMemoryFree(plugin); + if (plugin != NULL) { + udfdDeinitCPlugin(plugin); + taosMemoryFree(plugin); + } return; } @@ -481,6 +488,22 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { udfInfo->scriptType = udf->scriptType; } +int32_t udfdRenameUdfFile(SUdf *udf) { + char newPath[PATH_MAX]; + if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { + snprintf(newPath, PATH_MAX, "%s/lib%s.so", tsTempDir, udf->name); + taosRenameFile(udf->path, newPath); + sprintf(udf->path, "%s", newPath); + } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { + snprintf(newPath, PATH_MAX, "%s/%s.py", tsTempDir, udf->name); + taosRenameFile(udf->path, newPath); + sprintf(udf->path, "%s", newPath); + } else { + return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; + } + return 0; +} + int32_t udfdInitUdf(char *udfName, SUdf *udf) { int32_t err = 0; err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf); @@ -488,16 +511,25 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { fnError("can not retrieve udf from mnode. udf name %s", udfName); return TSDB_CODE_UDF_LOAD_UDF_FAILURE; } - // TODO: remove script plugins mutex + if (udf->scriptType > UDFD_MAX_SCRIPT_TYPE) { + fnError("udf name %s script type %d not supported", udfName, udf->scriptType); + return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; + } + uv_mutex_lock(&global.scriptPluginsMutex); SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType]; if (scriptPlugin == NULL) { - fnError("udf name %s script type %d not supported", udfName, udf->scriptType); - uv_mutex_unlock(&global.scriptPluginsMutex); - return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; + err = udfdInitScriptPlugin(udf->scriptType); + if (err != 0) { + uv_mutex_unlock(&global.scriptPluginsMutex); + return err; + } } uv_mutex_unlock(&global.scriptPluginsMutex); - udf->scriptPlugin = scriptPlugin; + udf->scriptPlugin = global.scriptPlugins[udf->scriptType]; + + udfdRenameUdfFile(udf); + SScriptUdfInfo info = {0}; convertUdf2UdfInfo(udf, &info); udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx); @@ -779,7 +811,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; - const char* suffix = global.scriptPlugins[udf->scriptType]->scriptSuffix; + if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; msgInfo->code = terrno; @@ -789,9 +821,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { char path[PATH_MAX] = {0}; #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s.%s", tsTempDir, pFuncInfo->name, suffix); + snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name); #else - snprintf(path, sizeof(path), "%s/%s.%s", tsTempDir, pFuncInfo->name, suffix); + snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name); #endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { @@ -1353,8 +1385,6 @@ int main(int argc, char *argv[]) { return -5; } - udfdInitScriptPlugins(); - udfdInitResidentFuncs(); uv_thread_t mnodeConnectThread; From cb73a5c131188840d3b08c28d0f264777ccb5b56 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 7 Mar 2023 20:49:14 +0800 Subject: [PATCH 22/33] fix: check udf python plugin load/open failure --- source/libs/function/src/udfd.c | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 9401398838..1ae84ff7a5 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -345,7 +345,7 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], return 0; } -void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { +int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; // todo: windows support sprintf(plugin->libPath, "%s", "libtaospyudf.so"); @@ -360,8 +360,9 @@ void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, UDFD_MAX_PLUGIN_FUNCS); if (err != 0) { fnError("can not load python plugin. lib path %s", plugin->libPath); - return; + return err; } + if (plugin->openFunc) { int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(tsTempDir) + 1 + 1; // tsTempDir:tsUdfdLdLibPath char *pythonPath = taosMemoryMalloc(lenPythonPath); @@ -371,11 +372,17 @@ void udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { snprintf(pythonPath, lenPythonPath, "%s:%s", tsTempDir, tsUdfdLdLibPath); #endif SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}}; - plugin->openFunc(items, 1); + err = plugin->openFunc(items, 1); taosMemoryFree(pythonPath); } + if (err != 0) { + fnError("udf script python plugin open func failed. error: %d", err); + uv_dlclose(&plugin->lib); + return err; + } plugin->libLoaded = true; - return; + + return 0; } void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { @@ -420,9 +427,14 @@ int32_t udfdInitScriptPlugin(int8_t scriptType) { case TSDB_FUNC_SCRIPT_BIN_LIB: udfdInitializeCPlugin(plugin); break; - case TSDB_FUNC_SCRIPT_PYTHON: - udfdInitializePythonPlugin(plugin); + case TSDB_FUNC_SCRIPT_PYTHON: { + int32_t err = udfdInitializePythonPlugin(plugin); + if (err != 0) { + taosMemoryFree(plugin); + return err; + } break; + } default: fnError("udf script type %d not supported", scriptType); taosMemoryFree(plugin); @@ -518,7 +530,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { uv_mutex_lock(&global.scriptPluginsMutex); SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType]; - if (scriptPlugin == NULL) { + if (scriptPlugin == NULL || scriptPlugin->libLoaded == false) { err = udfdInitScriptPlugin(udf->scriptType); if (err != 0) { uv_mutex_unlock(&global.scriptPluginsMutex); From d7ba6040230897c7b1d1289e1afadb998824248d Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 7 Mar 2023 22:43:31 +0800 Subject: [PATCH 23/33] fix: c plugin libLoaded = false --- source/libs/function/src/udfd.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 1ae84ff7a5..5f33a6552c 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -530,7 +530,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { uv_mutex_lock(&global.scriptPluginsMutex); SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType]; - if (scriptPlugin == NULL || scriptPlugin->libLoaded == false) { + if (scriptPlugin == NULL) { err = udfdInitScriptPlugin(udf->scriptType); if (err != 0) { uv_mutex_unlock(&global.scriptPluginsMutex); From 88d2d575346078d64f72390232dd3576072a52d3 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 8 Mar 2023 10:11:50 +0800 Subject: [PATCH 24/33] enhance: increase udf function intermediate buffer size --- include/util/tdef.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index f2da4096d3..eac783cb25 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -202,7 +202,7 @@ typedef enum ELogicConditionType { #define TSDB_FUNC_NAME_LEN 65 #define TSDB_FUNC_COMMENT_LEN 1024 * 1024 #define TSDB_FUNC_CODE_LEN 10 * 1024 * 1024 -#define TSDB_FUNC_BUF_SIZE 512 +#define TSDB_FUNC_BUF_SIZE 4096 * 64 #define TSDB_FUNC_TYPE_SCALAR 1 #define TSDB_FUNC_TYPE_AGGREGATE 2 #define TSDB_FUNC_SCRIPT_BIN_LIB 0 From b0ab4be6f444457e3972e4de79583c5dfc5365c3 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 9 Mar 2023 10:32:09 +0800 Subject: [PATCH 25/33] fix: some minior improvement --- source/libs/function/src/udfd.c | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 5f33a6552c..5f35173920 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -544,7 +544,11 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { SScriptUdfInfo info = {0}; convertUdf2UdfInfo(udf, &info); - udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx); + err = udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx); + if (err != 0) { + fnError("udf name %s init failed. error %d", udfName, err); + return err; + } return 0; } @@ -753,7 +757,8 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { if (unloadUdf) { uv_cond_destroy(&udf->condReady); uv_mutex_destroy(&udf->lock); - udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); + code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); + fnDebug("udfd destroy function returns %d", code); taosMemoryFree(udf); } taosMemoryFree(handle); @@ -1346,8 +1351,10 @@ int32_t udfdDeinitResidentFuncs() { SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); if (udfInHash) { SUdf *udf = *udfInHash; - udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); + int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); + fnDebug("udfd destroy function returns %d", code); taosHashRemove(global.udfsHash, funcName, strlen(funcName)); + taosMemoryFree(udf); } } taosArrayDestroy(global.residentFuncs); From a39ada58a5ce5a043d3652d409f4714f10f87220 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 9 Mar 2023 12:13:59 +0800 Subject: [PATCH 26/33] fix: handle function of udf not implemented --- source/libs/function/src/tudf.c | 2 +- source/libs/function/src/udfd.c | 25 ++++++++++++++----------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 90baabce2c..fdc3f027f5 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1705,7 +1705,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { if (task->errCode != 0) { fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode) } else { - fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session); + fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session); *funcHandle = task->session; } int32_t err = task->errCode; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 5f35173920..bccc0d9bb7 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -236,7 +236,7 @@ typedef struct SUvUdfWork { struct SUvUdfWork *pWorkNext; } SUvUdfWork; -typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState; +typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY} EUdfState; typedef struct SUdf { char name[TSDB_FUNC_NAME_LEN + 1]; @@ -570,6 +570,14 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { uv_cond_init(&udfNew->condReady); udf = udfNew; + udf->resident = false; + for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { + char *funcName = taosArrayGet(global.residentFuncs, i); + if (strcmp(udfName, funcName) == 0) { + udf->resident = true; + break; + } + } SUdf **pUdf = &udf; taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); uv_mutex_unlock(&global.udfsMutex); @@ -591,20 +599,15 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { if (udf->state == UDF_STATE_INIT) { udf->state = UDF_STATE_LOADING; code = udfdInitUdf(setup->udfName, udf); - - udf->resident = false; - for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { - char *funcName = taosArrayGet(global.residentFuncs, i); - if (strcmp(setup->udfName, funcName) == 0) { - udf->resident = true; - break; - } + if (code == 0) { + udf->state = UDF_STATE_READY; + } else { + udf->state = UDF_STATE_INIT; } - udf->state = UDF_STATE_READY; uv_cond_broadcast(&udf->condReady); uv_mutex_unlock(&udf->lock); } else { - while (udf->state != UDF_STATE_READY) { + while (udf->state == UDF_STATE_LOADING) { uv_cond_wait(&udf->condReady, &udf->lock); } uv_mutex_unlock(&udf->lock); From c10e32eb6a785f771eb78e0b9a85fed33cdbe37f Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 9 Mar 2023 20:27:08 +0800 Subject: [PATCH 27/33] fix: null bitmap error --- include/libs/function/taosudf.h | 25 ++++++++++++++----------- source/libs/function/src/udfd.c | 2 +- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 7e5b8406bb..3abefd6979 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -149,6 +149,8 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t ne allocCapacity *= UDF_MEMORY_EXP_GROWTH; } + int32_t existedRows = data->numOfRows; + if (IS_VAR_DATA_TYPE(meta->type)) { char *tmp = (char *)realloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity); if (tmp == NULL) { @@ -156,6 +158,7 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t ne } data->varLenCol.varOffsets = (int32_t *)tmp; data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity; + memset(&data->varLenCol.varOffsets[existedRows], 0, sizeof(int32_t) * (allocCapacity - existedRows)); // for payload, add data in udfColDataAppend } else { char *tmp = (char *)realloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity)); @@ -164,6 +167,9 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t ne } data->fixLenCol.nullBitmap = tmp; data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity); + int32_t oldLen = BitmapLen(existedRows); + memset(&data->fixLenCol.nullBitmap[oldLen], 0, BitmapLen(allocCapacity) - oldLen); + if (meta->type == TSDB_DATA_TYPE_NULL) { return TSDB_CODE_SUCCESS; } @@ -260,24 +266,21 @@ typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *input typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData); ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -typedef struct SScriptUdfEnvItem{ +typedef struct SScriptUdfEnvItem { const char *name; const char *value; } SScriptUdfEnvItem; -typedef enum EUdfFuncType { - UDF_FUNC_TYPE_SCALAR = 1, - UDF_FUNC_TYPE_AGG = 2 -} EUdfFuncType; +typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EUdfFuncType; typedef struct SScriptUdfInfo { const char *name; - EUdfFuncType funcType; - int8_t scriptType; - int8_t outputType; - int32_t outputLen; - int32_t bufSize; + EUdfFuncType funcType; + int8_t scriptType; + int8_t outputType; + int32_t outputLen; + int32_t bufSize; const char *path; } SScriptUdfInfo; @@ -294,7 +297,7 @@ typedef int32_t (*TScriptUdfInitFunc)(SScriptUdfInfo *info, void **pUdfCtx); typedef int32_t (*TScriptUdfDestoryFunc)(void *udfCtx); // the following function is for open/close script plugin. -typedef int32_t (*TScriptOpenFunc)(SScriptUdfEnvItem* items, int numItems); +typedef int32_t (*TScriptOpenFunc)(SScriptUdfEnvItem *items, int numItems); typedef int32_t (*TScriptCloseFunc)(); #ifdef __cplusplus diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index bccc0d9bb7..fcc06787e1 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -372,7 +372,7 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { snprintf(pythonPath, lenPythonPath, "%s:%s", tsTempDir, tsUdfdLdLibPath); #endif SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}}; - err = plugin->openFunc(items, 1); + err = plugin->openFunc(items, 2); taosMemoryFree(pythonPath); } if (err != 0) { From df9be12af04bcbd19d41f124a939de79bb29928d Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 9 Mar 2023 20:29:42 +0800 Subject: [PATCH 28/33] fix: fix nullbit map error --- source/libs/function/test/udf1.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index a395fc99a9..1977c09e0f 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -14,10 +14,8 @@ DLL_EXPORT int32_t udf1_init() { return 0; } DLL_EXPORT int32_t udf1_destroy() { return 0; } DLL_EXPORT int32_t udf1(SUdfDataBlock *block, SUdfColumn *resultCol) { - SUdfColumnData *resultData = &resultCol->colData; - resultData->numOfRows = block->numOfRows; - for (int32_t i = 0; i < resultData->numOfRows; ++i) { + for (int32_t i = 0; i < block->numOfRows; ++i) { int j = 0; for (; j < block->numOfCols; ++j) { if (udfColDataIsNull(block->udfCols[j], i)) { From 9076b8640083a4b5e33f45339daed132c2420e70 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 9 Mar 2023 21:20:01 +0800 Subject: [PATCH 29/33] fix: pass CI test --- source/libs/function/test/udf1.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index 1977c09e0f..7798a0bf3d 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -35,5 +35,6 @@ DLL_EXPORT int32_t udf1(SUdfDataBlock *block, SUdfColumn *resultCol) { #ifdef WINDOWS Sleep(1); #endif + resultData->numOfRows = block->numOfRows; return 0; -} \ No newline at end of file +} From f611a6d467b86a43d9fb6f54a6bb85bad733325a Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 10 Mar 2023 12:36:58 +0800 Subject: [PATCH 30/33] fix: udf can return varchar --- source/libs/function/inc/tudfInt.h | 2 +- source/libs/function/src/tudf.c | 46 +++++++++++++++++++++--------- source/libs/function/src/udfd.c | 12 ++++++-- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index c69d19b8a6..27d3b7930f 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -41,7 +41,7 @@ typedef struct SUdfSetupRequest { typedef struct SUdfSetupResponse { int64_t udfHandle; int8_t outputType; - int32_t outputLen; + int32_t bytes; int32_t bufSize; } SUdfSetupResponse; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index fdc3f027f5..c3d00346ee 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -376,7 +376,7 @@ typedef struct SUdfcUvSession { uv_pipe_t *udfUvPipe; int8_t outputType; - int32_t outputLen; + int32_t bytes; int32_t bufSize; char udfName[TSDB_FUNC_NAME_LEN + 1]; @@ -614,7 +614,7 @@ int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) { int32_t len = 0; len += taosEncodeFixedI64(buf, setupRsp->udfHandle); len += taosEncodeFixedI8(buf, setupRsp->outputType); - len += taosEncodeFixedI32(buf, setupRsp->outputLen); + len += taosEncodeFixedI32(buf, setupRsp->bytes); len += taosEncodeFixedI32(buf, setupRsp->bufSize); return len; } @@ -622,7 +622,7 @@ int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) { void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) { buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle); buf = taosDecodeFixedI8(buf, &setupRsp->outputType); - buf = taosDecodeFixedI32(buf, &setupRsp->outputLen); + buf = taosDecodeFixedI32(buf, &setupRsp->bytes); buf = taosDecodeFixedI32(buf, &setupRsp->bufSize); return (void *)buf; } @@ -808,6 +808,26 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo } int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { + SUdfColumnMeta* meta = &udfCol->colMeta; + + SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1); + blockDataAppendColInfo(block, &colInfoData); + blockDataEnsureCapacity(block, udfCol->colData.numOfRows); + + SColumnInfoData *col = bdGetColumnInfoData(block, 0); + for (int i = 0; i < udfCol->colData.numOfRows; ++i) { + if (udfColDataIsNull(udfCol, i)) { + colDataSetNULL(col, i); + } else { + char* data = udfColDataGetData(udfCol, i); + colDataSetVal(col, i, data, false); + } + } + block->info.rows = udfCol->colData.numOfRows; + return 0; +} + +int32_t convertUdfColumnToDataBlock2(SUdfColumn *udfCol, SSDataBlock *block) { block->info.rows = udfCol->colData.numOfRows; block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type); @@ -1056,9 +1076,9 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, fnError("udfc scalar function calculate error. no column data"); code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } else { - if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) { + if (session->outputType != output->columnData->info.type || session->bytes != output->columnData->info.bytes) { fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", - session->outputType, session->outputLen, output->columnData->info.type, output->columnData->info.bytes); + session->outputType, session->bytes, output->columnData->info.type, output->columnData->info.bytes); code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } } @@ -1086,11 +1106,11 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult } SUdfcUvSession *session = (SUdfcUvSession *)handle; SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo); - int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize; + int32_t envSize = sizeof(SUdfAggRes) + session->bytes + session->bufSize; memset(udfRes, 0, envSize); udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen; + udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes; SUdfInterBuf buf = {0}; if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { @@ -1123,7 +1143,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { SUdfcUvSession *session = handle; SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen; + udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes; SInputColumnInfoData *pInput = &pCtx->input; int32_t numOfCols = pInput->numOfInputCols; @@ -1180,7 +1200,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { SUdfcUvSession *session = handle; SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen; + udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes; SUdfInterBuf resultBuf = {0}; SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; @@ -1190,12 +1210,12 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode); GET_RES_INFO(pCtx)->numOfRes = 0; } else { - if (resultBuf.bufLen <= session->outputLen) { - memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen); + if (resultBuf.bufLen <= session->bytes) { + memcpy(udfRes->finalResBuf, resultBuf.buf, session->bytes); udfRes->finalResNum = resultBuf.numOfResult; GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; } else { - fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen); + fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes); GET_RES_INFO(pCtx)->numOfRes = 0; udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } @@ -1699,7 +1719,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { SUdfSetupResponse *rsp = &task->_setup.rsp; task->session->severHandle = rsp->udfHandle; task->session->outputType = rsp->outputType; - task->session->outputLen = rsp->outputLen; + task->session->bytes = rsp->bytes; task->session->bufSize = rsp->bufSize; strncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN); if (task->errCode != 0) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index fcc06787e1..1593f97105 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -621,7 +621,11 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { rsp.code = code; rsp.setupRsp.udfHandle = (int64_t)(handle); rsp.setupRsp.outputType = udf->outputType; - rsp.setupRsp.outputLen = udf->outputLen; + if (!IS_VAR_DATA_TYPE(udf->outputType)) { + rsp.setupRsp.bytes = udf->outputLen; + } else { + rsp.setupRsp.bytes = udf->outputLen + VARSTR_HEADER_SIZE; + } rsp.setupRsp.bufSize = udf->bufSize; int32_t len = encodeUdfResponse(NULL, &rsp); @@ -650,7 +654,11 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { switch (call->callType) { case TSDB_UDF_CALL_SCALA_PROC: { SUdfColumn output = {0}; - output.colMeta.bytes = udf->outputLen; + if (IS_VAR_DATA_TYPE(udf->outputType)) { + output.colMeta.bytes = udf->outputLen + VARSTR_HEADER_SIZE; + } else { + output.colMeta.bytes = udf->outputLen; + } output.colMeta.type = udf->outputType; output.colMeta.precision = 0; output.colMeta.scale = 0; From 89d05296d800b6039c34dacbcc84bf26311e44bc Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 10 Mar 2023 14:56:35 +0800 Subject: [PATCH 31/33] fix: add varchar/nchar udf support --- include/libs/function/taosudf.h | 1 + source/libs/function/src/udfd.c | 12 ++---------- source/libs/parser/src/parTranslater.c | 1 + 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 3abefd6979..b4daa895fd 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -196,6 +196,7 @@ static FORCE_INLINE void udfColDataSetNull(SUdfColumn *pColumn, int32_t row) { udfColDataSetNull_f(pColumn, row); } pColumn->hasNull = true; + pColumn->colData.numOfRows = ((int32_t)(row + 1) > pColumn->colData.numOfRows) ? (int32_t)(row + 1) : pColumn->colData.numOfRows; } static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentRow, const char *pData, bool isNull) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 1593f97105..4bfa5ceb3b 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -621,11 +621,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { rsp.code = code; rsp.setupRsp.udfHandle = (int64_t)(handle); rsp.setupRsp.outputType = udf->outputType; - if (!IS_VAR_DATA_TYPE(udf->outputType)) { - rsp.setupRsp.bytes = udf->outputLen; - } else { - rsp.setupRsp.bytes = udf->outputLen + VARSTR_HEADER_SIZE; - } + rsp.setupRsp.bytes = udf->outputLen; rsp.setupRsp.bufSize = udf->bufSize; int32_t len = encodeUdfResponse(NULL, &rsp); @@ -654,11 +650,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { switch (call->callType) { case TSDB_UDF_CALL_SCALA_PROC: { SUdfColumn output = {0}; - if (IS_VAR_DATA_TYPE(udf->outputType)) { - output.colMeta.bytes = udf->outputLen + VARSTR_HEADER_SIZE; - } else { - output.colMeta.bytes = udf->outputLen; - } + output.colMeta.bytes = udf->outputLen; output.colMeta.type = udf->outputType; output.colMeta.precision = 0; output.colMeta.scale = 0; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index fdc0a08371..da9c1b9185 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6395,6 +6395,7 @@ static int32_t translateCreateFunction(STranslateContext* pCxt, SCreateFunctionS req.funcType = pStmt->isAgg ? TSDB_FUNC_TYPE_AGGREGATE : TSDB_FUNC_TYPE_SCALAR; req.scriptType = pStmt->language; req.outputType = pStmt->outputDt.type; + pStmt->outputDt.bytes = calcTypeBytes(pStmt->outputDt); req.outputLen = pStmt->outputDt.bytes; req.bufSize = pStmt->bufSize; int32_t code = readFromFile(pStmt->libraryPath, &req.codeLen, &req.pCode); From 26b3773740361d9b8393ddd290c90428f57bb787 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 10 Mar 2023 18:58:25 +0800 Subject: [PATCH 32/33] fix: core dump when copy resut buf --- source/libs/function/src/tudf.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index c3d00346ee..78c2d4bde7 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1211,7 +1211,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { GET_RES_INFO(pCtx)->numOfRes = 0; } else { if (resultBuf.bufLen <= session->bytes) { - memcpy(udfRes->finalResBuf, resultBuf.buf, session->bytes); + memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen); udfRes->finalResNum = resultBuf.numOfResult; GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; } else { From c8e05226ceee05a2d5c239f0889f67e04b05b28e Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 12 Mar 2023 09:40:49 +0800 Subject: [PATCH 33/33] fix: unknown error prompt --- include/util/taoserror.h | 2 +- source/libs/function/src/udfd.c | 19 ++++++++++++------- source/util/src/terror.c | 2 +- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0d959271c0..3ffbd02e97 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -713,7 +713,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_UDF_INVALID_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x2909) #define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x290A) #define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x290B) -#define TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED TAOS_DEF_ERROR_CODE(0, 0x290C) +#define TSDB_CODE_UDF_FUNC_EXEC_FAILURE TAOS_DEF_ERROR_CODE(0, 0x290C) // sml #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 4bfa5ceb3b..f67b352cc0 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -127,7 +127,8 @@ int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, vo if (ctx->scalarProcFunc) { return ctx->scalarProcFunc(block, resultCol); } else { - return TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED; + fnError("udfd c plugin scalar proc not implemented"); + return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } } @@ -136,7 +137,8 @@ int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) { if (ctx->aggStartFunc) { return ctx->aggStartFunc(buf); } else { - return TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED; + fnError("udfd c plugin aggregation start not implemented"); + return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } return 0; } @@ -146,7 +148,8 @@ int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdf if (ctx->aggProcFunc) { return ctx->aggProcFunc(block, interBuf, newInterBuf); } else { - return TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED; + fnError("udfd c plugin aggregation process not implemented"); + return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } } @@ -156,7 +159,8 @@ int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, if (ctx->aggMergeFunc) { return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); } else { - return TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED; + fnError("udfd c plugin aggregation merge not implemented"); + return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } } @@ -165,7 +169,8 @@ int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, voi if (ctx->aggFinishFunc) { return ctx->aggFinishFunc(buf, resultData); } else { - return TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED; + fnError("udfd c plugin aggregation finish not implemented"); + return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; } return 0; } @@ -618,7 +623,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfResponse rsp; rsp.seqNum = request->seqNum; rsp.type = request->type; - rsp.code = code; + rsp.code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0; rsp.setupRsp.udfHandle = (int64_t)(handle); rsp.setupRsp.outputType = udf->outputType; rsp.setupRsp.bytes = udf->outputLen; @@ -701,7 +706,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { rsp->seqNum = request->seqNum; rsp->type = request->type; - rsp->code = code; + rsp->code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0; subRsp->callType = call->callType; int32_t len = encodeUdfResponse(NULL, rsp); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b5890d7083..cb0ccc75df 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -584,7 +584,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_NO_FUNC_HANDLE, "udf no function han TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_BUFSIZE, "udf invalid bufsize") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED, "udf program language not supported") -TAOS_DEFINE_ERROR(TSDB_CODE_UDF_FUNC_NOT_IMPLEMENTED, "udf function not implemented") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_FUNC_EXEC_FAILURE, "udf function execution failure") //schemaless TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")