From 517e587c35fed98db63bc89efe76818ceb361950 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 7 Apr 2022 17:32:58 +0800 Subject: [PATCH] udfd exit and restart processing --- source/libs/function/inc/tudf.h | 5 +++ source/libs/function/src/tudf.c | 80 ++++++++++++++++++++++++++++----- 2 files changed, 75 insertions(+), 10 deletions(-) diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index 72875239d2..f072866c67 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -22,6 +22,11 @@ extern "C" { //====================================================================================== //begin API to taosd and qworker +enum { + UDFC_CODE_STOPPING = -1, + UDFC_CODE_RESTARTING = -2, +}; + /** * start udf dameon service * @return error code diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index a1030f6c21..5b73662d45 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -22,9 +22,9 @@ //TODO: udfd restart when exist or aborts //TODO: network error processing. //TODO: add unit test -//TODO: add lua support void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); - +int32_t destructUdfService(); +int32_t constructUdfService(); enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, @@ -103,14 +103,22 @@ uv_async_t gUdfLoopStopAsync; uv_mutex_t gUdfTaskQueueMutex; int64_t gUdfTaskSeqNum = 0; +enum { + UDFC_STATE_INITAL = 0, // initial state + UDFC_STATE_STARTNG, // starting after startUdfService + UDFC_STATE_READY, // started and begin to receive quests + UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart. + UDFC_STATE_STOPPING, // stopping after stopUdfService + UDFC_STATUS_FINAL, // stopped +}; +int8_t gUdfcState = UDFC_STATE_INITAL; + //double circular linked list typedef SClientUvTaskNode *SClientUvTaskQueue; SClientUvTaskNode gUdfQueueNode; SClientUvTaskQueue gUdfTaskQueue = &gUdfQueueNode; -//add SClientUvTaskNode task that close conn - - +//TODO: deal with uv task that has been started and then udfd core dumped void udfTaskQueueInit(SClientUvTaskQueue q) { q->next = q; @@ -465,6 +473,18 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal); uv_close((uv_handle_t *) req, NULL); //TODO: restart the udfd process + if (gUdfcState == UDFC_STATE_STOPPING) { + if (term_signal != SIGINT) { + //TODO: log error + } + } + if (gUdfcState == UDFC_STATE_READY) { + gUdfcState = UDFC_STATE_RESTARTING; + //TODO: asynchronous without blocking. how to do it + destructUdfService(); + constructUdfService(); + } + } void onUdfcPipeClose(uv_handle_t *handle) { @@ -602,7 +622,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) { udfTaskQueueRemoveTask(taskFound); uv_sem_post(&taskFound->taskSem); } else { - //LOG error + //TODO: LOG error } connBuf->buf = NULL; connBuf->total = -1; @@ -777,10 +797,32 @@ void udfClientAsyncCb(uv_async_t *async) { } void udfStopAsyncCb(uv_async_t *async) { + SClientUvTaskNode node; + SClientUvTaskQueue q = &node; + udfTaskQueueInit(q); + + uv_mutex_lock(&gUdfTaskQueueMutex); + udfTaskQueueMove(gUdfTaskQueue, q); + uv_mutex_unlock(&gUdfTaskQueueMutex); + + while (!udfTaskQueueIsEmpty(q)) { + SClientUvTaskNode *task = udfTaskQueueHeadTask(q); + udfTaskQueueRemoveTask(task); + if (gUdfcState == UDFC_STATE_STOPPING) { + task->errCode = UDFC_CODE_STOPPING; + } else if (gUdfcState == UDFC_STATE_RESTARTING) { + task->errCode = UDFC_CODE_RESTARTING; + } + uv_sem_post(&task->taskSem); + } + + // TODO: deal with tasks that are waiting result. + uv_stop(&gUdfdLoop); - uv_loop_close(&gUdfdLoop); } + + void startUdfd(void *argsThread) { uv_loop_init(&gUdfdLoop); @@ -805,25 +847,43 @@ void startUdfd(void *argsThread) { uv_mutex_init(&gUdfTaskQueueMutex); udfTaskQueueInit(gUdfTaskQueue); uv_barrier_wait(&gUdfInitBarrier); + //TODO return value of uv_run uv_run(&gUdfdLoop, UV_RUN_DEFAULT); + uv_loop_close(&gUdfdLoop); } -int32_t startUdfService() { +int32_t constructUdfService() { uv_barrier_init(&gUdfInitBarrier, 2); uv_thread_create(&gUdfLoopThread, startUdfd, 0); uv_barrier_wait(&gUdfInitBarrier); return 0; } -int32_t stopUdfService() { +int32_t startUdfService() { + gUdfcState = UDFC_STATE_STARTNG; + constructUdfService(); + gUdfcState = UDFC_STATE_READY; + return 0; +} + +int32_t destructUdfService() { uv_barrier_destroy(&gUdfInitBarrier); - uv_process_kill(&gUdfdProcess, SIGINT); + if (gUdfcState == UDFC_STATE_STOPPING) { + uv_process_kill(&gUdfdProcess, SIGINT); + } uv_async_send(&gUdfLoopStopAsync); uv_mutex_destroy(&gUdfTaskQueueMutex); uv_thread_join(&gUdfLoopThread); return 0; } +int32_t stopUdfService() { + gUdfcState = UDFC_STATE_STOPPING; + destructUdfService(); + gUdfcState = UDFC_STATUS_FINAL; + return 0; +} + int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) { SClientUvTaskNode *uvTask = NULL;