diff --git a/include/os/osEnv.h b/include/os/osEnv.h index 14d50858b7..a3f92a0b29 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -34,6 +34,7 @@ extern int64_t tsOpenMax; extern int64_t tsStreamMax; extern float tsNumOfCores; extern int64_t tsTotalMemoryKB; +extern char* tsProcPath; extern char configDir[]; extern char tsDataDir[]; diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index dd221f8404..376f589acd 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -216,6 +216,126 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) { dmStopStatusThread(pWrapper->pDnode); } +static int32_t dmSpawnUdfd(SDnode *pDnode); + +void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { + dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); + uv_close((uv_handle_t*)process, NULL); + SDnode *pDnode = process->data; + SUdfdData *pData = &pDnode->udfdData; + if (atomic_load_8(&pData->stopping) != 0) { + dDebug("udfd process exit due to stopping"); + } else { + uv_close((uv_handle_t*)&pData->ctrlPipe, NULL); + dmSpawnUdfd(pDnode); + } +} + +static int32_t dmSpawnUdfd(SDnode *pDnode) { + dInfo("dnode start spawning udfd"); + uv_process_options_t options = {0}; + + char path[PATH_MAX] = {0}; + if (tsProcPath == NULL) { + path[0] = '.'; + } else { + strncpy(path, tsProcPath, strlen(tsProcPath)); + taosDirName(path); + } + strcat(path, "/udfd"); + char* argsUdfd[] = {path, "-c", configDir, NULL}; + options.args = argsUdfd; + options.file = path; + + options.exit_cb = dmUdfdExit; + SUdfdData *pData = &pDnode->udfdData; + uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); + + uv_stdio_container_t child_stdio[3]; + child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; + child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe; + child_stdio[1].flags = UV_IGNORE; + child_stdio[2].flags = UV_INHERIT_FD; + child_stdio[2].data.fd = 2; + options.stdio_count = 3; + options.stdio = child_stdio; + + char dnodeIdEnvItem[32] = {0}; + char thrdPoolSizeEnvItem[32] = {0}; + snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId); + float numCpuCores = 4; + taosGetCpuCores(&numCpuCores); + snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2); + char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL}; + options.env = envUdfd; + + int err = uv_spawn(&pData->loop, &pData->process, &options); + pData->process.data = (void*)pDnode; + + if (err != 0) { + dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); + } + return err; +} + +static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) { + if (!uv_is_closing(handle)) { + uv_close(handle, NULL); + } +} + +void dmWatchUdfd(void *args) { + SDnode *pDnode = args; + SUdfdData *pData = &pDnode->udfdData; + uv_loop_init(&pData->loop); + int32_t err = dmSpawnUdfd(pDnode); + atomic_store_32(&pData->spawnErr, err); + uv_barrier_wait(&pData->barrier); + uv_run(&pData->loop, UV_RUN_DEFAULT); + err = uv_loop_close(&pData->loop); + while (err == UV_EBUSY) { + uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL); + uv_run(&pData->loop, UV_RUN_DEFAULT); + err = uv_loop_close(&pData->loop); + } + return; +} + +int32_t dmStartUdfd(SDnode *pDnode) { + SUdfdData *pData = &pDnode->udfdData; + if (pData->startCalled) { + dInfo("dnode-mgmt start udfd already called"); + return 0; + } + uv_barrier_init(&pData->barrier, 2); + pData->stopping = 0; + uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); + uv_barrier_wait(&pData->barrier); + pData->startCalled = true; + pData->needCleanUp = true; + return pData->spawnErr; +} + +int32_t dmStopUdfd(SDnode *pDnode) { + dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d", + pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr); + SUdfdData *pData = &pDnode->udfdData; + if (!pData->needCleanUp) { + return 0; + } + atomic_store_8(&pData->stopping, 1); + + uv_barrier_destroy(&pData->barrier); + if (pData->spawnErr == 0) { + uv_process_kill(&pData->process, SIGINT); + } + uv_stop(&pData->loop); + uv_thread_join(&pData->thread); + + atomic_store_8(&pData->stopping, 0); + return 0; +} + static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { dInfo("dnode-mgmt start to init"); SDnode *pDnode = pWrapper->pDnode; @@ -247,6 +367,10 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { } dmReportStartup(pDnode, "dnode-transport", "initialized"); + if (dmStartUdfd(pDnode) != 0) { + dError("failed to start udfd"); + } + dInfo("dnode-mgmt is initialized"); return 0; } @@ -254,6 +378,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { static void dmCleanupMgmt(SMgmtWrapper *pWrapper) { dInfo("dnode-mgmt start to clean up"); SDnode *pDnode = pWrapper->pDnode; + dmStopUdfd(pDnode); dmStopWorker(pDnode); taosWLockLatch(&pDnode->data.latch); diff --git a/source/dnode/mgmt/interface/inc/dmDef.h b/source/dnode/mgmt/interface/inc/dmDef.h index e6537dcf73..4f4a2ed349 100644 --- a/source/dnode/mgmt/interface/inc/dmDef.h +++ b/source/dnode/mgmt/interface/inc/dmDef.h @@ -16,6 +16,7 @@ #ifndef _TD_DM_DEF_H_ #define _TD_DM_DEF_H_ +#include "uv.h" #include "dmLog.h" #include "cJSON.h" @@ -142,6 +143,18 @@ typedef struct { char desc[TSDB_STEP_DESC_LEN]; } SStartupInfo; +typedef struct SUdfdData { + bool startCalled; + bool needCleanUp; + uv_loop_t loop; + uv_thread_t thread; + uv_barrier_t barrier; + uv_process_t process; + int spawnErr; + int8_t stopping; + uv_pipe_t ctrlPipe; +} SUdfdData; + typedef struct SDnode { EDndProcType ptype; EDndNodeType ntype; @@ -150,6 +163,7 @@ typedef struct SDnode { SStartupInfo startup; SDnodeTrans trans; SDnodeData data; + SUdfdData udfdData; TdThreadMutex mutex; SMgmtWrapper wrappers[NODE_END]; } SDnode; diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index c51b6e1264..b5c839c811 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -27,41 +27,31 @@ extern "C" { #endif #define UDF_LISTEN_PIPE_NAME_LEN 32 -#define UDF_LISTEN_PIPE_NAME_PREFIX "udf.sock." +#define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock." //====================================================================================== //begin API to taosd and qworker enum { UDFC_CODE_STOPPING = -1, - UDFC_CODE_RESTARTING = -2, UDFC_CODE_PIPE_READ_ERR = -3, }; -/*TODO: no api for dnode startudfd/stopudfd*/ -/** - * start udfd dameon service - */ -int32_t startUdfd(int32_t dnodeId); - -/** - * stop udfd dameon service - */ -int32_t stopUdfd(int32_t dnodeId); +typedef void *UdfcHandle; +typedef void *UdfcFuncHandle; /** * create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf * @return error code */ -int32_t createUdfdProxy(int32_t dnodeId); +int32_t udfcOpen(int32_t dnodeId, UdfcHandle* proxyHandle); /** * destroy udfd proxy * @return error code */ -int32_t destroyUdfdProxy(int32_t dnodeId); +int32_t udfcClose(UdfcHandle proxyhandle); -typedef void *UdfHandle; /** * setup udf @@ -69,7 +59,7 @@ typedef void *UdfHandle; * @param handle, out * @return error code */ -int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle); +int32_t setupUdf(UdfcHandle proxyHandle, char udfName[], SEpSet *epSet, UdfcFuncHandle *handle); typedef struct SUdfColumnMeta { int16_t type; @@ -116,26 +106,26 @@ typedef struct SUdfInterBuf { } SUdfInterBuf; // output: interBuf -int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf); +int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); // input: block, state // output: newState -int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); +int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); // input: interBuf // output: resultData -int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); +int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); // input: interbuf1, interbuf2 // output: resultBuf -int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); +int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); // input: block // output: resultData -int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData); +int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData); /** * tearn down udf * @param handle * @return */ -int32_t teardownUdf(UdfHandle handle); +int32_t teardownUdf(UdfcFuncHandle handle); // end API to taosd and qworker //============================================================================================================================= diff --git a/source/libs/function/inc/udfc.h b/source/libs/function/inc/udfc.h index fed2818ced..1cf8c67686 100644 --- a/source/libs/function/inc/udfc.h +++ b/source/libs/function/inc/udfc.h @@ -30,20 +30,20 @@ typedef struct SUdfInfo { char *path; } SUdfInfo; -typedef void *UdfHandle; +typedef void *UdfcFuncHandle; int32_t createUdfdProxy(); int32_t destroyUdfdProxy(); -//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles); +//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfcFuncHandle *handles); -int32_t setupUdf(SUdfInfo* udf, UdfHandle* handle); +int32_t setupUdf(SUdfInfo* udf, UdfcFuncHandle* handle); -int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate, +int32_t callUdf(UdfcFuncHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate, int32_t *newStateSize, SSDataBlock *output); -int32_t teardownUdf(UdfHandle handle); +int32_t teardownUdf(UdfcFuncHandle handle); typedef struct SUdfSetupRequest { char udfName[16]; // diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index ad74daddc6..e31a860e85 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -14,19 +14,15 @@ */ #include "uv.h" #include "os.h" -#include "tlog.h" #include "tudf.h" #include "tudfInt.h" #include "tarray.h" #include "tdatablock.h" -//TODO: when startup, set thread poll size. add it to cfg -//TODO: test for udfd restart -//TODO: udfd restart when exist or aborts -//TODO: deal with uv task that has been started and then udfd core dumped //TODO: network error processing. //TODO: add unit test //TODO: include all global variable under context struct + /* Copyright (c) 2013, Ben Noordhuis * The QUEUE is copied from queue.h under libuv * */ @@ -125,12 +121,35 @@ enum { UV_TASK_DISCONNECT = 2 }; +int64_t gUdfTaskSeqNum = 0; +typedef struct SUdfdProxy { + int32_t dnodeId; + uv_barrier_t gUdfInitBarrier; + + uv_loop_t gUdfdLoop; + uv_thread_t gUdfLoopThread; + uv_async_t gUdfLoopTaskAync; + + uv_async_t gUdfLoopStopAsync; + + uv_mutex_t gUdfTaskQueueMutex; + int8_t gUdfcState; + QUEUE gUdfTaskQueue; + QUEUE gUvProcTaskQueue; + // int8_t gUdfcState = UDFC_STATE_INITAL; + // QUEUE gUdfTaskQueue = {0}; + // QUEUE gUvProcTaskQueue = {0}; +} SUdfdProxy; + + typedef struct SUdfUvSession { + SUdfdProxy *udfc; int64_t severHandle; uv_pipe_t *udfSvcPipe; } SUdfUvSession; typedef struct SClientUvTaskNode { + SUdfdProxy *udfc; int8_t type; int errCode; @@ -169,7 +188,6 @@ typedef struct SClientUdfTask { } _teardown; }; - } SClientUdfTask; typedef struct SClientConnBuf { @@ -185,34 +203,13 @@ typedef struct SClientUvConn { SClientConnBuf readBuf; } SClientUvConn; -uv_process_t gUdfdProcess; - -uv_barrier_t gUdfInitBarrier; - -uv_loop_t gUdfdLoop; -uv_thread_t gUdfLoopThread; -uv_async_t gUdfLoopTaskAync; - -uv_async_t gUdfLoopStopAsync; - -uv_mutex_t gUdfTaskQueueMutex; -int64_t gUdfTaskSeqNum = 0; - enum { UDFC_STATE_INITAL = 0, // initial state - UDFC_STATE_STARTNG, // starting after createUdfdProxy + UDFC_STATE_STARTNG, // starting after udfcOpen UDFC_STATE_READY, // started and begin to receive quests - UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart. - UDFC_STATE_STOPPING, // stopping after destroyUdfdProxy + UDFC_STATE_STOPPING, // stopping after udfcClose UDFC_STATUS_FINAL, // stopped }; -int8_t gUdfcState = UDFC_STATE_INITAL; - -//double circular linked list - -QUEUE gUdfTaskQueue = {0}; - -QUEUE gUvProcTaskQueue = {0}; int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { int32_t len = 0; @@ -777,13 +774,14 @@ void onUdfClientConnect(uv_connect_t *connect, int status) { int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) { SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); uvTask->type = uvTaskType; + uvTask->udfc = task->session->udfc; if (uvTaskType == UV_TASK_CONNECT) { } else if (uvTaskType == UV_TASK_REQ_RSP) { uvTask->pipe = task->session->udfSvcPipe; SUdfRequest request; request.type = task->type; - request.seqNum = gUdfTaskSeqNum++; + request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1); if (task->type == UDF_TASK_SETUP) { request.setup = task->_setup.req; @@ -815,11 +813,11 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { debugPrint("%s, %d", "queue uv task", uvTask->type); - - uv_mutex_lock(&gUdfTaskQueueMutex); - QUEUE_INSERT_TAIL(&gUdfTaskQueue, &uvTask->recvTaskQueue); - uv_mutex_unlock(&gUdfTaskQueueMutex); - uv_async_send(&gUdfLoopTaskAync); + SUdfdProxy *udfc = uvTask->udfc; + uv_mutex_lock(&udfc->gUdfTaskQueueMutex); + QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue); + uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); + uv_async_send(&udfc->gUdfLoopTaskAync); uv_sem_wait(&uvTask->taskSem); uv_sem_destroy(&uvTask->taskSem); @@ -832,7 +830,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { switch (uvTask->type) { case UV_TASK_CONNECT: { uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); - uv_pipe_init(&gUdfdLoop, pipe, 0); + uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0); uvTask->pipe = pipe; SClientUvConn *conn = taosMemoryMalloc(sizeof(SClientUvConn)); @@ -873,142 +871,98 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { } void udfClientAsyncCb(uv_async_t *async) { + SUdfdProxy *udfc = async->data; QUEUE wq; - uv_mutex_lock(&gUdfTaskQueueMutex); - QUEUE_MOVE(&gUdfTaskQueue, &wq); - uv_mutex_unlock(&gUdfTaskQueueMutex); + uv_mutex_lock(&udfc->gUdfTaskQueueMutex); + QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq); + uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); while (!QUEUE_EMPTY(&wq)) { QUEUE* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); startUvUdfTask(task); - QUEUE_INSERT_TAIL(&gUvProcTaskQueue, &task->procTaskQueue); + QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue); } } -void cleanUpUvTasks() { +void cleanUpUvTasks(SUdfdProxy *udfc) { QUEUE wq; - uv_mutex_lock(&gUdfTaskQueueMutex); - QUEUE_MOVE(&gUdfTaskQueue, &wq); - uv_mutex_unlock(&gUdfTaskQueueMutex); + uv_mutex_lock(&udfc->gUdfTaskQueueMutex); + QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq); + uv_mutex_unlock(&udfc->gUdfTaskQueueMutex); while (!QUEUE_EMPTY(&wq)) { QUEUE* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); - if (gUdfcState == UDFC_STATE_STOPPING) { + if (udfc->gUdfcState == UDFC_STATE_STOPPING) { task->errCode = UDFC_CODE_STOPPING; - } else if (gUdfcState == UDFC_STATE_RESTARTING) { - task->errCode = UDFC_CODE_RESTARTING; } uv_sem_post(&task->taskSem); } // TODO: deal with tasks that are waiting result. - while (!QUEUE_EMPTY(&gUvProcTaskQueue)) { - QUEUE* h = QUEUE_HEAD(&gUvProcTaskQueue); + while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) { + QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue); - if (gUdfcState == UDFC_STATE_STOPPING) { + if (udfc->gUdfcState == UDFC_STATE_STOPPING) { task->errCode = UDFC_CODE_STOPPING; - } else if (gUdfcState == UDFC_STATE_RESTARTING) { - task->errCode = UDFC_CODE_RESTARTING; } uv_sem_post(&task->taskSem); } } void udfStopAsyncCb(uv_async_t *async) { - cleanUpUvTasks(); - if (gUdfcState == UDFC_STATE_STOPPING) { - uv_stop(&gUdfdLoop); + SUdfdProxy *udfc = async->data; + cleanUpUvTasks(udfc); + if (udfc->gUdfcState == UDFC_STATE_STOPPING) { + uv_stop(&udfc->gUdfdLoop); } } -int32_t udfcSpawnUdfd(); - -void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { - //TODO: pipe close will be first received - debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal); - uv_close((uv_handle_t *) req, NULL); - //TODO: restart the udfd process - if (gUdfcState == UDFC_STATE_STOPPING) { - if (term_signal != SIGINT) { - //TODO: log error - } - } - if (gUdfcState == UDFC_STATE_READY) { - gUdfcState = UDFC_STATE_RESTARTING; - //TODO: asynchronous without blocking. how to do it - //cleanUpUvTasks(); - udfcSpawnUdfd(); - } -} - -int32_t udfcSpawnUdfd() { - //TODO: path - uv_process_options_t options = {0}; - static char path[256] = {0}; - size_t cwdSize; - uv_cwd(path, &cwdSize); - strcat(path, "/udfd"); - char* args[2] = {path, NULL}; - options.args = args; - options.file = path; - options.exit_cb = onUdfdExit; - options.stdio_count = 3; - uv_stdio_container_t child_stdio[3]; - child_stdio[0].flags = UV_IGNORE; - child_stdio[1].flags = UV_INHERIT_FD; - child_stdio[1].data.fd = 1; - child_stdio[2].flags = UV_INHERIT_FD; - child_stdio[2].data.fd = 2; - options.stdio = child_stdio; - //TODO spawn error - int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options); - if (err != 0) { - debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); - } - return err; -} - void constructUdfService(void *argsThread) { - uv_loop_init(&gUdfdLoop); + SUdfdProxy *udfc = (SUdfdProxy*)argsThread; + uv_loop_init(&udfc->gUdfdLoop); - uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb); - uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb); - uv_mutex_init(&gUdfTaskQueueMutex); - QUEUE_INIT(&gUdfTaskQueue); - QUEUE_INIT(&gUvProcTaskQueue); - uv_barrier_wait(&gUdfInitBarrier); + uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopTaskAync, udfClientAsyncCb); + udfc->gUdfLoopTaskAync.data = udfc; + uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopStopAsync, udfStopAsyncCb); + udfc->gUdfLoopStopAsync.data = udfc; + uv_mutex_init(&udfc->gUdfTaskQueueMutex); + QUEUE_INIT(&udfc->gUdfTaskQueue); + QUEUE_INIT(&udfc->gUvProcTaskQueue); + uv_barrier_wait(&udfc->gUdfInitBarrier); //TODO return value of uv_run - uv_run(&gUdfdLoop, UV_RUN_DEFAULT); - uv_loop_close(&gUdfdLoop); + uv_run(&udfc->gUdfdLoop, UV_RUN_DEFAULT); + uv_loop_close(&udfc->gUdfdLoop); } - -int32_t createUdfdProxy(int32_t dnodeId) { - gUdfcState = UDFC_STATE_STARTNG; - uv_barrier_init(&gUdfInitBarrier, 2); - uv_thread_create(&gUdfLoopThread, constructUdfService, 0); - uv_barrier_wait(&gUdfInitBarrier); gUdfcState = UDFC_STATE_READY; +int32_t udfcOpen(int32_t dnodeId, UdfcHandle *udfc) { + SUdfdProxy *proxy = taosMemoryCalloc(1, sizeof(SUdfdProxy)); + proxy->dnodeId = dnodeId; + proxy->gUdfcState = UDFC_STATE_STARTNG; + uv_barrier_init(&proxy->gUdfInitBarrier, 2); + uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy); + uv_barrier_wait(&proxy->gUdfInitBarrier); + proxy->gUdfcState = UDFC_STATE_READY; + *udfc = proxy; return 0; } -int32_t destroyUdfdProxy(int32_t dnodeId) { - gUdfcState = UDFC_STATE_STOPPING; - uv_barrier_destroy(&gUdfInitBarrier); -// if (gUdfcState == UDFC_STATE_STOPPING) { -// uv_process_kill(&gUdfdProcess, SIGINT); -// } - uv_async_send(&gUdfLoopStopAsync); - uv_thread_join(&gUdfLoopThread); - uv_mutex_destroy(&gUdfTaskQueueMutex); - gUdfcState = UDFC_STATUS_FINAL; +int32_t udfcClose(UdfcHandle udfcHandle) { + SUdfdProxy *udfc = udfcHandle; + udfc->gUdfcState = UDFC_STATE_STOPPING; + uv_async_send(&udfc->gUdfLoopStopAsync); + uv_thread_join(&udfc->gUdfLoopThread); + uv_mutex_destroy(&udfc->gUdfTaskQueueMutex); + uv_barrier_destroy(&udfc->gUdfInitBarrier); + udfc->gUdfcState = UDFC_STATUS_FINAL; + taosMemoryFree(udfc); return 0; } @@ -1026,11 +980,12 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) { return task->errCode; } -int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) { +int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle *funcHandle) { debugPrint("%s", "client setup udf"); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); task->errCode = 0; task->session = taosMemoryMalloc(sizeof(SUdfUvSession)); + task->session->udfc = udfc; task->type = UDF_TASK_SETUP; SUdfSetupRequest *req = &task->_setup.req; @@ -1046,13 +1001,13 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) { SUdfSetupResponse *rsp = &task->_setup.rsp; task->session->severHandle = rsp->udfHandle; - *handle = task->session; + *funcHandle = task->session; int32_t err = task->errCode; taosMemoryFree(task); return err; } -int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, +int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, SSDataBlock* output, SUdfInterBuf *newState) { debugPrint("%s", "client call udf"); @@ -1121,7 +1076,7 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter } //TODO: translate these calls to callUdf -int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) { +int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { int8_t callType = TSDB_UDF_CALL_AGG_INIT; int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf); @@ -1131,7 +1086,7 @@ int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) { // input: block, state // output: interbuf, -int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { +int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { int8_t callType = TSDB_UDF_CALL_AGG_PROC; int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState); return err; @@ -1139,7 +1094,7 @@ int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *st // input: interbuf1, interbuf2 // output: resultBuf -int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { +int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { int8_t callType = TSDB_UDF_CALL_AGG_MERGE; int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); return err; @@ -1147,7 +1102,7 @@ int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf // input: interBuf // output: resultData -int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { +int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { int8_t callType = TSDB_UDF_CALL_AGG_PROC; int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData); return err; @@ -1155,13 +1110,13 @@ int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBu // input: block // output: resultData -int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData) { +int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData) { int8_t callType = TSDB_UDF_CALL_SCALA_PROC; int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL); return err; } -int32_t teardownUdf(UdfHandle handle) { +int32_t teardownUdf(UdfcFuncHandle handle) { debugPrint("%s", "client teardown udf"); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 71434c695f..d6e7a43666 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -27,7 +27,10 @@ typedef struct SUdfdContext { uv_loop_t *loop; + uv_pipe_t ctrlPipe; + uv_signal_t intrSignal; char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; + uv_pipe_t listeningPipe; void *clientRpc; uv_mutex_t udfsMutex; @@ -76,9 +79,9 @@ typedef struct SUdf { // TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix // TODO: add private udf structure. -typedef struct SUdfHandle { +typedef struct SUdfcFuncHandle { SUdf *udf; -} SUdfHandle; +} SUdfcFuncHandle; int32_t udfdLoadUdf(char* udfName, SUdf* udf) { strcpy(udf->name, udfName); @@ -143,7 +146,7 @@ void udfdProcessRequest(uv_work_t *req) { } uv_mutex_unlock(&udf->lock); } - SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle)); + SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); handle->udf = udf; // TODO: allocate private structure and call init function and set it to handle SUdfResponse rsp; @@ -166,7 +169,7 @@ void udfdProcessRequest(uv_work_t *req) { case UDF_TASK_CALL: { SUdfCallRequest *call = &request.call; fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle); - SUdfHandle *handle = (SUdfHandle *)(call->udfHandle); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); SUdf *udf = handle->udf; SUdfDataBlock input = {0}; @@ -204,7 +207,7 @@ void udfdProcessRequest(uv_work_t *req) { case UDF_TASK_TEARDOWN: { SUdfTeardownRequest *teardown = &request.teardown; fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle) - SUdfHandle *handle = (SUdfHandle *)(teardown->udfHandle); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); SUdf *udf = handle->udf; bool unloadUdf = false; uv_mutex_lock(&global.udfsMutex); @@ -380,10 +383,12 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { } } -void removeListeningPipe(int sig) { +void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { + fnInfo("udfd signal received: %d\n", signum); uv_fs_t req; - uv_fs_unlink(global.loop, &req, "udf.sock", NULL); - exit(0); + uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + uv_signal_stop(handle); + uv_stop(global.loop); } void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } @@ -492,37 +497,67 @@ static int32_t udfdInitLog() { return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, 0); } +void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + buf->base = taosMemoryMalloc(suggested_size); + buf->len = suggested_size; +} + +void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { + if (nread < 0) { + fnError("udfd ctrl pipe read error. %s", uv_err_name(nread)); + uv_close((uv_handle_t*)q, NULL); + uv_stop(global.loop); + return; + } + fnError("udfd ctrl pipe read %zu bytes", nread); + taosMemoryFree(buf->base); +} + +static int32_t removeListeningPipe() { + uv_fs_t req; + int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + uv_fs_req_cleanup(&req); + return err; +} + static int32_t udfdUvInit() { uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t)); if (loop) { uv_loop_init(loop); } global.loop = loop; + + uv_pipe_init(global.loop, &global.ctrlPipe, 1); + uv_pipe_open(&global.ctrlPipe, 0); + uv_read_start((uv_stream_t*)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); + char dnodeId[8] = {0}; size_t dnodeIdSize; - uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize); + int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize); + if (err != 0) { + dnodeId[0] = '1'; + } char listenPipeName[32] = {0}; snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); strcpy(global.listenPipeName, listenPipeName); - uv_fs_t req; - uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + removeListeningPipe(); - uv_pipe_t server; - uv_pipe_init(global.loop, &server, 0); + uv_pipe_init(global.loop, &global.listeningPipe, 0); - signal(SIGINT, removeListeningPipe); + uv_signal_init(global.loop, &global.intrSignal); + uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT); int r; fnInfo("bind to pipe %s", global.listenPipeName); - if ((r = uv_pipe_bind(&server, listenPipeName))) { + if ((r = uv_pipe_bind(&global.listeningPipe, listenPipeName))) { fnError("Bind error %s", uv_err_name(r)); - removeListeningPipe(0); + removeListeningPipe(); return -1; } - if ((r = uv_listen((uv_stream_t *)&server, 128, udfdOnNewConnection))) { + if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) { fnError("Listen error %s", uv_err_name(r)); - removeListeningPipe(0); + removeListeningPipe(); return -2; } return 0; diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index 41c7f65e0b..fb9c3c678a 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -8,7 +8,8 @@ #include "tdatablock.h" int main(int argc, char *argv[]) { - createUdfdProxy(1); + UdfcHandle udfc; + udfcOpen(1, &udfc); uv_sleep(1000); char path[256] = {0}; size_t cwdSize = 256; @@ -20,9 +21,9 @@ int main(int argc, char *argv[]) { fprintf(stdout, "current working directory:%s\n", path); strcat(path, "/libudf1.so"); - UdfHandle handle; + UdfcFuncHandle handle; SEpSet epSet; - setupUdf("udf1", &epSet, &handle); + setupUdf(udfc, "udf1", &epSet, &handle); SSDataBlock block = {0}; SSDataBlock* pBlock = █ @@ -53,5 +54,5 @@ int main(int argc, char *argv[]) { } teardownUdf(handle); - destroyUdfdProxy(1); + udfcClose(udfc); } diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index 22884298ef..e24ac41f20 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -37,6 +37,7 @@ int64_t tsOpenMax = 0; int64_t tsStreamMax = 0; float tsNumOfCores = 0; int64_t tsTotalMemoryKB = 0; +char* tsProcPath = NULL; void osDefaultInit() { taosSeedRand(taosSafeRand()); diff --git a/source/os/src/osProc.c b/source/os/src/osProc.c index b6de638ac2..d569582256 100644 --- a/source/os/src/osProc.c +++ b/source/os/src/osProc.c @@ -17,7 +17,7 @@ #define _DEFAULT_SOURCE #include "os.h" -static char *tsProcPath = NULL; +char *tsProcPath = NULL; int32_t taosNewProc(char **args) { int32_t pid = fork(); diff --git a/tests/script/test.sh b/tests/script/test.sh index 14dc43beaf..e4191da0a9 100755 --- a/tests/script/test.sh +++ b/tests/script/test.sh @@ -131,8 +131,8 @@ if [ -n "$FILE_NAME" ]; then FLAG="-v" fi - echo valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG - valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG + echo valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --child-silent-after-fork=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG + valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --child-silent-after-fork=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG else if [[ $MULTIPROCESS -eq 1 ]];then echo "ExcuteCmd(multiprocess):" $PROGRAM -m -c $CFG_DIR -f $FILE_NAME