Merge remote-tracking branch 'origin/3.0' into feature/dnode
This commit is contained in:
commit
87aca17f29
|
@ -336,6 +336,17 @@ int32_t udfcOpen();
|
||||||
*/
|
*/
|
||||||
int32_t udfcClose();
|
int32_t udfcClose();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* start udfd that serves udf function invocation under dnode startDnodeId
|
||||||
|
* @param startDnodeId
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int32_t udfStartUdfd(int32_t startDnodeId);
|
||||||
|
/**
|
||||||
|
* stop udfd
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int32_t udfStopUdfd();
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -217,145 +217,6 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) {
|
||||||
dmStopStatusThread(pWrapper->pDnode);
|
dmStopStatusThread(pWrapper->pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmSpawnUdfd(SDnode *pDnode);
|
|
||||||
|
|
||||||
void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
|
|
||||||
dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
|
|
||||||
SDnode *pDnode = process->data;
|
|
||||||
if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pDnode->udfdData.stopCalled)) {
|
|
||||||
dInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
|
|
||||||
} else {
|
|
||||||
dInfo("udfd process restart");
|
|
||||||
dmSpawnUdfd(pDnode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dmSpawnUdfd(SDnode *pDnode) {
|
|
||||||
dInfo("dnode start spawning udfd");
|
|
||||||
uv_process_options_t options = {0};
|
|
||||||
|
|
||||||
char path[PATH_MAX] = {0};
|
|
||||||
if (tsProcPath == NULL) {
|
|
||||||
path[0] = '.';
|
|
||||||
} else {
|
|
||||||
strncpy(path, tsProcPath, strlen(tsProcPath));
|
|
||||||
taosDirName(path);
|
|
||||||
}
|
|
||||||
#ifdef WINDOWS
|
|
||||||
strcat(path, "udfd.exe");
|
|
||||||
#else
|
|
||||||
strcat(path, "/udfd");
|
|
||||||
#endif
|
|
||||||
char* argsUdfd[] = {path, "-c", configDir, NULL};
|
|
||||||
options.args = argsUdfd;
|
|
||||||
options.file = path;
|
|
||||||
|
|
||||||
options.exit_cb = dmUdfdExit;
|
|
||||||
|
|
||||||
SUdfdData *pData = &pDnode->udfdData;
|
|
||||||
uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);
|
|
||||||
|
|
||||||
uv_stdio_container_t child_stdio[3];
|
|
||||||
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
|
|
||||||
child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe;
|
|
||||||
child_stdio[1].flags = UV_IGNORE;
|
|
||||||
child_stdio[2].flags = UV_INHERIT_FD;
|
|
||||||
child_stdio[2].data.fd = 2;
|
|
||||||
options.stdio_count = 3;
|
|
||||||
options.stdio = child_stdio;
|
|
||||||
|
|
||||||
options.flags = UV_PROCESS_DETACHED;
|
|
||||||
|
|
||||||
char dnodeIdEnvItem[32] = {0};
|
|
||||||
char thrdPoolSizeEnvItem[32] = {0};
|
|
||||||
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId);
|
|
||||||
float numCpuCores = 4;
|
|
||||||
taosGetCpuCores(&numCpuCores);
|
|
||||||
snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
|
|
||||||
char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
|
|
||||||
options.env = envUdfd;
|
|
||||||
|
|
||||||
int err = uv_spawn(&pData->loop, &pData->process, &options);
|
|
||||||
pData->process.data = (void*)pDnode;
|
|
||||||
|
|
||||||
if (err != 0) {
|
|
||||||
dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
|
|
||||||
}
|
|
||||||
return err;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
|
|
||||||
if (!uv_is_closing(handle)) {
|
|
||||||
uv_close(handle, NULL);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dmUdfdStopAsyncCb(uv_async_t *async) {
|
|
||||||
SDnode *pDnode = async->data;
|
|
||||||
SUdfdData *pData = &pDnode->udfdData;
|
|
||||||
uv_stop(&pData->loop);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dmWatchUdfd(void *args) {
|
|
||||||
SDnode *pDnode = args;
|
|
||||||
SUdfdData *pData = &pDnode->udfdData;
|
|
||||||
uv_loop_init(&pData->loop);
|
|
||||||
uv_async_init(&pData->loop, &pData->stopAsync, dmUdfdStopAsyncCb);
|
|
||||||
pData->stopAsync.data = pDnode;
|
|
||||||
int32_t err = dmSpawnUdfd(pDnode);
|
|
||||||
atomic_store_32(&pData->spawnErr, err);
|
|
||||||
uv_barrier_wait(&pData->barrier);
|
|
||||||
uv_run(&pData->loop, UV_RUN_DEFAULT);
|
|
||||||
uv_loop_close(&pData->loop);
|
|
||||||
|
|
||||||
uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL);
|
|
||||||
uv_run(&pData->loop, UV_RUN_DEFAULT);
|
|
||||||
uv_loop_close(&pData->loop);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dmStartUdfd(SDnode *pDnode) {
|
|
||||||
char dnodeId[8] = {0};
|
|
||||||
snprintf(dnodeId, sizeof(dnodeId), "%d", pDnode->data.dnodeId);
|
|
||||||
uv_os_setenv("DNODE_ID", dnodeId);
|
|
||||||
SUdfdData *pData = &pDnode->udfdData;
|
|
||||||
if (pData->startCalled) {
|
|
||||||
dInfo("dnode-mgmt start udfd already called");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
pData->startCalled = true;
|
|
||||||
uv_barrier_init(&pData->barrier, 2);
|
|
||||||
uv_thread_create(&pData->thread, dmWatchUdfd, pDnode);
|
|
||||||
uv_barrier_wait(&pData->barrier);
|
|
||||||
int32_t err = atomic_load_32(&pData->spawnErr);
|
|
||||||
if (err != 0) {
|
|
||||||
uv_barrier_destroy(&pData->barrier);
|
|
||||||
uv_async_send(&pData->stopAsync);
|
|
||||||
uv_thread_join(&pData->thread);
|
|
||||||
pData->needCleanUp = false;
|
|
||||||
dInfo("dnode-mgmt udfd cleaned up after spawn err");
|
|
||||||
} else {
|
|
||||||
pData->needCleanUp = true;
|
|
||||||
}
|
|
||||||
return err;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dmStopUdfd(SDnode *pDnode) {
|
|
||||||
dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d",
|
|
||||||
pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr);
|
|
||||||
SUdfdData *pData = &pDnode->udfdData;
|
|
||||||
if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
atomic_store_32(&pData->stopCalled, 1);
|
|
||||||
pData->needCleanUp = false;
|
|
||||||
uv_barrier_destroy(&pData->barrier);
|
|
||||||
uv_async_send(&pData->stopAsync);
|
|
||||||
uv_thread_join(&pData->thread);
|
|
||||||
dInfo("dnode-mgmt udfd cleaned up");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
|
static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
|
||||||
dInfo("dnode-mgmt start to init");
|
dInfo("dnode-mgmt start to init");
|
||||||
SDnode *pDnode = pWrapper->pDnode;
|
SDnode *pDnode = pWrapper->pDnode;
|
||||||
|
@ -387,7 +248,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
dmReportStartup(pDnode, "dnode-transport", "initialized");
|
dmReportStartup(pDnode, "dnode-transport", "initialized");
|
||||||
|
|
||||||
if (dmStartUdfd(pDnode) != 0) {
|
if (udfStartUdfd(pDnode->data.dnodeId) != 0) {
|
||||||
dError("failed to start udfd");
|
dError("failed to start udfd");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -398,7 +259,9 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
|
||||||
static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
|
static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
|
||||||
dInfo("dnode-mgmt start to clean up");
|
dInfo("dnode-mgmt start to clean up");
|
||||||
SDnode *pDnode = pWrapper->pDnode;
|
SDnode *pDnode = pWrapper->pDnode;
|
||||||
dmStopUdfd(pDnode);
|
|
||||||
|
udfStopUdfd();
|
||||||
|
|
||||||
dmStopWorker(pDnode);
|
dmStopWorker(pDnode);
|
||||||
|
|
||||||
taosWLockLatch(&pDnode->data.latch);
|
taosWLockLatch(&pDnode->data.latch);
|
||||||
|
|
|
@ -156,6 +156,8 @@ typedef struct SUdfdData {
|
||||||
uv_pipe_t ctrlPipe;
|
uv_pipe_t ctrlPipe;
|
||||||
uv_async_t stopAsync;
|
uv_async_t stopAsync;
|
||||||
int32_t stopCalled;
|
int32_t stopCalled;
|
||||||
|
|
||||||
|
int32_t dnodeId;
|
||||||
} SUdfdData;
|
} SUdfdData;
|
||||||
|
|
||||||
typedef struct SDnode {
|
typedef struct SDnode {
|
||||||
|
|
|
@ -23,10 +23,165 @@
|
||||||
#include "builtinsimpl.h"
|
#include "builtinsimpl.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
|
||||||
//TODO: network error processing.
|
|
||||||
//TODO: add unit test
|
//TODO: add unit test
|
||||||
//TODO: include all global variable under context struct
|
//TODO: include all global variable under context struct
|
||||||
|
|
||||||
|
typedef struct SUdfdData {
|
||||||
|
bool startCalled;
|
||||||
|
bool needCleanUp;
|
||||||
|
uv_loop_t loop;
|
||||||
|
uv_thread_t thread;
|
||||||
|
uv_barrier_t barrier;
|
||||||
|
uv_process_t process;
|
||||||
|
int spawnErr;
|
||||||
|
uv_pipe_t ctrlPipe;
|
||||||
|
uv_async_t stopAsync;
|
||||||
|
int32_t stopCalled;
|
||||||
|
|
||||||
|
int32_t dnodeId;
|
||||||
|
} SUdfdData;
|
||||||
|
|
||||||
|
SUdfdData udfdGlobal = {0};
|
||||||
|
|
||||||
|
static int32_t udfSpawnUdfd(SUdfdData *pData);
|
||||||
|
|
||||||
|
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
|
||||||
|
fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
|
||||||
|
SUdfdData *pData = process->data;
|
||||||
|
if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) {
|
||||||
|
fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
|
||||||
|
} else {
|
||||||
|
fnInfo("udfd process restart");
|
||||||
|
udfSpawnUdfd(pData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t udfSpawnUdfd(SUdfdData* pData) {
|
||||||
|
fnInfo("dnode start spawning udfd");
|
||||||
|
uv_process_options_t options = {0};
|
||||||
|
|
||||||
|
char path[PATH_MAX] = {0};
|
||||||
|
if (tsProcPath == NULL) {
|
||||||
|
path[0] = '.';
|
||||||
|
} else {
|
||||||
|
strncpy(path, tsProcPath, strlen(tsProcPath));
|
||||||
|
taosDirName(path);
|
||||||
|
}
|
||||||
|
#ifdef WINDOWS
|
||||||
|
strcat(path, "udfd.exe");
|
||||||
|
#else
|
||||||
|
strcat(path, "/udfd");
|
||||||
|
#endif
|
||||||
|
char* argsUdfd[] = {path, "-c", configDir, NULL};
|
||||||
|
options.args = argsUdfd;
|
||||||
|
options.file = path;
|
||||||
|
|
||||||
|
options.exit_cb = udfUdfdExit;
|
||||||
|
|
||||||
|
uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);
|
||||||
|
|
||||||
|
uv_stdio_container_t child_stdio[3];
|
||||||
|
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
|
||||||
|
child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe;
|
||||||
|
child_stdio[1].flags = UV_IGNORE;
|
||||||
|
child_stdio[2].flags = UV_INHERIT_FD;
|
||||||
|
child_stdio[2].data.fd = 2;
|
||||||
|
options.stdio_count = 3;
|
||||||
|
options.stdio = child_stdio;
|
||||||
|
|
||||||
|
options.flags = UV_PROCESS_DETACHED;
|
||||||
|
|
||||||
|
char dnodeIdEnvItem[32] = {0};
|
||||||
|
char thrdPoolSizeEnvItem[32] = {0};
|
||||||
|
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);
|
||||||
|
float numCpuCores = 4;
|
||||||
|
taosGetCpuCores(&numCpuCores);
|
||||||
|
snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
|
||||||
|
char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
|
||||||
|
options.env = envUdfd;
|
||||||
|
|
||||||
|
int err = uv_spawn(&pData->loop, &pData->process, &options);
|
||||||
|
pData->process.data = (void*)pData;
|
||||||
|
|
||||||
|
if (err != 0) {
|
||||||
|
fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
|
||||||
|
}
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
|
||||||
|
if (!uv_is_closing(handle)) {
|
||||||
|
uv_close(handle, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void udfUdfdStopAsyncCb(uv_async_t *async) {
|
||||||
|
SUdfdData *pData = async->data;
|
||||||
|
uv_stop(&pData->loop);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void udfWatchUdfd(void *args) {
|
||||||
|
SUdfdData *pData = args;
|
||||||
|
uv_loop_init(&pData->loop);
|
||||||
|
uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb);
|
||||||
|
pData->stopAsync.data = pData;
|
||||||
|
int32_t err = udfSpawnUdfd(pData);
|
||||||
|
atomic_store_32(&pData->spawnErr, err);
|
||||||
|
uv_barrier_wait(&pData->barrier);
|
||||||
|
uv_run(&pData->loop, UV_RUN_DEFAULT);
|
||||||
|
uv_loop_close(&pData->loop);
|
||||||
|
|
||||||
|
uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
|
||||||
|
uv_run(&pData->loop, UV_RUN_DEFAULT);
|
||||||
|
uv_loop_close(&pData->loop);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udfStartUdfd(int32_t startDnodeId) {
|
||||||
|
SUdfdData *pData = &udfdGlobal;
|
||||||
|
if (pData->startCalled) {
|
||||||
|
fnInfo("dnode-mgmt start udfd already called");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
pData->startCalled = true;
|
||||||
|
char dnodeId[8] = {0};
|
||||||
|
snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
|
||||||
|
uv_os_setenv("DNODE_ID", dnodeId);
|
||||||
|
pData->dnodeId = startDnodeId;
|
||||||
|
|
||||||
|
uv_barrier_init(&pData->barrier, 2);
|
||||||
|
uv_thread_create(&pData->thread, udfWatchUdfd, pData);
|
||||||
|
uv_barrier_wait(&pData->barrier);
|
||||||
|
int32_t err = atomic_load_32(&pData->spawnErr);
|
||||||
|
if (err != 0) {
|
||||||
|
uv_barrier_destroy(&pData->barrier);
|
||||||
|
uv_async_send(&pData->stopAsync);
|
||||||
|
uv_thread_join(&pData->thread);
|
||||||
|
pData->needCleanUp = false;
|
||||||
|
fnInfo("dnode-mgmt udfd cleaned up after spawn err");
|
||||||
|
} else {
|
||||||
|
pData->needCleanUp = true;
|
||||||
|
}
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udfStopUdfd() {
|
||||||
|
SUdfdData *pData = &udfdGlobal;
|
||||||
|
fnInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d",
|
||||||
|
pData->needCleanUp, pData->spawnErr);
|
||||||
|
if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
atomic_store_32(&pData->stopCalled, 1);
|
||||||
|
pData->needCleanUp = false;
|
||||||
|
uv_barrier_destroy(&pData->barrier);
|
||||||
|
uv_async_send(&pData->stopAsync);
|
||||||
|
uv_thread_join(&pData->thread);
|
||||||
|
fnInfo("dnode-mgmt udfd cleaned up");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
//==============================================================================================
|
||||||
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
|
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
|
||||||
* The QUEUE is copied from queue.h under libuv
|
* The QUEUE is copied from queue.h under libuv
|
||||||
* */
|
* */
|
||||||
|
|
|
@ -64,35 +64,35 @@ if $data00 != 1.414213562 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2);
|
#sql insert into t2 values(now+2s, 1, null)(now+3s, null, 2);
|
||||||
sql select udf1(f1, f2) from t2;
|
#sql select udf1(f1, f2) from t2;
|
||||||
print $rows , $data00 , $data10 , $data20 , $data30
|
#print $rows , $data00 , $data10 , $data20 , $data30
|
||||||
if $rows != 4 then
|
#if $rows != 4 then
|
||||||
return -1
|
# return -1
|
||||||
endi
|
#endi
|
||||||
if $data00 != 88 then
|
#if $data00 != 88 then
|
||||||
return -1
|
# return -1
|
||||||
endi
|
#endi
|
||||||
if $data10 != 88 then
|
#if $data10 != 88 then
|
||||||
return -1
|
# return -1
|
||||||
endi
|
#endi
|
||||||
|
#
|
||||||
if $data20 != NULL then
|
#if $data20 != NULL then
|
||||||
return -1
|
# return -1
|
||||||
endi
|
#endi
|
||||||
|
#
|
||||||
if $data30 != NULL then
|
#if $data30 != NULL then
|
||||||
return -1
|
# return -1
|
||||||
endi
|
#endi
|
||||||
|
#
|
||||||
sql select udf2(f1, f2) from t2;
|
#sql select udf2(f1, f2) from t2;
|
||||||
print $rows, $data00
|
#print $rows, $data00
|
||||||
if $rows != 1 then
|
#if $rows != 1 then
|
||||||
return -1
|
# return -1
|
||||||
endi
|
#endi
|
||||||
if $data00 != 2.645751311 then
|
#if $data00 != 2.645751311 then
|
||||||
return -1
|
# return -1
|
||||||
endi
|
#endi
|
||||||
sql drop function udf1;
|
sql drop function udf1;
|
||||||
sql show functions;
|
sql show functions;
|
||||||
if $rows != 1 then
|
if $rows != 1 then
|
||||||
|
|
Loading…
Reference in New Issue