api change : remove udfc handle and epset

This commit is contained in:
slzhou 2022-04-24 17:36:07 +08:00
parent a9d47ded0a
commit a0852402a9
5 changed files with 209 additions and 132 deletions

View File

@ -37,23 +37,24 @@ extern "C" {
enum { enum {
UDFC_CODE_STOPPING = -1, UDFC_CODE_STOPPING = -1,
UDFC_CODE_PIPE_READ_ERR = -2, UDFC_CODE_PIPE_READ_ERR = -2,
UDF_CODE_LOAD_UDF_FAILURE = -3, UDFC_CODE_CONNECT_PIPE_ERR = -3,
UDFC_CODE_LOAD_UDF_FAILURE = -4,
UDFC_CODE_INVALID_STATE = -5
}; };
typedef void *UdfcHandle;
typedef void *UdfcFuncHandle; typedef void *UdfcFuncHandle;
/** /**
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf * create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
* @return error code * @return error code
*/ */
int32_t udfcOpen(UdfcHandle* proxyHandle); int32_t udfcOpen();
/** /**
* destroy udfd proxy * destroy udfd proxy
* @return error code * @return error code
*/ */
int32_t udfcClose(UdfcHandle proxyhandle); int32_t udfcClose();
/** /**
@ -62,7 +63,7 @@ int32_t udfcClose(UdfcHandle proxyhandle);
* @param handle, out * @param handle, out
* @return error code * @return error code
*/ */
int32_t setupUdf(UdfcHandle proxyHandle, char udfName[], SEpSet *epSet, UdfcFuncHandle *handle); int32_t setupUdf(char udfName[], UdfcFuncHandle *handle);
typedef struct SUdfColumnMeta { typedef struct SUdfColumnMeta {
int16_t type; int16_t type;

View File

@ -39,7 +39,6 @@ enum {
typedef struct SUdfSetupRequest { typedef struct SUdfSetupRequest {
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
SEpSet epSet;
} SUdfSetupRequest; } SUdfSetupRequest;
typedef struct SUdfSetupResponse { typedef struct SUdfSetupResponse {

View File

@ -137,11 +137,11 @@ typedef struct SUdfdProxy {
int8_t gUdfcState; int8_t gUdfcState;
QUEUE gUdfTaskQueue; QUEUE gUdfTaskQueue;
QUEUE gUvProcTaskQueue; QUEUE gUvProcTaskQueue;
// int8_t gUdfcState = UDFC_STATE_INITAL;
// QUEUE gUdfTaskQueue = {0}; int8_t initialized;
// QUEUE gUvProcTaskQueue = {0};
} SUdfdProxy; } SUdfdProxy;
SUdfdProxy gUdfdProxy = {0};
typedef struct SUdfUvSession { typedef struct SUdfUvSession {
SUdfdProxy *udfc; SUdfdProxy *udfc;
@ -209,7 +209,6 @@ enum {
UDFC_STATE_STARTNG, // starting after udfcOpen UDFC_STATE_STARTNG, // starting after udfcOpen
UDFC_STATE_READY, // started and begin to receive quests UDFC_STATE_READY, // started and begin to receive quests
UDFC_STATE_STOPPING, // stopping after udfcClose UDFC_STATE_STOPPING, // stopping after udfcClose
UDFC_STATUS_FINAL, // stopped
}; };
int32_t getUdfdPipeName(char* pipeName, int32_t size) { int32_t getUdfdPipeName(char* pipeName, int32_t size) {
@ -226,13 +225,11 @@ int32_t getUdfdPipeName(char* pipeName, int32_t size) {
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
int32_t len = 0; int32_t len = 0;
len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN); len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN);
len += taosEncodeSEpSet(buf, &setup->epSet);
return len; return len;
} }
void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) { void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) {
buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN); buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN);
buf = taosDecodeSEpSet((void*)buf, &request->epSet);
return (void*)buf; return (void*)buf;
} }
@ -615,7 +612,7 @@ void onUdfcPipeClose(uv_handle_t *handle) {
} }
int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvTask) { int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
debugPrint("%s", "get uv task result"); fnDebug("udfc get uv task result. task: %p", task);
if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->type == UV_TASK_REQ_RSP) {
if (uvTask->rspBuf.base != NULL) { if (uvTask->rspBuf.base != NULL) {
SUdfResponse rsp; SUdfResponse rsp;
@ -658,7 +655,6 @@ int32_t udfcGetUvTaskResponseResult(SClientUdfTask *task, SClientUvTaskNode *uvT
} }
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
debugPrint("%s", "client allocate buffer to receive from pipe");
SClientUvConn *conn = handle->data; SClientUvConn *conn = handle->data;
SClientConnBuf *connBuf = &conn->readBuf; SClientConnBuf *connBuf = &conn->readBuf;
@ -673,7 +669,7 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
buf->base = connBuf->buf; buf->base = connBuf->buf;
buf->len = connBuf->cap; buf->len = connBuf->cap;
} else { } else {
//TODO: log error fnError("udfc allocate buffer failure. size: %d", msgHeadSize);
buf->base = NULL; buf->base = NULL;
buf->len = 0; buf->len = 0;
} }
@ -685,13 +681,13 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
buf->base = connBuf->buf + connBuf->len; buf->base = connBuf->buf + connBuf->len;
buf->len = connBuf->cap - connBuf->len; buf->len = connBuf->cap - connBuf->len;
} else { } else {
//TODO: log error free connBuf->buf fnError("udfc re-allocate buffer failure. size: %d", connBuf->cap);
buf->base = NULL; buf->base = NULL;
buf->len = 0; buf->len = 0;
} }
} }
debugPrint("\tconn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total); fnTrace("conn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
} }
@ -700,6 +696,7 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
connBuf->total = *(int32_t *) (connBuf->buf); connBuf->total = *(int32_t *) (connBuf->buf);
} }
if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) { if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
fnTrace("udfc complete message is received, now handle it");
return true; return true;
} }
return false; return false;
@ -707,10 +704,10 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
void udfcUvHandleRsp(SClientUvConn *conn) { void udfcUvHandleRsp(SClientUvConn *conn) {
SClientConnBuf *connBuf = &conn->readBuf; SClientConnBuf *connBuf = &conn->readBuf;
int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen int32_t then seqnum int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen then seqnum
if (QUEUE_EMPTY(&conn->taskQueue)) { if (QUEUE_EMPTY(&conn->taskQueue)) {
//LOG error fnError("udfc no task waiting for response on connection");
return; return;
} }
bool found = false; bool found = false;
@ -724,7 +721,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
found = true; found = true;
taskFound = task; taskFound = task;
} else { } else {
//LOG error; fnError("udfc more than one task waiting for the same response");
continue; continue;
} }
} }
@ -738,7 +735,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
uv_sem_post(&taskFound->taskSem); uv_sem_post(&taskFound->taskSem);
QUEUE_REMOVE(&taskFound->procTaskQueue); QUEUE_REMOVE(&taskFound->procTaskQueue);
} else { } else {
//TODO: LOG error fnError("no task is waiting for the response.");
} }
connBuf->buf = NULL; connBuf->buf = NULL;
connBuf->total = -1; connBuf->total = -1;
@ -762,7 +759,7 @@ void udfcUvHandleError(SClientUvConn *conn) {
} }
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
debugPrint("%s, nread: %zd", "client read from pipe", nread); fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread);
if (nread == 0) return; if (nread == 0) return;
SClientUvConn *conn = client->data; SClientUvConn *conn = client->data;
@ -775,9 +772,9 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
} }
if (nread < 0) { if (nread < 0) {
debugPrint("\tclient read error: %s", uv_strerror(nread)); fnError("udfc client pipe %p read error: %s", client, uv_strerror(nread));
if (nread == UV_EOF) { if (nread == UV_EOF) {
//TODO: fnError("udfc client pipe %p closed", client);
} }
udfcUvHandleError(conn); udfcUvHandleError(conn);
} }
@ -785,16 +782,15 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
} }
void onUdfClientWrite(uv_write_t *write, int status) { void onUdfClientWrite(uv_write_t *write, int status) {
debugPrint("%s", "after writing to pipe");
SClientUvTaskNode *uvTask = write->data; SClientUvTaskNode *uvTask = write->data;
uv_pipe_t *pipe = uvTask->pipe;
if (status == 0) { if (status == 0) {
uv_pipe_t *pipe = uvTask->pipe;
SClientUvConn *conn = pipe->data; SClientUvConn *conn = pipe->data;
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
} else { } else {
//TODO Log error; fnError("udfc client %p write error.", pipe);
} }
debugPrint("\tlength:%zu", uvTask->reqBuf.len); fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len);
taosMemoryFree(write); taosMemoryFree(write);
taosMemoryFree(uvTask->reqBuf.base); taosMemoryFree(uvTask->reqBuf.base);
} }
@ -852,7 +848,7 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
} }
int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint("%s, %d", "queue uv task", uvTask->type); fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);
SUdfdProxy *udfc = uvTask->udfc; SUdfdProxy *udfc = uvTask->udfc;
uv_mutex_lock(&udfc->gUdfTaskQueueMutex); uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue); QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue);
@ -866,7 +862,7 @@ int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
} }
int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint("%s, type %d", "start uv task ", uvTask->type); fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
switch (uvTask->type) { switch (uvTask->type) {
case UV_TASK_CONNECT: { case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
@ -981,27 +977,37 @@ void constructUdfService(void *argsThread) {
uv_loop_close(&udfc->gUdfdLoop); uv_loop_close(&udfc->gUdfdLoop);
} }
int32_t udfcOpen(UdfcHandle *udfc) { int32_t udfcOpen() {
SUdfdProxy *proxy = taosMemoryCalloc(1, sizeof(SUdfdProxy)); int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 0, 1);
if (old == 1) {
return 0;
}
SUdfdProxy *proxy = &gUdfdProxy;
getUdfdPipeName(proxy->udfdPipeName, UDF_LISTEN_PIPE_NAME_LEN); getUdfdPipeName(proxy->udfdPipeName, UDF_LISTEN_PIPE_NAME_LEN);
proxy->gUdfcState = UDFC_STATE_STARTNG; proxy->gUdfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&proxy->gUdfInitBarrier, 2); uv_barrier_init(&proxy->gUdfInitBarrier, 2);
uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy); uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy);
uv_barrier_wait(&proxy->gUdfInitBarrier); atomic_store_8(&proxy->gUdfcState, UDFC_STATE_READY);
proxy->gUdfcState = UDFC_STATE_READY; proxy->gUdfcState = UDFC_STATE_READY;
*udfc = proxy; uv_barrier_wait(&proxy->gUdfInitBarrier);
fnInfo("udfc initialized")
return 0; return 0;
} }
int32_t udfcClose(UdfcHandle udfcHandle) { int32_t udfcClose() {
SUdfdProxy *udfc = udfcHandle; int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 1, 0);
if (old == 0) {
return 0;
}
SUdfdProxy *udfc = &gUdfdProxy;
udfc->gUdfcState = UDFC_STATE_STOPPING; udfc->gUdfcState = UDFC_STATE_STOPPING;
uv_async_send(&udfc->gUdfLoopStopAsync); uv_async_send(&udfc->gUdfLoopStopAsync);
uv_thread_join(&udfc->gUdfLoopThread); uv_thread_join(&udfc->gUdfLoopThread);
uv_mutex_destroy(&udfc->gUdfTaskQueueMutex); uv_mutex_destroy(&udfc->gUdfTaskQueueMutex);
uv_barrier_destroy(&udfc->gUdfInitBarrier); uv_barrier_destroy(&udfc->gUdfInitBarrier);
udfc->gUdfcState = UDFC_STATUS_FINAL; udfc->gUdfcState = UDFC_STATE_INITAL;
taosMemoryFree(udfc); fnInfo("udfc cleaned up");
return 0; return 0;
} }
@ -1019,29 +1025,36 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
return task->errCode; return task->errCode;
} }
int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle *funcHandle) { int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
debugPrint("%s", "client setup udf"); fnInfo("udfc setup udf. udfName: %s", udfName);
if (gUdfdProxy.gUdfcState != UDFC_STATE_READY) {
return UDFC_CODE_INVALID_STATE;
}
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
task->session = taosMemoryMalloc(sizeof(SUdfUvSession)); task->session = taosMemoryMalloc(sizeof(SUdfUvSession));
task->session->udfc = udfc; task->session->udfc = &gUdfdProxy;
task->type = UDF_TASK_SETUP; task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req; SUdfSetupRequest *req = &task->_setup.req;
memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN); memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
req->epSet = *epSet;
int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT); int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) { if (errCode != 0) {
//TODO: log error fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName);
return -1; return UDFC_CODE_CONNECT_PIPE_ERR;
} }
udfcRunUvTask(task, UV_TASK_REQ_RSP); udfcRunUvTask(task, UV_TASK_REQ_RSP);
SUdfSetupResponse *rsp = &task->_setup.rsp; SUdfSetupResponse *rsp = &task->_setup.rsp;
task->session->severHandle = rsp->udfHandle; task->session->severHandle = rsp->udfHandle;
*funcHandle = task->session; if (task->errCode != 0) {
fnError("failed to setup udf. err: %d", task->errCode)
} else {
fnInfo("sucessfully setup udf func handle. handle: %p", task->session);
*funcHandle = task->session;
}
int32_t err = task->errCode; int32_t err = task->errCode;
taosMemoryFree(task); taosMemoryFree(task);
return err; return err;
@ -1049,7 +1062,7 @@ int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState) { SSDataBlock* output, SUdfInterBuf *newState) {
debugPrint("%s", "client call udf"); fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
@ -1087,35 +1100,37 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
udfcRunUvTask(task, UV_TASK_REQ_RSP); udfcRunUvTask(task, UV_TASK_REQ_RSP);
SUdfCallResponse *rsp = &task->_call.rsp; if (task->errCode != 0) {
switch (callType) { fnError("call udf failure. err: %d", task->errCode);
case TSDB_UDF_CALL_AGG_INIT: { } else {
*newState = rsp->resultBuf; SUdfCallResponse *rsp = &task->_call.rsp;
break; switch (callType) {
} case TSDB_UDF_CALL_AGG_INIT: {
case TSDB_UDF_CALL_AGG_PROC: { *newState = rsp->resultBuf;
*newState = rsp->resultBuf; break;
break; }
} case TSDB_UDF_CALL_AGG_PROC: {
case TSDB_UDF_CALL_AGG_MERGE: { *newState = rsp->resultBuf;
*newState = rsp->resultBuf; break;
break; }
} case TSDB_UDF_CALL_AGG_MERGE: {
case TSDB_UDF_CALL_AGG_FIN: { *newState = rsp->resultBuf;
*newState = rsp->resultBuf; break;
break; }
} case TSDB_UDF_CALL_AGG_FIN: {
case TSDB_UDF_CALL_SCALA_PROC: { *newState = rsp->resultBuf;
*output = rsp->resultData; break;
break; }
case TSDB_UDF_CALL_SCALA_PROC: {
*output = rsp->resultData;
break;
}
} }
} }
taosMemoryFree(task); taosMemoryFree(task);
return task->errCode; return task->errCode;
} }
//TODO: translate these calls to callUdf
int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_INIT; int8_t callType = TSDB_UDF_CALL_AGG_INIT;
@ -1159,7 +1174,7 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu
} }
int32_t teardownUdf(UdfcFuncHandle handle) { int32_t teardownUdf(UdfcFuncHandle handle) {
debugPrint("%s", "client teardown udf"); fnInfo("tear down udf. udf func handle: %p", handle);
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
@ -1171,7 +1186,6 @@ int32_t teardownUdf(UdfcFuncHandle handle) {
udfcRunUvTask(task, UV_TASK_REQ_RSP); udfcRunUvTask(task, UV_TASK_REQ_RSP);
SUdfTeardownResponse *rsp = &task->_teardown.rsp; SUdfTeardownResponse *rsp = &task->_teardown.rsp;
int32_t err = task->errCode; int32_t err = task->errCode;

View File

@ -20,6 +20,7 @@
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h" #include "tudfInt.h"
#include "tdatablock.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
@ -31,8 +32,9 @@ typedef struct SUdfdContext {
uv_signal_t intrSignal; uv_signal_t intrSignal;
char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN];
uv_pipe_t listeningPipe; uv_pipe_t listeningPipe;
void *clientRpc;
void *clientRpc;
SCorEpSet mgmtEp;
uv_mutex_t udfsMutex; uv_mutex_t udfsMutex;
SHashObj *udfsHash; SHashObj *udfsHash;
@ -83,17 +85,17 @@ typedef struct SUdfcFuncHandle {
SUdf *udf; SUdf *udf;
} SUdfcFuncHandle; } SUdfcFuncHandle;
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf); int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
int32_t udfdLoadUdf(char *udfName, SEpSet *pEpSet, SUdf *udf) { int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strcpy(udf->name, udfName); strcpy(udf->name, udfName);
udfdFillUdfInfoFromMNode(global.clientRpc, pEpSet, udf->name, udf); udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
//strcpy(udf->path, "/home/slzhou/TDengine/debug/build/lib/libudf1.so"); //strcpy(udf->path, "/home/slzhou/TDengine/debug/build/lib/libudf1.so");
int err = uv_dlopen(udf->path, &udf->lib); int err = uv_dlopen(udf->path, &udf->lib);
if (err != 0) { if (err != 0) {
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
return UDF_CODE_LOAD_UDF_FAILURE; return UDFC_CODE_LOAD_UDF_FAILURE;
} }
// TODO: find all the functions // TODO: find all the functions
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0}; char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
@ -140,7 +142,7 @@ void udfdProcessRequest(uv_work_t *req) {
uv_mutex_lock(&udf->lock); uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) { if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING; udf->state = UDF_STATE_LOADING;
udfdLoadUdf(setup->udfName, &setup->epSet, udf); udfdLoadUdf(setup->udfName, udf);
udf->state = UDF_STATE_READY; udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady); uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock); uv_mutex_unlock(&udf->lock);
@ -398,7 +400,48 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf) { int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
pEpSet->version = 0;
// init mnode ip set
SEpSet* mgmtEpSet = &(pEpSet->epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
if (firstEp && firstEp[0] != 0) {
if (strlen(firstEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return terrno;
}
mgmtEpSet->numOfEps++;
}
if (secondEp && secondEp[0] != 0) {
if (strlen(secondEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
mgmtEpSet->numOfEps++;
}
if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
return 0;
}
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0}; SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1; retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
@ -415,7 +458,7 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName,
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
rpcSendRecv(clientRpc, pEpSet, &rpcMsg, &rpcRsp); rpcSendRecv(clientRpc, &global.mgmtEp.epSet, &rpcMsg, &rpcRsp);
SRetrieveFuncRsp retrieveRsp = {0}; SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp); tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);
@ -618,5 +661,6 @@ int main(int argc, char *argv[]) {
return -1; return -1;
} }
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
return udfdRun(); return udfdRun();
} }

View File

@ -1,65 +1,84 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include "uv.h" #include "uv.h"
#include "fnLog.h"
#include "os.h" #include "os.h"
#include "tudf.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h"
#include "tudf.h"
static int32_t parseArgs(int32_t argc, char *argv[]) {
for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) {
if (strlen(argv[++i]) >= PATH_MAX) {
printf("config file path overflow");
return -1;
}
tstrncpy(configDir, argv[i], PATH_MAX);
} else {
printf("'-c' requires a parameter, default is %s\n", configDir);
return -1;
}
}
}
return 0;
}
static int32_t initLog() {
char logName[12] = {0};
snprintf(logName, sizeof(logName), "%slog", "udfc");
return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, 0);
}
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
UdfcHandle udfc; parseArgs(argc, argv);
udfcOpen(&udfc); initLog();
uv_sleep(1000); if (taosInitCfg(configDir, NULL, NULL, NULL, 0) != 0) {
char path[256] = {0}; fnError("failed to start since read config error");
size_t cwdSize = 256; return -1;
int err = uv_cwd(path, &cwdSize); }
if (err != 0) {
fprintf(stderr, "err cwd: %s\n", uv_strerror(err)); udfcOpen();
return err; uv_sleep(1000);
UdfcFuncHandle handle;
setupUdf("udf1", &handle);
SSDataBlock block = {0};
SSDataBlock *pBlock = &block;
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
pBlock->info.numOfCols = 1;
pBlock->info.rows = 4;
char data[16] = {0};
char bitmap[4] = {0};
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1;
colInfo.pData = data;
colInfo.nullbitmap = bitmap;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
colDataAppendInt32(&colInfo, j, &j);
} }
fprintf(stdout, "current working directory:%s\n", path); taosArrayPush(pBlock->pDataBlock, &colInfo);
strcat(path, "/libudf1.so"); }
UdfcFuncHandle handle; SScalarParam input = {0};
SEpSet epSet; input.numOfRows = pBlock->info.rows;
epSet.inUse = 0; input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
taosGetFqdnPortFromEp("localhost:7100", &epSet.eps[0]); SScalarParam output = {0};
taosGetFqdnPortFromEp("localhost:7200", &epSet.eps[1]); callUdfScalarFunc(handle, &input, 1, &output);
epSet.numOfEps = 2;
setupUdf(udfc, "udf1", &epSet, &handle);
SSDataBlock block = {0}; SColumnInfoData *col = output.columnData;
SSDataBlock* pBlock = &block; for (int32_t i = 0; i < output.numOfRows; ++i) {
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
pBlock->info.numOfCols = 1; }
pBlock->info.rows = 4; teardownUdf(handle);
char data[16] = {0}; udfcClose();
char bitmap[4] = {0};
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1;
colInfo.pData = data;
colInfo.nullbitmap = bitmap;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
colDataAppendInt32(&colInfo, j, &j);
}
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
SScalarParam input = {0};
input.numOfRows = pBlock->info.rows;
input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
SScalarParam output = {0};
callUdfScalarFunc(handle, &input, 1 , &output);
SColumnInfoData *col = output.columnData;
for (int32_t i = 0; i < output.numOfRows; ++i) {
fprintf(stderr, "%d\t%d\n" , i, *(int32_t*)(col->pData + i *sizeof(int32_t)));
}
teardownUdf(handle);
udfcClose(udfc);
} }