remove udfd start/stop from tudf.c
This commit is contained in:
parent
adf993cdc3
commit
b4d3008da3
|
@ -27,7 +27,7 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define UDF_LISTEN_PIPE_NAME_LEN 32
|
#define UDF_LISTEN_PIPE_NAME_LEN 32
|
||||||
#define UDF_LISTEN_PIPE_NAME_PREFIX "udf.sock."
|
#define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock."
|
||||||
|
|
||||||
//======================================================================================
|
//======================================================================================
|
||||||
//begin API to taosd and qworker
|
//begin API to taosd and qworker
|
||||||
|
@ -38,17 +38,6 @@ enum {
|
||||||
UDFC_CODE_PIPE_READ_ERR = -3,
|
UDFC_CODE_PIPE_READ_ERR = -3,
|
||||||
};
|
};
|
||||||
|
|
||||||
/*TODO: no api for dnode startudfd/stopudfd*/
|
|
||||||
/**
|
|
||||||
* start udfd dameon service
|
|
||||||
*/
|
|
||||||
int32_t startUdfd(int32_t dnodeId);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* stop udfd dameon service
|
|
||||||
*/
|
|
||||||
int32_t stopUdfd(int32_t dnodeId);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
|
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
|
||||||
* @return error code
|
* @return error code
|
||||||
|
|
|
@ -20,13 +20,10 @@
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
|
||||||
//TODO: when startup, set thread poll size. add it to cfg
|
|
||||||
//TODO: test for udfd restart
|
|
||||||
//TODO: udfd restart when exist or aborts
|
|
||||||
//TODO: deal with uv task that has been started and then udfd core dumped
|
|
||||||
//TODO: network error processing.
|
//TODO: network error processing.
|
||||||
//TODO: add unit test
|
//TODO: add unit test
|
||||||
//TODO: include all global variable under context struct
|
//TODO: include all global variable under context struct
|
||||||
|
|
||||||
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
|
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
|
||||||
* The QUEUE is copied from queue.h under libuv
|
* The QUEUE is copied from queue.h under libuv
|
||||||
* */
|
* */
|
||||||
|
@ -185,8 +182,6 @@ typedef struct SClientUvConn {
|
||||||
SClientConnBuf readBuf;
|
SClientConnBuf readBuf;
|
||||||
} SClientUvConn;
|
} SClientUvConn;
|
||||||
|
|
||||||
uv_process_t gUdfdProcess;
|
|
||||||
|
|
||||||
uv_barrier_t gUdfInitBarrier;
|
uv_barrier_t gUdfInitBarrier;
|
||||||
|
|
||||||
uv_loop_t gUdfdLoop;
|
uv_loop_t gUdfdLoop;
|
||||||
|
@ -202,7 +197,6 @@ enum {
|
||||||
UDFC_STATE_INITAL = 0, // initial state
|
UDFC_STATE_INITAL = 0, // initial state
|
||||||
UDFC_STATE_STARTNG, // starting after createUdfdProxy
|
UDFC_STATE_STARTNG, // starting after createUdfdProxy
|
||||||
UDFC_STATE_READY, // started and begin to receive quests
|
UDFC_STATE_READY, // started and begin to receive quests
|
||||||
UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart.
|
|
||||||
UDFC_STATE_STOPPING, // stopping after destroyUdfdProxy
|
UDFC_STATE_STOPPING, // stopping after destroyUdfdProxy
|
||||||
UDFC_STATUS_FINAL, // stopped
|
UDFC_STATUS_FINAL, // stopped
|
||||||
};
|
};
|
||||||
|
@ -902,8 +896,6 @@ void cleanUpUvTasks() {
|
||||||
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
|
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
|
||||||
if (gUdfcState == UDFC_STATE_STOPPING) {
|
if (gUdfcState == UDFC_STATE_STOPPING) {
|
||||||
task->errCode = UDFC_CODE_STOPPING;
|
task->errCode = UDFC_CODE_STOPPING;
|
||||||
} else if (gUdfcState == UDFC_STATE_RESTARTING) {
|
|
||||||
task->errCode = UDFC_CODE_RESTARTING;
|
|
||||||
}
|
}
|
||||||
uv_sem_post(&task->taskSem);
|
uv_sem_post(&task->taskSem);
|
||||||
}
|
}
|
||||||
|
@ -915,8 +907,6 @@ void cleanUpUvTasks() {
|
||||||
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
|
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
|
||||||
if (gUdfcState == UDFC_STATE_STOPPING) {
|
if (gUdfcState == UDFC_STATE_STOPPING) {
|
||||||
task->errCode = UDFC_CODE_STOPPING;
|
task->errCode = UDFC_CODE_STOPPING;
|
||||||
} else if (gUdfcState == UDFC_STATE_RESTARTING) {
|
|
||||||
task->errCode = UDFC_CODE_RESTARTING;
|
|
||||||
}
|
}
|
||||||
uv_sem_post(&task->taskSem);
|
uv_sem_post(&task->taskSem);
|
||||||
}
|
}
|
||||||
|
@ -929,53 +919,6 @@ void udfStopAsyncCb(uv_async_t *async) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udfcSpawnUdfd();
|
|
||||||
|
|
||||||
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
|
|
||||||
//TODO: pipe close will be first received
|
|
||||||
debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal);
|
|
||||||
uv_close((uv_handle_t *) req, NULL);
|
|
||||||
//TODO: restart the udfd process
|
|
||||||
if (gUdfcState == UDFC_STATE_STOPPING) {
|
|
||||||
if (term_signal != SIGINT) {
|
|
||||||
//TODO: log error
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (gUdfcState == UDFC_STATE_READY) {
|
|
||||||
gUdfcState = UDFC_STATE_RESTARTING;
|
|
||||||
//TODO: asynchronous without blocking. how to do it
|
|
||||||
//cleanUpUvTasks();
|
|
||||||
udfcSpawnUdfd();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t udfcSpawnUdfd() {
|
|
||||||
//TODO: path
|
|
||||||
uv_process_options_t options = {0};
|
|
||||||
static char path[256] = {0};
|
|
||||||
size_t cwdSize;
|
|
||||||
uv_cwd(path, &cwdSize);
|
|
||||||
strcat(path, "/udfd");
|
|
||||||
char* args[2] = {path, NULL};
|
|
||||||
options.args = args;
|
|
||||||
options.file = path;
|
|
||||||
options.exit_cb = onUdfdExit;
|
|
||||||
options.stdio_count = 3;
|
|
||||||
uv_stdio_container_t child_stdio[3];
|
|
||||||
child_stdio[0].flags = UV_IGNORE;
|
|
||||||
child_stdio[1].flags = UV_INHERIT_FD;
|
|
||||||
child_stdio[1].data.fd = 1;
|
|
||||||
child_stdio[2].flags = UV_INHERIT_FD;
|
|
||||||
child_stdio[2].data.fd = 2;
|
|
||||||
options.stdio = child_stdio;
|
|
||||||
//TODO spawn error
|
|
||||||
int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options);
|
|
||||||
if (err != 0) {
|
|
||||||
debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
|
|
||||||
}
|
|
||||||
return err;
|
|
||||||
}
|
|
||||||
|
|
||||||
void constructUdfService(void *argsThread) {
|
void constructUdfService(void *argsThread) {
|
||||||
uv_loop_init(&gUdfdLoop);
|
uv_loop_init(&gUdfdLoop);
|
||||||
|
|
||||||
|
@ -990,24 +933,21 @@ void constructUdfService(void *argsThread) {
|
||||||
uv_loop_close(&gUdfdLoop);
|
uv_loop_close(&gUdfdLoop);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t createUdfdProxy(int32_t dnodeId) {
|
int32_t createUdfdProxy(int32_t dnodeId) {
|
||||||
gUdfcState = UDFC_STATE_STARTNG;
|
gUdfcState = UDFC_STATE_STARTNG;
|
||||||
uv_barrier_init(&gUdfInitBarrier, 2);
|
uv_barrier_init(&gUdfInitBarrier, 2);
|
||||||
uv_thread_create(&gUdfLoopThread, constructUdfService, 0);
|
uv_thread_create(&gUdfLoopThread, constructUdfService, 0);
|
||||||
uv_barrier_wait(&gUdfInitBarrier); gUdfcState = UDFC_STATE_READY;
|
uv_barrier_wait(&gUdfInitBarrier);
|
||||||
|
gUdfcState = UDFC_STATE_READY;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t destroyUdfdProxy(int32_t dnodeId) {
|
int32_t destroyUdfdProxy(int32_t dnodeId) {
|
||||||
gUdfcState = UDFC_STATE_STOPPING;
|
gUdfcState = UDFC_STATE_STOPPING;
|
||||||
uv_barrier_destroy(&gUdfInitBarrier);
|
|
||||||
// if (gUdfcState == UDFC_STATE_STOPPING) {
|
|
||||||
// uv_process_kill(&gUdfdProcess, SIGINT);
|
|
||||||
// }
|
|
||||||
uv_async_send(&gUdfLoopStopAsync);
|
uv_async_send(&gUdfLoopStopAsync);
|
||||||
uv_thread_join(&gUdfLoopThread);
|
uv_thread_join(&gUdfLoopThread);
|
||||||
uv_mutex_destroy(&gUdfTaskQueueMutex);
|
uv_mutex_destroy(&gUdfTaskQueueMutex);
|
||||||
|
uv_barrier_destroy(&gUdfInitBarrier);
|
||||||
gUdfcState = UDFC_STATUS_FINAL;
|
gUdfcState = UDFC_STATUS_FINAL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue