start/stop process

This commit is contained in:
shenglian zhou 2022-03-25 10:50:25 +08:00
parent abedeb23d6
commit 9c12e2bf6e
3 changed files with 80 additions and 87 deletions

View File

@ -20,6 +20,9 @@
extern "C" {
#endif
//TODO replaces them with fnDebug
//#define debugPrint(...) taosPrintLog("Function", DEBUG_INFO, 135, __VA_ARGS__)
#define debugPrint(...) {fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");}
enum {
UDF_TASK_SETUP = 0,
UDF_TASK_CALL = 1,

View File

@ -14,17 +14,10 @@
*/
#include "uv.h"
#include "os.h"
#include "tlog.h"
#include "tudf.h"
#include "tudfInt.h"
//TODO replace them with qDebug
#define DEBUG
#ifdef DEBUG
#define debugPrint(...) fprintf(__VA_ARGS__)
#else
#define debugPrint(...) /**/
#endif
//TODO: when startup, set thread poll size. add it to cfg
//TODO: udfd restart when exist or aborts
//TODO: network error processing.
@ -179,21 +172,21 @@ void udfTaskQueueMove(SClientUvTaskQueue q, SClientUvTaskQueue n) {
int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) {
debugPrint(stdout, "%s\n", "encoding request");
debugPrint("%s", "encoding request");
int len = sizeof(SUdfRequest) - sizeof(void *);
switch (request->type) {
case 0: {
case UDF_TASK_SETUP: {
SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq);
len += sizeof(SUdfSetupRequest) - 1 * sizeof(char *) + setup->pathSize;
break;
}
case 1: {
case UDF_TASK_CALL: {
SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq);
len += sizeof(SUdfCallRequest) - 2 * sizeof(char *) + call->inputBytes + call->stateBytes;
break;
}
case 2: {
case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq);
len += sizeof(SUdfTeardownRequest);
break;
@ -214,7 +207,7 @@ int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) {
buf += sizeof(int8_t);
switch (request->type) {
case 0: {
case UDF_TASK_SETUP: {
SUdfSetupRequest *setup = (SUdfSetupRequest *) (request->subReq);
memcpy(buf, setup->udfName, 16);
buf += 16;
@ -229,7 +222,7 @@ int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) {
break;
}
case 1: {
case UDF_TASK_CALL: {
SUdfCallRequest *call = (SUdfCallRequest *) (request->subReq);
*(int64_t *) buf = call->udfHandle;
buf += sizeof(int64_t);
@ -246,7 +239,7 @@ int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) {
break;
}
case 2: {
case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = (SUdfTeardownRequest *) (request->subReq);
*(int64_t *) buf = teardown->udfHandle;
buf += sizeof(int64_t);
@ -264,9 +257,9 @@ int32_t encodeRequest(char **pBuf, int32_t *pBufLen, SUdfRequest *request) {
}
int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) {
debugPrint(stdout, "%s\n", "decoding request");
debugPrint("%s", "decoding request");
if (*(int32_t *) bufMsg != bufLen) {
debugPrint(stderr, "%s\n", "dequest request error");
debugPrint("%s", "decoding request error");
return -1;
}
char *buf = bufMsg;
@ -280,7 +273,7 @@ int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) {
buf += sizeof(int8_t);
switch (request->type) {
case 0: {
case UDF_TASK_SETUP: {
SUdfSetupRequest *setup = malloc(sizeof(SUdfSetupRequest));
memcpy(setup->udfName, buf, 16);
@ -297,7 +290,7 @@ int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) {
request->subReq = setup;
break;
}
case 1: {
case UDF_TASK_CALL: {
SUdfCallRequest *call = malloc(sizeof(SUdfCallRequest));
call->udfHandle = *(int64_t *) buf;
@ -317,7 +310,7 @@ int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) {
break;
}
case 2: {
case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = malloc(sizeof(SUdfTeardownRequest));
teardown->udfHandle = *(int64_t *) buf;
@ -328,7 +321,7 @@ int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) {
}
if (buf - bufMsg != bufLen) {
debugPrint(stderr, "%s\n", "decode request error");
debugPrint("%s", "decode request error");
free(request->subReq);
free(request);
return -1;
@ -338,22 +331,22 @@ int32_t decodeRequest(char *bufMsg, int32_t bufLen, SUdfRequest **pRequest) {
}
int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) {
debugPrint(stdout, "%s\n", "encoding response");
debugPrint("%s", "encoding response");
int32_t len = sizeof(SUdfResponse) - sizeof(void *);
switch (response->type) {
case 0: {
case UDF_TASK_SETUP: {
len += sizeof(SUdfSetupResponse);
break;
}
case 1: {
case UDF_TASK_CALL: {
SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp);
len += sizeof(SUdfCallResponse) - 2 * sizeof(char *) +
callResp->outputBytes + callResp->newStateBytes;
break;
}
case 2: {
case UDF_TASK_TEARDOWN: {
len += sizeof(SUdfTeardownResponse);
break;
}
@ -374,13 +367,13 @@ int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) {
switch (response->type) {
case 0: {
case UDF_TASK_SETUP: {
SUdfSetupResponse *setupResp = (SUdfSetupResponse *) (response->subRsp);
*(int64_t *) buf = setupResp->udfHandle;
buf += sizeof(int64_t);
break;
}
case 1: {
case UDF_TASK_CALL: {
SUdfCallResponse *callResp = (SUdfCallResponse *) (response->subRsp);
*(int32_t *) buf = callResp->outputBytes;
buf += sizeof(int32_t);
@ -393,7 +386,7 @@ int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) {
buf += callResp->newStateBytes;
break;
}
case 2: {
case UDF_TASK_TEARDOWN: {
SUdfTeardownResponse *teardownResp = (SUdfTeardownResponse *) (response->subRsp);
break;
}
@ -408,10 +401,10 @@ int32_t encodeResponse(char **pBuf, int32_t *pBufLen, SUdfResponse *response) {
}
int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) {
debugPrint(stdout, "%s\n", "decoding response");
debugPrint("%s", "decoding response");
if (*(int32_t *) bufMsg != bufLen) {
debugPrint(stderr, "%s\n", "can not decode response");
debugPrint("%s", "can not decode response");
return -1;
}
char *buf = bufMsg;
@ -426,14 +419,14 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) {
buf += sizeof(int32_t);
switch (rsp->type) {
case 0: {
case UDF_TASK_SETUP: {
SUdfSetupResponse *setupRsp = (SUdfSetupResponse *) malloc(sizeof(SUdfSetupResponse));
setupRsp->udfHandle = *(int64_t *) buf;
buf += sizeof(int64_t);
rsp->subRsp = (char *) setupRsp;
break;
}
case 1: {
case UDF_TASK_CALL: {
SUdfCallResponse *callRsp = (SUdfCallResponse *) malloc(sizeof(SUdfCallResponse));
callRsp->outputBytes = *(int32_t *) buf;
buf += sizeof(int32_t);
@ -450,7 +443,7 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) {
rsp->subRsp = callRsp;
break;
}
case 2: {
case UDF_TASK_TEARDOWN: {
SUdfTeardownResponse *teardownRsp = (SUdfTeardownResponse *) malloc(sizeof(SUdfTeardownResponse));
rsp->subRsp = teardownRsp;
break;
@ -459,7 +452,7 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) {
break;
}
if (buf - bufMsg != bufLen) {
debugPrint(stderr, "%s\n", "can not decode response");
debugPrint("%s", "can not decode response");
free(rsp->subRsp);
free(rsp);
return -1;
@ -469,8 +462,9 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse **pResponse) {
}
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
debugPrint(stderr, "Process exited with status %" PRId64 ", signal %d\n", exit_status, term_signal);
debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal);
uv_close((uv_handle_t *) req, NULL);
//TODO: restart the udfd process
}
void onUdfcPipeClose(uv_handle_t *handle) {
@ -488,7 +482,7 @@ void onUdfcPipeClose(uv_handle_t *handle) {
}
int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
debugPrint(stdout, "%s\n", "get uv task result");
debugPrint("%s", "get uv task result");
if (uvTask->type == UV_TASK_REQ_RSP) {
if (uvTask->rspBuf.base != NULL) {
SUdfResponse *rsp;
@ -497,13 +491,13 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT
switch (task->type) {
case UDF_TASK_SETUP: {
//TODO: copy
//TODO: copy or not
task->_setup.rsp = *(SUdfSetupResponse *) (rsp->subRsp);
break;
}
case UDF_TASK_CALL: {
task->_call.rsp = *(SUdfCallResponse *) (rsp->subRsp);
//TODO: copy
//TODO: copy or not
break;
}
case UDF_TASK_TEARDOWN: {
@ -532,7 +526,7 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT
}
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
debugPrint(stdout, "%s\n", "client allocate buffer to receive from pipe");
debugPrint("%s", "client allocate buffer to receive from pipe");
SClientUvConn *conn = handle->data;
SClientConnBuf *connBuf = &conn->readBuf;
@ -565,7 +559,7 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
}
}
debugPrint(stdout, "\tconn buf cap - len - total : %d - %d - %d\n", connBuf->cap, connBuf->len, connBuf->total);
debugPrint("\tconn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
}
@ -621,7 +615,7 @@ void udfcUvHandleError(SClientUvConn *conn) {
}
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
debugPrint(stdout, "%s, nread: %zd\n", "client read from pipe", nread);
debugPrint("%s, nread: %zd", "client read from pipe", nread);
if (nread == 0) return;
SClientUvConn *conn = client->data;
@ -634,7 +628,7 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
if (nread < 0) {
debugPrint(stderr, "\tclient read error: %s\n", uv_strerror(nread));
debugPrint("\tclient read error: %s", uv_strerror(nread));
if (nread == UV_EOF) {
//TODO:
}
@ -644,7 +638,7 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
void onUdfClientWrite(uv_write_t *write, int status) {
debugPrint(stderr, "%s\n", "after writing to pipe");
debugPrint("%s", "after writing to pipe");
SClientUvTaskNode *uvTask = write->data;
if (status == 0) {
uv_pipe_t *pipe = uvTask->pipe;
@ -653,7 +647,7 @@ void onUdfClientWrite(uv_write_t *write, int status) {
} else {
//TODO Log error;
}
debugPrint(stdout, "\tlength:%zu\n", uvTask->reqBuf.len);
debugPrint("\tlength:%zu", uvTask->reqBuf.len);
free(write);
free(uvTask->reqBuf.base);
}
@ -707,7 +701,7 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
}
int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint(stdout, "%s, %d\n", "queue uv task", uvTask->type);
debugPrint("%s, %d", "queue uv task", uvTask->type);
uv_mutex_lock(&gUdfTaskQueueMutex);
udfTaskQueueInsertTail(gUdfTaskQueue, uvTask);
@ -721,7 +715,7 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
}
int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint(stdout, "%s, type %d\n", "start uv task ", uvTask->type);
debugPrint("%s, type %d", "start uv task ", uvTask->type);
switch (uvTask->type) {
case UV_TASK_CONNECT: {
uv_pipe_t *pipe = malloc(sizeof(uv_pipe_t));
@ -790,20 +784,21 @@ void udfStopAsyncCb(uv_async_t *async) {
void startUdfd(void *argsThread) {
uv_loop_init(&gUdfdLoop);
// uv_process_options_t options;
// static char path[256] = {0};
// size_t cwdSize;
// uv_cwd(path, &cwdSize);
// strcat(path, "/udfd");
// char* args[2] = {path, NULL};
// options.args = args;
// options.file = path;
// options.exit_cb = onUdfdExit;
//
// int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options);
// if (err != 0) {
// debugPrint(stderr, "can not spawn udfd. error: %s", uv_strerror(err));
// }
//TODO: path
uv_process_options_t options;
static char path[256] = {0};
size_t cwdSize;
uv_cwd(path, &cwdSize);
strcat(path, "./udfd");
char* args[2] = {path, NULL};
options.args = args;
options.file = path;
options.exit_cb = onUdfdExit;
int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options);
if (err != 0) {
debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
}
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
@ -822,7 +817,7 @@ int32_t startUdfService() {
int32_t stopUdfService() {
uv_barrier_destroy(&gUdfInitBarrier);
//uv_process_kill(&gUdfdProcess, SIGINT);
uv_process_kill(&gUdfdProcess, SIGINT);
uv_async_send(&gUdfLoopStopAsync);
uv_mutex_destroy(&gUdfTaskQueueMutex);
uv_thread_join(&gUdfLoopThread);
@ -844,7 +839,7 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
}
int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) {
debugPrint(stdout, "%s\n", "client setup udf");
debugPrint("%s", "client setup udf");
SClientUdfTask *task = malloc(sizeof(SClientUdfTask));
task->errCode = 0;
task->session = malloc(sizeof(SUdfUvSession));
@ -875,7 +870,7 @@ int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) {
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, char **newState,
int32_t *newStateSize, SUdfDataBlock *output) {
debugPrint(stdout, "%s\n", "client call udf");
debugPrint("%s", "client call udf");
SClientUdfTask *task = malloc(sizeof(SClientUdfTask));
task->errCode = 0;
@ -904,7 +899,7 @@ int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, S
}
int32_t teardownUdf(UdfHandle handle) {
debugPrint(stdout, "%s\n", "client teardown udf");
debugPrint("%s", "client teardown udf");
SClientUdfTask *task = malloc(sizeof(SClientUdfTask));
task->errCode = 0;

View File

@ -15,16 +15,11 @@
#include "uv.h"
#include "os.h"
#include "tlog.h"
#include "tudf.h"
#include "tudfInt.h"
//TODO replaces them with qDebug
#define DEBUG
#ifdef DEBUG
#define debugPrint(...) fprintf(__VA_ARGS__)
#else
#define debugPrint(...) /**/
#endif
static uv_loop_t *loop;
@ -66,14 +61,14 @@ void udfdProcessRequest(uv_work_t *req) {
switch (request->type) {
case UDF_TASK_SETUP: {
debugPrint(stdout, "%s\n", "process setup request");
debugPrint("%s", "process setup request");
SUdf *udf = malloc(sizeof(SUdf));
udf->refCount = 0;
SUdfSetupRequest *setup = request->subReq;
strcpy(udf->name, setup->udfName);
int err = uv_dlopen(setup->path, &udf->lib);
if (err != 0) {
debugPrint(stderr, "can not load library %s. error: %s", setup->path, uv_strerror(err));
debugPrint("can not load library %s. error: %s", setup->path, uv_strerror(err));
//TODO set error
}
@ -109,7 +104,7 @@ void udfdProcessRequest(uv_work_t *req) {
}
case UDF_TASK_CALL: {
debugPrint(stdout, "%s\n", "process call request");
debugPrint("%s", "process call request");
SUdfCallRequest *call = request->subReq;
SUdfHandle *handle = (SUdfHandle *) (call->udfHandle);
SUdf *udf = handle->udf;
@ -146,7 +141,7 @@ void udfdProcessRequest(uv_work_t *req) {
break;
}
case UDF_TASK_TEARDOWN: {
debugPrint(stdout, "%s\n", "process teardown request");
debugPrint("%s", "process teardown request");
SUdfTeardownRequest *teardown = request->subReq;
SUdfHandle *handle = (SUdfHandle *) (teardown->udfHandle);
@ -186,12 +181,12 @@ void udfdProcessRequest(uv_work_t *req) {
}
void udfdOnWrite(uv_write_t *req, int status) {
debugPrint(stdout, "%s\n", "after writing to pipe");
debugPrint("%s", "after writing to pipe");
if (status < 0) {
debugPrint(stderr, "Write error %s\n", uv_err_name(status));
debugPrint("Write error %s", uv_err_name(status));
}
SUvUdfWork *work = (SUvUdfWork *) req->data;
debugPrint(stdout, "\tlength: %zu\n", work->output.len);
debugPrint("\tlength: %zu", work->output.len);
free(work->output.base);
free(work);
free(req);
@ -199,7 +194,7 @@ void udfdOnWrite(uv_write_t *req, int status) {
void udfdSendResponse(uv_work_t *work, int status) {
debugPrint(stdout, "%s\n", "send response");
debugPrint("%s", "send response");
SUvUdfWork *udfWork = (SUvUdfWork *) (work->data);
uv_write_t *write_req = malloc(sizeof(uv_write_t));
@ -210,7 +205,7 @@ void udfdSendResponse(uv_work_t *work, int status) {
}
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
debugPrint(stdout, "%s\n", "allocate buffer for read");
debugPrint("%s", "allocate buffer for read");
SUdfdUvConn *ctx = handle->data;
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
if (ctx->inputCap == 0) {
@ -240,7 +235,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf->len = 0;
}
}
debugPrint(stdout, "\tinput buf cap - len - total : %d - %d - %d\n", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
debugPrint("\tinput buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
}
@ -279,7 +274,7 @@ void udfdUvHandleError(SUdfdUvConn *conn) {
}
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
debugPrint(stdout, "%s, nread: %zd\n", "read from pipe", nread);
debugPrint("%s, nread: %zd", "read from pipe", nread);
if (nread == 0) return;
@ -296,7 +291,7 @@ void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
if (nread < 0) {
debugPrint(stderr, "Read error %s\n", uv_err_name(nread));
debugPrint("Read error %s", uv_err_name(nread));
if (nread == UV_EOF) {
//TODO check more when close
} else {
@ -306,7 +301,7 @@ void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
void udfdOnNewConnection(uv_stream_t *server, int status) {
debugPrint(stdout, "%s\n", "on new connection");
debugPrint("%s", "on new connection");
if (status < 0) {
// TODO
return;
@ -335,7 +330,7 @@ void removeListeningPipe(int sig) {
}
int main() {
debugPrint(stderr, "libuv version: %x\n", UV_VERSION_HEX);
debugPrint("libuv version: %x", UV_VERSION_HEX);
loop = uv_default_loop();
uv_fs_t req;
@ -348,12 +343,12 @@ int main() {
int r;
if ((r = uv_pipe_bind(&server, "udf.sock"))) {
debugPrint(stderr, "Bind error %s\n", uv_err_name(r));
debugPrint("Bind error %s\n", uv_err_name(r));
removeListeningPipe(0);
return 1;
}
if ((r = uv_listen((uv_stream_t *) &server, 128, udfdOnNewConnection))) {
debugPrint(stderr, "Listen error %s\n", uv_err_name(r));
debugPrint("Listen error %s", uv_err_name(r));
return 2;
}
uv_run(loop, UV_RUN_DEFAULT);