From 9c12e2bf6e3111632c5f79be9b239a5d53ef2e7c Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 25 Mar 2022 10:50:25 +0800 Subject: [PATCH] start/stop process --- source/libs/function/inc/tudfInt.h | 3 + source/libs/function/src/tudf.c | 123 ++++++++++++++--------------- source/libs/function/src/udfd.c | 41 +++++----- 3 files changed, 80 insertions(+), 87 deletions(-) diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 048522968e..5f757c1ef0 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -20,6 +20,9 @@ extern "C" { #endif +//TODO replaces them with fnDebug +//#define debugPrint(...) taosPrintLog("Function", DEBUG_INFO, 135, __VA_ARGS__) +#define debugPrint(...) {fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");} enum { UDF_TASK_SETUP = 0, UDF_TASK_CALL = 1, diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index b4e1c72483..6fcdb34529 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -14,17 +14,10 @@ */ #include "uv.h" #include "os.h" +#include "tlog.h" #include "tudf.h" #include "tudfInt.h" -//TODO replace them with qDebug -#define DEBUG -#ifdef DEBUG -#define debugPrint(...) fprintf(__VA_ARGS__) -#else -#define debugPrint(...) /**/ -#endif - //TODO: when startup, set thread poll size. add it to cfg //TODO: udfd restart when exist or aborts //TODO: network error processing. @@ -179,21 +172,21 @@ void udfTaskQueueMove(SClientUvTaskQueue q, SClientUvTaskQueue n) { int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) { - debugPrint(stdout, "%s\n", "encoding request"); + debugPrint("%s", "encoding request"); int len = sizeof(SUdfRequest) - sizeof(void *); switch (request->type) { - case 0: { + case UDF_TASK_SETUP: { SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq); len += sizeof(SUdfSetupRequest) - 1 * sizeof(char *) + setup->pathSize; break; } - case 1: { + case UDF_TASK_CALL: { SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq); len += sizeof(SUdfCallRequest) - 2 * sizeof(char *) + call->inputBytes + call->stateBytes; break; } - case 2: { + case UDF_TASK_TEARDOWN: { SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq); len += sizeof(SUdfTeardownRequest); break; @@ -214,7 +207,7 @@ int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) { buf += sizeof(int8_t); switch (request->type) { - case 0: { + case UDF_TASK_SETUP: { SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq); memcpy(buf, setup->udfName, 16); buf += 16; @@ -229,7 +222,7 @@ int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) { break; } - case 1: { + case UDF_TASK_CALL: { SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq); *(int64_t *) buf = call->udfHandle; buf += sizeof(int64_t); @@ -246,7 +239,7 @@ int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) { break; } - case 2: { + case UDF_TASK_TEARDOWN: { SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq); *(int64_t *) buf = teardown->udfHandle; buf += sizeof(int64_t); @@ -264,9 +257,9 @@ int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) { } int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) { - debugPrint(stdout, "%s\n", "decoding request"); + debugPrint("%s", "decoding request"); if (*(int32_t *) bufMsg != bufLen) { - debugPrint(stderr, "%s\n", "dequest request error"); + debugPrint("%s", "decoding request error"); return -1; } char *buf = bufMsg; @@ -280,7 +273,7 @@ int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) { buf += sizeof(int8_t); switch (request->type) { - case 0: { + case UDF_TASK_SETUP: { SUdfSetupRequest *setup = malloc(sizeof(SUdfSetupRequest)); memcpy(setup->udfName, buf, 16); @@ -297,7 +290,7 @@ int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) { request->subReq = setup; break; } - case 1: { + case UDF_TASK_CALL: { SUdfCallRequest *call = malloc(sizeof(SUdfCallRequest)); call->udfHandle = *(int64_t *) buf; @@ -317,7 +310,7 @@ int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) { break; } - case 2: { + case UDF_TASK_TEARDOWN: { SUdfTeardownRequest *teardown = malloc(sizeof(SUdfTeardownRequest)); teardown->udfHandle = *(int64_t *) buf; @@ -328,7 +321,7 @@ int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) { } if (buf - bufMsg != bufLen) { - debugPrint(stderr, "%s\n", "decode request error"); + debugPrint("%s", "decode request error"); free(request->subReq); free(request); return -1; @@ -338,22 +331,22 @@ int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) { } int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) { - debugPrint(stdout, "%s\n", "encoding response"); + debugPrint("%s", "encoding response"); int32_t len = sizeof(SUdfResponse) - sizeof(void *); switch (response->type) { - case 0: { + case UDF_TASK_SETUP: { len += sizeof(SUdfSetupResponse); break; } - case 1: { + case UDF_TASK_CALL: { SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp); len += sizeof(SUdfCallResponse) - 2 * sizeof(char *) + callResp->outputBytes + callResp->newStateBytes; break; } - case 2: { + case UDF_TASK_TEARDOWN: { len += sizeof(SUdfTeardownResponse); break; } @@ -374,13 +367,13 @@ int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) { switch (response->type) { - case 0: { + case UDF_TASK_SETUP: { SUdfSetupResponse *setupResp = (SUdfSetupResponse *) (response->subRsp); *(int64_t *) buf = setupResp->udfHandle; buf += sizeof(int64_t); break; } - case 1: { + case UDF_TASK_CALL: { SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp); *(int32_t *) buf = callResp->outputBytes; buf += sizeof(int32_t); @@ -393,7 +386,7 @@ int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) { buf += callResp->newStateBytes; break; } - case 2: { + case UDF_TASK_TEARDOWN: { SUdfTeardownResponse *teardownResp = (SUdfTeardownResponse *) (response->subRsp); break; } @@ -408,10 +401,10 @@ int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) { } int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) { - debugPrint(stdout, "%s\n", "decoding response"); + debugPrint("%s", "decoding response"); if (*(int32_t *) bufMsg != bufLen) { - debugPrint(stderr, "%s\n", "can not decode response"); + debugPrint("%s", "can not decode response"); return -1; } char *buf = bufMsg; @@ -426,14 +419,14 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) { buf += sizeof(int32_t); switch (rsp->type) { - case 0: { + case UDF_TASK_SETUP: { SUdfSetupResponse *setupRsp = (SUdfSetupResponse *) malloc(sizeof(SUdfSetupResponse)); setupRsp->udfHandle = *(int64_t *) buf; buf += sizeof(int64_t); rsp->subRsp = (char *) setupRsp; break; } - case 1: { + case UDF_TASK_CALL: { SUdfCallResponse *callRsp = (SUdfCallResponse *) malloc(sizeof(SUdfCallResponse)); callRsp->outputBytes = *(int32_t *) buf; buf += sizeof(int32_t); @@ -450,7 +443,7 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) { rsp->subRsp = callRsp; break; } - case 2: { + case UDF_TASK_TEARDOWN: { SUdfTeardownResponse *teardownRsp = (SUdfTeardownResponse *) malloc(sizeof(SUdfTeardownResponse)); rsp->subRsp = teardownRsp; break; @@ -459,7 +452,7 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) { break; } if (buf - bufMsg != bufLen) { - debugPrint(stderr, "%s\n", "can not decode response"); + debugPrint("%s", "can not decode response"); free(rsp->subRsp); free(rsp); return -1; @@ -469,8 +462,9 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) { } void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { - debugPrint(stderr, "Process exited with status %" PRId64 ", signal %d\n", exit_status, term_signal); + debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal); uv_close((uv_handle_t *) req, NULL); + //TODO: restart the udfd process } void onUdfcPipeClose(uv_handle_t *handle) { @@ -488,7 +482,7 @@ void onUdfcPipeClose(uv_handle_t *handle) { } int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvTask) { - debugPrint(stdout, "%s\n", "get uv task result"); + debugPrint("%s", "get uv task result"); if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->rspBuf.base != NULL) { SUdfResponse *rsp; @@ -497,13 +491,13 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT switch (task->type) { case UDF_TASK_SETUP: { - //TODO: copy + //TODO: copy or not task->_setup.rsp = *(SUdfSetupResponse *) (rsp->subRsp); break; } case UDF_TASK_CALL: { task->_call.rsp = *(SUdfCallResponse *) (rsp->subRsp); - //TODO: copy + //TODO: copy or not break; } case UDF_TASK_TEARDOWN: { @@ -532,7 +526,7 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT } void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { - debugPrint(stdout, "%s\n", "client allocate buffer to receive from pipe"); + debugPrint("%s", "client allocate buffer to receive from pipe"); SClientUvConn *conn = handle->data; SClientConnBuf *connBuf = &conn->readBuf; @@ -565,7 +559,7 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf } } - debugPrint(stdout, "\tconn buf cap - len - total : %d - %d - %d\n", connBuf->cap, connBuf->len, connBuf->total); + debugPrint("\tconn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total); } @@ -621,7 +615,7 @@ void udfcUvHandleError(SClientUvConn *conn) { } void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { - debugPrint(stdout, "%s, nread: %zd\n", "client read from pipe", nread); + debugPrint("%s, nread: %zd", "client read from pipe", nread); if (nread == 0) return; SClientUvConn *conn = client->data; @@ -634,7 +628,7 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { } if (nread < 0) { - debugPrint(stderr, "\tclient read error: %s\n", uv_strerror(nread)); + debugPrint("\tclient read error: %s", uv_strerror(nread)); if (nread == UV_EOF) { //TODO: } @@ -644,7 +638,7 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { } void onUdfClientWrite(uv_write_t *write, int status) { - debugPrint(stderr, "%s\n", "after writing to pipe"); + debugPrint("%s", "after writing to pipe"); SClientUvTaskNode *uvTask = write->data; if (status == 0) { uv_pipe_t *pipe = uvTask->pipe; @@ -653,7 +647,7 @@ void onUdfClientWrite(uv_write_t *write, int status) { } else { //TODO Log error; } - debugPrint(stdout, "\tlength:%zu\n", uvTask->reqBuf.len); + debugPrint("\tlength:%zu", uvTask->reqBuf.len); free(write); free(uvTask->reqBuf.base); } @@ -707,7 +701,7 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN } int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { - debugPrint(stdout, "%s, %d\n", "queue uv task", uvTask->type); + debugPrint("%s, %d", "queue uv task", uvTask->type); uv_mutex_lock(&gUdfTaskQueueMutex); udfTaskQueueInsertTail(gUdfTaskQueue, uvTask); @@ -721,7 +715,7 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { } int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { - debugPrint(stdout, "%s, type %d\n", "start uv task ", uvTask->type); + debugPrint("%s, type %d", "start uv task ", uvTask->type); switch (uvTask->type) { case UV_TASK_CONNECT: { uv_pipe_t *pipe = malloc(sizeof(uv_pipe_t)); @@ -790,20 +784,21 @@ void udfStopAsyncCb(uv_async_t *async) { void startUdfd(void *argsThread) { uv_loop_init(&gUdfdLoop); -// uv_process_options_t options; -// static char path[256] = {0}; -// size_t cwdSize; -// uv_cwd(path, &cwdSize); -// strcat(path, "/udfd"); -// char* args[2] = {path, NULL}; -// options.args = args; -// options.file = path; -// options.exit_cb = onUdfdExit; -// -// int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options); -// if (err != 0) { -// debugPrint(stderr, "can not spawn udfd. error: %s", uv_strerror(err)); -// } + //TODO: path + uv_process_options_t options; + static char path[256] = {0}; + size_t cwdSize; + uv_cwd(path, &cwdSize); + strcat(path, "./udfd"); + char* args[2] = {path, NULL}; + options.args = args; + options.file = path; + options.exit_cb = onUdfdExit; + + int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options); + if (err != 0) { + debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); + } uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb); uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb); @@ -822,7 +817,7 @@ int32_t startUdfService() { int32_t stopUdfService() { uv_barrier_destroy(&gUdfInitBarrier); - //uv_process_kill(&gUdfdProcess, SIGINT); + uv_process_kill(&gUdfdProcess, SIGINT); uv_async_send(&gUdfLoopStopAsync); uv_mutex_destroy(&gUdfTaskQueueMutex); uv_thread_join(&gUdfLoopThread); @@ -844,7 +839,7 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) { } int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) { - debugPrint(stdout, "%s\n", "client setup udf"); + debugPrint("%s", "client setup udf"); SClientUdfTask *task = malloc(sizeof(SClientUdfTask)); task->errCode = 0; task->session = malloc(sizeof(SUdfUvSession)); @@ -875,7 +870,7 @@ int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) { int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newState, int32_t *newStateSize, SUdfDataBlock *output) { - debugPrint(stdout, "%s\n", "client call udf"); + debugPrint("%s", "client call udf"); SClientUdfTask *task = malloc(sizeof(SClientUdfTask)); task->errCode = 0; @@ -904,7 +899,7 @@ int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, S } int32_t teardownUdf(UdfHandle handle) { - debugPrint(stdout, "%s\n", "client teardown udf"); + debugPrint("%s", "client teardown udf"); SClientUdfTask *task = malloc(sizeof(SClientUdfTask)); task->errCode = 0; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 18c6004b25..27385325f5 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -15,16 +15,11 @@ #include "uv.h" #include "os.h" +#include "tlog.h" + #include "tudf.h" #include "tudfInt.h" -//TODO replaces them with qDebug -#define DEBUG -#ifdef DEBUG -#define debugPrint(...) fprintf(__VA_ARGS__) -#else -#define debugPrint(...) /**/ -#endif static uv_loop_t *loop; @@ -66,14 +61,14 @@ void udfdProcessRequest(uv_work_t *req) { switch (request->type) { case UDF_TASK_SETUP: { - debugPrint(stdout, "%s\n", "process setup request"); + debugPrint("%s", "process setup request"); SUdf *udf = malloc(sizeof(SUdf)); udf->refCount = 0; SUdfSetupRequest *setup = request->subReq; strcpy(udf->name, setup->udfName); int err = uv_dlopen(setup->path, &udf->lib); if (err != 0) { - debugPrint(stderr, "can not load library %s. error: %s", setup->path, uv_strerror(err)); + debugPrint("can not load library %s. error: %s", setup->path, uv_strerror(err)); //TODO set error } @@ -109,7 +104,7 @@ void udfdProcessRequest(uv_work_t *req) { } case UDF_TASK_CALL: { - debugPrint(stdout, "%s\n", "process call request"); + debugPrint("%s", "process call request"); SUdfCallRequest *call = request->subReq; SUdfHandle *handle = (SUdfHandle *) (call->udfHandle); SUdf *udf = handle->udf; @@ -146,7 +141,7 @@ void udfdProcessRequest(uv_work_t *req) { break; } case UDF_TASK_TEARDOWN: { - debugPrint(stdout, "%s\n", "process teardown request"); + debugPrint("%s", "process teardown request"); SUdfTeardownRequest *teardown = request->subReq; SUdfHandle *handle = (SUdfHandle *) (teardown->udfHandle); @@ -186,12 +181,12 @@ void udfdProcessRequest(uv_work_t *req) { } void udfdOnWrite(uv_write_t *req, int status) { - debugPrint(stdout, "%s\n", "after writing to pipe"); + debugPrint("%s", "after writing to pipe"); if (status < 0) { - debugPrint(stderr, "Write error %s\n", uv_err_name(status)); + debugPrint("Write error %s", uv_err_name(status)); } SUvUdfWork *work = (SUvUdfWork *) req->data; - debugPrint(stdout, "\tlength: %zu\n", work->output.len); + debugPrint("\tlength: %zu", work->output.len); free(work->output.base); free(work); free(req); @@ -199,7 +194,7 @@ void udfdOnWrite(uv_write_t *req, int status) { void udfdSendResponse(uv_work_t *work, int status) { - debugPrint(stdout, "%s\n", "send response"); + debugPrint("%s", "send response"); SUvUdfWork *udfWork = (SUvUdfWork *) (work->data); uv_write_t *write_req = malloc(sizeof(uv_write_t)); @@ -210,7 +205,7 @@ void udfdSendResponse(uv_work_t *work, int status) { } void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { - debugPrint(stdout, "%s\n", "allocate buffer for read"); + debugPrint("%s", "allocate buffer for read"); SUdfdUvConn *ctx = handle->data; int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t); if (ctx->inputCap == 0) { @@ -240,7 +235,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { buf->len = 0; } } - debugPrint(stdout, "\tinput buf cap - len - total : %d - %d - %d\n", ctx->inputCap, ctx->inputLen, ctx->inputTotal); + debugPrint("\tinput buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal); } @@ -279,7 +274,7 @@ void udfdUvHandleError(SUdfdUvConn *conn) { } void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { - debugPrint(stdout, "%s, nread: %zd\n", "read from pipe", nread); + debugPrint("%s, nread: %zd", "read from pipe", nread); if (nread == 0) return; @@ -296,7 +291,7 @@ void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { } if (nread < 0) { - debugPrint(stderr, "Read error %s\n", uv_err_name(nread)); + debugPrint("Read error %s", uv_err_name(nread)); if (nread == UV_EOF) { //TODO check more when close } else { @@ -306,7 +301,7 @@ void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { } void udfdOnNewConnection(uv_stream_t *server, int status) { - debugPrint(stdout, "%s\n", "on new connection"); + debugPrint("%s", "on new connection"); if (status < 0) { // TODO return; @@ -335,7 +330,7 @@ void removeListeningPipe(int sig) { } int main() { - debugPrint(stderr, "libuv version: %x\n", UV_VERSION_HEX); + debugPrint("libuv version: %x", UV_VERSION_HEX); loop = uv_default_loop(); uv_fs_t req; @@ -348,12 +343,12 @@ int main() { int r; if ((r = uv_pipe_bind(&server, "udf.sock"))) { - debugPrint(stderr, "Bind error %s\n", uv_err_name(r)); + debugPrint("Bind error %s\n", uv_err_name(r)); removeListeningPipe(0); return 1; } if ((r = uv_listen((uv_stream_t *) &server, 128, udfdOnNewConnection))) { - debugPrint(stderr, "Listen error %s\n", uv_err_name(r)); + debugPrint("Listen error %s", uv_err_name(r)); return 2; } uv_run(loop, UV_RUN_DEFAULT);