From ca34398c436fb4323750de6a0fb5229ac1526526 Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 20 Apr 2022 19:27:39 +0800 Subject: [PATCH] prepare for dnode start/stop/restart udfd and vnode/snode/qnode split --- source/libs/function/inc/tudf.h | 22 ++++++++++++++++------ source/libs/function/inc/udfc.h | 4 ++-- source/libs/function/src/tudf.c | 25 +++++++++++-------------- source/libs/function/src/udfd.c | 10 +++------- source/libs/function/test/runUdf.c | 4 ++-- 5 files changed, 34 insertions(+), 31 deletions(-) diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index d7bb6dfb46..c51b6e1264 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -38,17 +38,28 @@ enum { UDFC_CODE_PIPE_READ_ERR = -3, }; +/*TODO: no api for dnode startudfd/stopudfd*/ /** - * start udf dameon service - * @return error code + * start udfd dameon service */ -int32_t startUdfService(); +int32_t startUdfd(int32_t dnodeId); /** - * stop udf dameon service + * stop udfd dameon service + */ +int32_t stopUdfd(int32_t dnodeId); + +/** + * create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf * @return error code */ -int32_t stopUdfService(); +int32_t createUdfdProxy(int32_t dnodeId); + +/** + * destroy udfd proxy + * @return error code + */ +int32_t destroyUdfdProxy(int32_t dnodeId); typedef void *UdfHandle; @@ -104,7 +115,6 @@ typedef struct SUdfInterBuf { char* buf; } SUdfInterBuf; -//TODO: translate these calls to callUdf // output: interBuf int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf); // input: block, state diff --git a/source/libs/function/inc/udfc.h b/source/libs/function/inc/udfc.h index 4d2aeb7049..fed2818ced 100644 --- a/source/libs/function/inc/udfc.h +++ b/source/libs/function/inc/udfc.h @@ -32,9 +32,9 @@ typedef struct SUdfInfo { typedef void *UdfHandle; -int32_t startUdfService(); +int32_t createUdfdProxy(); -int32_t stopUdfService(); +int32_t destroyUdfdProxy(); //int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index c41447b584..ad74daddc6 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -200,10 +200,10 @@ int64_t gUdfTaskSeqNum = 0; enum { UDFC_STATE_INITAL = 0, // initial state - UDFC_STATE_STARTNG, // starting after startUdfService + UDFC_STATE_STARTNG, // starting after createUdfdProxy UDFC_STATE_READY, // started and begin to receive quests UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart. - UDFC_STATE_STOPPING, // stopping after stopUdfService + UDFC_STATE_STOPPING, // stopping after destroyUdfdProxy UDFC_STATUS_FINAL, // stopped }; int8_t gUdfcState = UDFC_STATE_INITAL; @@ -929,7 +929,7 @@ void udfStopAsyncCb(uv_async_t *async) { } } -int32_t startUdfd(); +int32_t udfcSpawnUdfd(); void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { //TODO: pipe close will be first received @@ -944,12 +944,12 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { if (gUdfcState == UDFC_STATE_READY) { gUdfcState = UDFC_STATE_RESTARTING; //TODO: asynchronous without blocking. how to do it - cleanUpUvTasks(); - startUdfd(); + //cleanUpUvTasks(); + udfcSpawnUdfd(); } } -int32_t startUdfd() { +int32_t udfcSpawnUdfd() { //TODO: path uv_process_options_t options = {0}; static char path[256] = {0}; @@ -979,9 +979,6 @@ int32_t startUdfd() { void constructUdfService(void *argsThread) { uv_loop_init(&gUdfdLoop); - //TODO spawn error - startUdfd(); - uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb); uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb); uv_mutex_init(&gUdfTaskQueueMutex); @@ -994,7 +991,7 @@ void constructUdfService(void *argsThread) { } -int32_t startUdfService() { +int32_t createUdfdProxy(int32_t dnodeId) { gUdfcState = UDFC_STATE_STARTNG; uv_barrier_init(&gUdfInitBarrier, 2); uv_thread_create(&gUdfLoopThread, constructUdfService, 0); @@ -1002,12 +999,12 @@ int32_t startUdfService() { return 0; } -int32_t stopUdfService() { +int32_t destroyUdfdProxy(int32_t dnodeId) { gUdfcState = UDFC_STATE_STOPPING; uv_barrier_destroy(&gUdfInitBarrier); - if (gUdfcState == UDFC_STATE_STOPPING) { - uv_process_kill(&gUdfdProcess, SIGINT); - } +// if (gUdfcState == UDFC_STATE_STOPPING) { +// uv_process_kill(&gUdfdProcess, SIGINT); +// } uv_async_send(&gUdfLoopStopAsync); uv_thread_join(&gUdfLoopThread); uv_mutex_destroy(&gUdfTaskQueueMutex); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 641b8535c3..71434c695f 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -83,18 +83,14 @@ typedef struct SUdfHandle { int32_t udfdLoadUdf(char* udfName, SUdf* udf) { strcpy(udf->name, udfName); - // TODO: retrive udf info from mnode - char *path = "libudf1.so"; - int err = uv_dlopen(path, &udf->lib); + int err = uv_dlopen(udf->path, &udf->lib); if (err != 0) { - fnError("can not load library %s. error: %s", path, uv_strerror(err)); + fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); // TODO set error } - + //TODO: find all the functions char normalFuncName[TSDB_FUNC_NAME_LEN] = {0}; strcpy(normalFuncName, udfName); - // TODO error, multi-thread, same udf, lock it - // TODO find all functions normal, init, destroy, normal, merge, finalize uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc)); char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char *freeSuffix = "_free"; diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index 28dc6bb99a..41c7f65e0b 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -8,7 +8,7 @@ #include "tdatablock.h" int main(int argc, char *argv[]) { - startUdfService(); + createUdfdProxy(1); uv_sleep(1000); char path[256] = {0}; size_t cwdSize = 256; @@ -53,5 +53,5 @@ int main(int argc, char *argv[]) { } teardownUdf(handle); - stopUdfService(); + destroyUdfdProxy(1); }