diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 317339af04..7c376fcd34 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -874,8 +874,9 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); connReq->data = uvTask; - - uv_pipe_connect(connReq, pipe, "udf.sock", onUdfClientConnect); + char listeningPipeName[32] = {0}; + sprintf(listeningPipeName, "%s%d", UDF_LISTEN_PIPE_NAME_PREFIX, uvTask->udfc->dnodeId); + uv_pipe_connect(connReq, pipe, listeningPipeName, onUdfClientConnect); break; } case UV_TASK_REQ_RSP: { @@ -1019,6 +1020,7 @@ int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle SUdfSetupRequest *req = &task->_setup.req; memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN); + req->epSet = *epSet; int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT); if (errCode != 0) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 6540851758..5d0ed0daf0 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -83,8 +83,8 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, int32_t udfdLoadUdf(char *udfName, SEpSet *pEpSet, SUdf *udf) { strcpy(udf->name, udfName); - udfdFillUdfInfoFromMNode(global.clientRpc, pEpSet, udf->name, udf); - + //udfdFillUdfInfoFromMNode(global.clientRpc, pEpSet, udf->name, udf); + strcpy(udf->path, "libudf1.so"); int err = uv_dlopen(udf->path, &udf->lib); if (err != 0) { fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); @@ -115,8 +115,8 @@ void udfdProcessRequest(uv_work_t *req) { SUdf *udf = NULL; uv_mutex_lock(&global.udfsMutex); - SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN); - if (*udfInHash) { + SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName)); + if (udfInHash) { ++(*udfInHash)->refCount; udf = *udfInHash; uv_mutex_unlock(&global.udfsMutex); @@ -128,7 +128,7 @@ void udfdProcessRequest(uv_work_t *req) { uv_mutex_init(&udfNew->lock); uv_cond_init(&udfNew->condReady); udf = udfNew; - taosHashPut(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN, &udfNew, sizeof(&udfNew)); + taosHashPut(global.udfsHash, request.setup.udfName, strlen(request.setup.udfName), &udfNew, sizeof(&udfNew)); uv_mutex_unlock(&global.udfsMutex); } @@ -214,7 +214,7 @@ void udfdProcessRequest(uv_work_t *req) { udf->refCount--; if (udf->refCount == 0) { unloadUdf = true; - taosHashRemove(global.udfsHash, udf->name, TSDB_FUNC_NAME_LEN); + taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); } uv_mutex_unlock(&global.udfsMutex); if (unloadUdf) { @@ -580,7 +580,7 @@ static int32_t udfdRun() { fnInfo("start the udfd"); int code = uv_run(global.loop, UV_RUN_DEFAULT); - fnInfo("udfd stopped. result: %s", uv_err_name(code)); + fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code); int codeClose = uv_loop_close(global.loop); fnDebug("uv loop close. result: %s", uv_err_name(codeClose)); udfdCloseClientRpc();