prepare for dnode start/stop/restart udfd and vnode/snode/qnode split
This commit is contained in:
parent
177b1c96a5
commit
ca34398c43
|
@ -38,17 +38,28 @@ enum {
|
||||||
UDFC_CODE_PIPE_READ_ERR = -3,
|
UDFC_CODE_PIPE_READ_ERR = -3,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/*TODO: no api for dnode startudfd/stopudfd*/
|
||||||
/**
|
/**
|
||||||
* start udf dameon service
|
* start udfd dameon service
|
||||||
* @return error code
|
|
||||||
*/
|
*/
|
||||||
int32_t startUdfService();
|
int32_t startUdfd(int32_t dnodeId);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* stop udf dameon service
|
* stop udfd dameon service
|
||||||
|
*/
|
||||||
|
int32_t stopUdfd(int32_t dnodeId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t stopUdfService();
|
int32_t createUdfdProxy(int32_t dnodeId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* destroy udfd proxy
|
||||||
|
* @return error code
|
||||||
|
*/
|
||||||
|
int32_t destroyUdfdProxy(int32_t dnodeId);
|
||||||
|
|
||||||
typedef void *UdfHandle;
|
typedef void *UdfHandle;
|
||||||
|
|
||||||
|
@ -104,7 +115,6 @@ typedef struct SUdfInterBuf {
|
||||||
char* buf;
|
char* buf;
|
||||||
} SUdfInterBuf;
|
} SUdfInterBuf;
|
||||||
|
|
||||||
//TODO: translate these calls to callUdf
|
|
||||||
// output: interBuf
|
// output: interBuf
|
||||||
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf);
|
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf);
|
||||||
// input: block, state
|
// input: block, state
|
||||||
|
|
|
@ -32,9 +32,9 @@ typedef struct SUdfInfo {
|
||||||
|
|
||||||
typedef void *UdfHandle;
|
typedef void *UdfHandle;
|
||||||
|
|
||||||
int32_t startUdfService();
|
int32_t createUdfdProxy();
|
||||||
|
|
||||||
int32_t stopUdfService();
|
int32_t destroyUdfdProxy();
|
||||||
|
|
||||||
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles);
|
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles);
|
||||||
|
|
||||||
|
|
|
@ -200,10 +200,10 @@ int64_t gUdfTaskSeqNum = 0;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
UDFC_STATE_INITAL = 0, // initial state
|
UDFC_STATE_INITAL = 0, // initial state
|
||||||
UDFC_STATE_STARTNG, // starting after startUdfService
|
UDFC_STATE_STARTNG, // starting after createUdfdProxy
|
||||||
UDFC_STATE_READY, // started and begin to receive quests
|
UDFC_STATE_READY, // started and begin to receive quests
|
||||||
UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart.
|
UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart.
|
||||||
UDFC_STATE_STOPPING, // stopping after stopUdfService
|
UDFC_STATE_STOPPING, // stopping after destroyUdfdProxy
|
||||||
UDFC_STATUS_FINAL, // stopped
|
UDFC_STATUS_FINAL, // stopped
|
||||||
};
|
};
|
||||||
int8_t gUdfcState = UDFC_STATE_INITAL;
|
int8_t gUdfcState = UDFC_STATE_INITAL;
|
||||||
|
@ -929,7 +929,7 @@ void udfStopAsyncCb(uv_async_t *async) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startUdfd();
|
int32_t udfcSpawnUdfd();
|
||||||
|
|
||||||
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
|
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
|
||||||
//TODO: pipe close will be first received
|
//TODO: pipe close will be first received
|
||||||
|
@ -944,12 +944,12 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
|
||||||
if (gUdfcState == UDFC_STATE_READY) {
|
if (gUdfcState == UDFC_STATE_READY) {
|
||||||
gUdfcState = UDFC_STATE_RESTARTING;
|
gUdfcState = UDFC_STATE_RESTARTING;
|
||||||
//TODO: asynchronous without blocking. how to do it
|
//TODO: asynchronous without blocking. how to do it
|
||||||
cleanUpUvTasks();
|
//cleanUpUvTasks();
|
||||||
startUdfd();
|
udfcSpawnUdfd();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startUdfd() {
|
int32_t udfcSpawnUdfd() {
|
||||||
//TODO: path
|
//TODO: path
|
||||||
uv_process_options_t options = {0};
|
uv_process_options_t options = {0};
|
||||||
static char path[256] = {0};
|
static char path[256] = {0};
|
||||||
|
@ -979,9 +979,6 @@ int32_t startUdfd() {
|
||||||
void constructUdfService(void *argsThread) {
|
void constructUdfService(void *argsThread) {
|
||||||
uv_loop_init(&gUdfdLoop);
|
uv_loop_init(&gUdfdLoop);
|
||||||
|
|
||||||
//TODO spawn error
|
|
||||||
startUdfd();
|
|
||||||
|
|
||||||
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
|
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
|
||||||
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
|
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
|
||||||
uv_mutex_init(&gUdfTaskQueueMutex);
|
uv_mutex_init(&gUdfTaskQueueMutex);
|
||||||
|
@ -994,7 +991,7 @@ void constructUdfService(void *argsThread) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t startUdfService() {
|
int32_t createUdfdProxy(int32_t dnodeId) {
|
||||||
gUdfcState = UDFC_STATE_STARTNG;
|
gUdfcState = UDFC_STATE_STARTNG;
|
||||||
uv_barrier_init(&gUdfInitBarrier, 2);
|
uv_barrier_init(&gUdfInitBarrier, 2);
|
||||||
uv_thread_create(&gUdfLoopThread, constructUdfService, 0);
|
uv_thread_create(&gUdfLoopThread, constructUdfService, 0);
|
||||||
|
@ -1002,12 +999,12 @@ int32_t startUdfService() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stopUdfService() {
|
int32_t destroyUdfdProxy(int32_t dnodeId) {
|
||||||
gUdfcState = UDFC_STATE_STOPPING;
|
gUdfcState = UDFC_STATE_STOPPING;
|
||||||
uv_barrier_destroy(&gUdfInitBarrier);
|
uv_barrier_destroy(&gUdfInitBarrier);
|
||||||
if (gUdfcState == UDFC_STATE_STOPPING) {
|
// if (gUdfcState == UDFC_STATE_STOPPING) {
|
||||||
uv_process_kill(&gUdfdProcess, SIGINT);
|
// uv_process_kill(&gUdfdProcess, SIGINT);
|
||||||
}
|
// }
|
||||||
uv_async_send(&gUdfLoopStopAsync);
|
uv_async_send(&gUdfLoopStopAsync);
|
||||||
uv_thread_join(&gUdfLoopThread);
|
uv_thread_join(&gUdfLoopThread);
|
||||||
uv_mutex_destroy(&gUdfTaskQueueMutex);
|
uv_mutex_destroy(&gUdfTaskQueueMutex);
|
||||||
|
|
|
@ -83,18 +83,14 @@ typedef struct SUdfHandle {
|
||||||
int32_t udfdLoadUdf(char* udfName, SUdf* udf) {
|
int32_t udfdLoadUdf(char* udfName, SUdf* udf) {
|
||||||
strcpy(udf->name, udfName);
|
strcpy(udf->name, udfName);
|
||||||
|
|
||||||
// TODO: retrive udf info from mnode
|
int err = uv_dlopen(udf->path, &udf->lib);
|
||||||
char *path = "libudf1.so";
|
|
||||||
int err = uv_dlopen(path, &udf->lib);
|
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
fnError("can not load library %s. error: %s", path, uv_strerror(err));
|
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
||||||
// TODO set error
|
// TODO set error
|
||||||
}
|
}
|
||||||
|
//TODO: find all the functions
|
||||||
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
||||||
strcpy(normalFuncName, udfName);
|
strcpy(normalFuncName, udfName);
|
||||||
// TODO error, multi-thread, same udf, lock it
|
|
||||||
// TODO find all functions normal, init, destroy, normal, merge, finalize
|
|
||||||
uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc));
|
uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc));
|
||||||
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
||||||
char *freeSuffix = "_free";
|
char *freeSuffix = "_free";
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
startUdfService();
|
createUdfdProxy(1);
|
||||||
uv_sleep(1000);
|
uv_sleep(1000);
|
||||||
char path[256] = {0};
|
char path[256] = {0};
|
||||||
size_t cwdSize = 256;
|
size_t cwdSize = 256;
|
||||||
|
@ -53,5 +53,5 @@ int main(int argc, char *argv[]) {
|
||||||
}
|
}
|
||||||
teardownUdf(handle);
|
teardownUdf(handle);
|
||||||
|
|
||||||
stopUdfService();
|
destroyUdfdProxy(1);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue