Merge pull request #11687 from taosdata/3.0_udfd
feat(udf):dnode start/stop/restart udfd and vnode create/destory proxy to udfd
This commit is contained in:
commit
babef48c8a
|
@ -59,6 +59,7 @@ extern int32_t sDebugFlag;
|
||||||
extern int32_t tsdbDebugFlag;
|
extern int32_t tsdbDebugFlag;
|
||||||
extern int32_t tqDebugFlag;
|
extern int32_t tqDebugFlag;
|
||||||
extern int32_t fsDebugFlag;
|
extern int32_t fsDebugFlag;
|
||||||
|
extern int32_t fnDebugFlag;
|
||||||
|
|
||||||
int32_t taosInitLog(const char *logName, int32_t maxFiles);
|
int32_t taosInitLog(const char *logName, int32_t maxFiles);
|
||||||
void taosCloseLog();
|
void taosCloseLog();
|
||||||
|
|
|
@ -289,6 +289,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "fnDebugFlag", fnDebugFlag, 0, 255, 0) != 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -473,6 +474,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
|
||||||
tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32;
|
tsdbDebugFlag = cfgGetItem(pCfg, "tsdbDebugFlag")->i32;
|
||||||
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
|
tqDebugFlag = cfgGetItem(pCfg, "tqDebugFlag")->i32;
|
||||||
fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32;
|
fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32;
|
||||||
|
fnDebugFlag = cfgGetItem(pCfg, "fnDebugFlag")->i32;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosSetClientCfg(SConfig *pCfg) {
|
static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
|
|
|
@ -0,0 +1,24 @@
|
||||||
|
//
|
||||||
|
// Created by slzhou on 22-4-20.
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef TDENGINE_FNLOG_H
|
||||||
|
#define TDENGINE_FNLOG_H
|
||||||
|
#include "tlog.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#define fnFatal(...) { if (fnDebugFlag & DEBUG_FATAL) { taosPrintLog("FN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
||||||
|
#define fnError(...) { if (fnDebugFlag & DEBUG_ERROR) { taosPrintLog("FN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
||||||
|
#define fnWarn(...) { if (fnDebugFlag & DEBUG_WARN) { taosPrintLog("FN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
||||||
|
#define fnInfo(...) { if (fnDebugFlag & DEBUG_INFO) { taosPrintLog("FN ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
||||||
|
#define fnDebug(...) { if (fnDebugFlag & DEBUG_DEBUG) { taosPrintLog("FN ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
||||||
|
#define fnTrace(...) { if (fnDebugFlag & DEBUG_TRACE) { taosPrintLog("FN ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // TDENGINE_FNLOG_H
|
|
@ -26,6 +26,9 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define UDF_LISTEN_PIPE_NAME_LEN 32
|
||||||
|
#define UDF_LISTEN_PIPE_NAME_PREFIX "udf.sock."
|
||||||
|
|
||||||
//======================================================================================
|
//======================================================================================
|
||||||
//begin API to taosd and qworker
|
//begin API to taosd and qworker
|
||||||
|
|
||||||
|
@ -35,17 +38,28 @@ enum {
|
||||||
UDFC_CODE_PIPE_READ_ERR = -3,
|
UDFC_CODE_PIPE_READ_ERR = -3,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/*TODO: no api for dnode startudfd/stopudfd*/
|
||||||
/**
|
/**
|
||||||
* start udf dameon service
|
* start udfd dameon service
|
||||||
* @return error code
|
|
||||||
*/
|
*/
|
||||||
int32_t startUdfService();
|
int32_t startUdfd(int32_t dnodeId);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* stop udf dameon service
|
* stop udfd dameon service
|
||||||
|
*/
|
||||||
|
int32_t stopUdfd(int32_t dnodeId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
|
||||||
* @return error code
|
* @return error code
|
||||||
*/
|
*/
|
||||||
int32_t stopUdfService();
|
int32_t createUdfdProxy(int32_t dnodeId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* destroy udfd proxy
|
||||||
|
* @return error code
|
||||||
|
*/
|
||||||
|
int32_t destroyUdfdProxy(int32_t dnodeId);
|
||||||
|
|
||||||
typedef void *UdfHandle;
|
typedef void *UdfHandle;
|
||||||
|
|
||||||
|
@ -101,7 +115,6 @@ typedef struct SUdfInterBuf {
|
||||||
char* buf;
|
char* buf;
|
||||||
} SUdfInterBuf;
|
} SUdfInterBuf;
|
||||||
|
|
||||||
//TODO: translate these calls to callUdf
|
|
||||||
// output: interBuf
|
// output: interBuf
|
||||||
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf);
|
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf);
|
||||||
// input: block, state
|
// input: block, state
|
||||||
|
|
|
@ -32,9 +32,9 @@ typedef struct SUdfInfo {
|
||||||
|
|
||||||
typedef void *UdfHandle;
|
typedef void *UdfHandle;
|
||||||
|
|
||||||
int32_t startUdfService();
|
int32_t createUdfdProxy();
|
||||||
|
|
||||||
int32_t stopUdfService();
|
int32_t destroyUdfdProxy();
|
||||||
|
|
||||||
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles);
|
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles);
|
||||||
|
|
||||||
|
|
|
@ -200,10 +200,10 @@ int64_t gUdfTaskSeqNum = 0;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
UDFC_STATE_INITAL = 0, // initial state
|
UDFC_STATE_INITAL = 0, // initial state
|
||||||
UDFC_STATE_STARTNG, // starting after startUdfService
|
UDFC_STATE_STARTNG, // starting after createUdfdProxy
|
||||||
UDFC_STATE_READY, // started and begin to receive quests
|
UDFC_STATE_READY, // started and begin to receive quests
|
||||||
UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart.
|
UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart.
|
||||||
UDFC_STATE_STOPPING, // stopping after stopUdfService
|
UDFC_STATE_STOPPING, // stopping after destroyUdfdProxy
|
||||||
UDFC_STATUS_FINAL, // stopped
|
UDFC_STATUS_FINAL, // stopped
|
||||||
};
|
};
|
||||||
int8_t gUdfcState = UDFC_STATE_INITAL;
|
int8_t gUdfcState = UDFC_STATE_INITAL;
|
||||||
|
@ -929,7 +929,7 @@ void udfStopAsyncCb(uv_async_t *async) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startUdfd();
|
int32_t udfcSpawnUdfd();
|
||||||
|
|
||||||
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
|
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
|
||||||
//TODO: pipe close will be first received
|
//TODO: pipe close will be first received
|
||||||
|
@ -944,12 +944,12 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
|
||||||
if (gUdfcState == UDFC_STATE_READY) {
|
if (gUdfcState == UDFC_STATE_READY) {
|
||||||
gUdfcState = UDFC_STATE_RESTARTING;
|
gUdfcState = UDFC_STATE_RESTARTING;
|
||||||
//TODO: asynchronous without blocking. how to do it
|
//TODO: asynchronous without blocking. how to do it
|
||||||
cleanUpUvTasks();
|
//cleanUpUvTasks();
|
||||||
startUdfd();
|
udfcSpawnUdfd();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startUdfd() {
|
int32_t udfcSpawnUdfd() {
|
||||||
//TODO: path
|
//TODO: path
|
||||||
uv_process_options_t options = {0};
|
uv_process_options_t options = {0};
|
||||||
static char path[256] = {0};
|
static char path[256] = {0};
|
||||||
|
@ -979,9 +979,6 @@ int32_t startUdfd() {
|
||||||
void constructUdfService(void *argsThread) {
|
void constructUdfService(void *argsThread) {
|
||||||
uv_loop_init(&gUdfdLoop);
|
uv_loop_init(&gUdfdLoop);
|
||||||
|
|
||||||
//TODO spawn error
|
|
||||||
startUdfd();
|
|
||||||
|
|
||||||
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
|
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
|
||||||
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
|
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
|
||||||
uv_mutex_init(&gUdfTaskQueueMutex);
|
uv_mutex_init(&gUdfTaskQueueMutex);
|
||||||
|
@ -994,7 +991,7 @@ void constructUdfService(void *argsThread) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t startUdfService() {
|
int32_t createUdfdProxy(int32_t dnodeId) {
|
||||||
gUdfcState = UDFC_STATE_STARTNG;
|
gUdfcState = UDFC_STATE_STARTNG;
|
||||||
uv_barrier_init(&gUdfInitBarrier, 2);
|
uv_barrier_init(&gUdfInitBarrier, 2);
|
||||||
uv_thread_create(&gUdfLoopThread, constructUdfService, 0);
|
uv_thread_create(&gUdfLoopThread, constructUdfService, 0);
|
||||||
|
@ -1002,12 +999,12 @@ int32_t startUdfService() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stopUdfService() {
|
int32_t destroyUdfdProxy(int32_t dnodeId) {
|
||||||
gUdfcState = UDFC_STATE_STOPPING;
|
gUdfcState = UDFC_STATE_STOPPING;
|
||||||
uv_barrier_destroy(&gUdfInitBarrier);
|
uv_barrier_destroy(&gUdfInitBarrier);
|
||||||
if (gUdfcState == UDFC_STATE_STOPPING) {
|
// if (gUdfcState == UDFC_STATE_STOPPING) {
|
||||||
uv_process_kill(&gUdfdProcess, SIGINT);
|
// uv_process_kill(&gUdfdProcess, SIGINT);
|
||||||
}
|
// }
|
||||||
uv_async_send(&gUdfLoopStopAsync);
|
uv_async_send(&gUdfLoopStopAsync);
|
||||||
uv_thread_join(&gUdfLoopThread);
|
uv_thread_join(&gUdfLoopThread);
|
||||||
uv_mutex_destroy(&gUdfTaskQueueMutex);
|
uv_mutex_destroy(&gUdfTaskQueueMutex);
|
||||||
|
|
|
@ -12,10 +12,10 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "uv.h"
|
#include "uv.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tlog.h"
|
#include "fnLog.h"
|
||||||
|
#include "thash.h"
|
||||||
|
|
||||||
#include "tudf.h"
|
#include "tudf.h"
|
||||||
#include "tudfInt.h"
|
#include "tudfInt.h"
|
||||||
|
@ -25,336 +25,377 @@
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
static uv_loop_t *loop;
|
typedef struct SUdfdContext {
|
||||||
|
uv_loop_t *loop;
|
||||||
|
char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN];
|
||||||
|
void *clientRpc;
|
||||||
|
|
||||||
|
uv_mutex_t udfsMutex;
|
||||||
|
SHashObj* udfsHash;
|
||||||
|
|
||||||
|
bool printVersion;
|
||||||
|
} SUdfdContext;
|
||||||
|
|
||||||
|
SUdfdContext global;
|
||||||
|
|
||||||
typedef struct SUdfdUvConn {
|
typedef struct SUdfdUvConn {
|
||||||
uv_stream_t *client;
|
uv_stream_t *client;
|
||||||
char *inputBuf;
|
char *inputBuf;
|
||||||
int32_t inputLen;
|
int32_t inputLen;
|
||||||
int32_t inputCap;
|
int32_t inputCap;
|
||||||
int32_t inputTotal;
|
int32_t inputTotal;
|
||||||
} SUdfdUvConn;
|
} SUdfdUvConn;
|
||||||
|
|
||||||
typedef struct SUvUdfWork {
|
typedef struct SUvUdfWork {
|
||||||
uv_stream_t *client;
|
uv_stream_t *client;
|
||||||
uv_buf_t input;
|
uv_buf_t input;
|
||||||
uv_buf_t output;
|
uv_buf_t output;
|
||||||
} SUvUdfWork;
|
} SUvUdfWork;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
UDF_STATE_INIT = 0,
|
||||||
|
UDF_STATE_LOADING,
|
||||||
|
UDF_STATE_READY,
|
||||||
|
UDF_STATE_UNLOADING
|
||||||
|
} EUdfState;
|
||||||
|
|
||||||
typedef struct SUdf {
|
typedef struct SUdf {
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
|
EUdfState state;
|
||||||
|
uv_mutex_t lock;
|
||||||
|
uv_cond_t condReady;
|
||||||
|
|
||||||
char name[16];
|
char name[16];
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
char path[PATH_MAX];
|
||||||
|
|
||||||
uv_lib_t lib;
|
uv_lib_t lib;
|
||||||
TUdfScalarProcFunc scalarProcFunc;
|
TUdfScalarProcFunc scalarProcFunc;
|
||||||
TUdfFreeUdfColumnFunc freeUdfColumn;
|
TUdfFreeUdfColumnFunc freeUdfColumn;
|
||||||
} SUdf;
|
} SUdf;
|
||||||
|
|
||||||
//TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
|
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
|
||||||
//TODO: add private udf structure.
|
// TODO: add private udf structure.
|
||||||
typedef struct SUdfHandle {
|
typedef struct SUdfHandle {
|
||||||
SUdf *udf;
|
SUdf *udf;
|
||||||
} SUdfHandle;
|
} SUdfHandle;
|
||||||
|
|
||||||
|
int32_t udfdLoadUdf(char* udfName, SUdf* udf) {
|
||||||
|
strcpy(udf->name, udfName);
|
||||||
|
|
||||||
|
int err = uv_dlopen(udf->path, &udf->lib);
|
||||||
|
if (err != 0) {
|
||||||
|
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
||||||
|
// TODO set error
|
||||||
|
}
|
||||||
|
//TODO: find all the functions
|
||||||
|
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
||||||
|
strcpy(normalFuncName, udfName);
|
||||||
|
uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc));
|
||||||
|
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
||||||
|
char *freeSuffix = "_free";
|
||||||
|
strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
|
||||||
|
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
|
||||||
|
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void udfdProcessRequest(uv_work_t *req) {
|
void udfdProcessRequest(uv_work_t *req) {
|
||||||
SUvUdfWork *uvUdf = (SUvUdfWork *) (req->data);
|
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
|
||||||
SUdfRequest request = {0};
|
SUdfRequest request = {0};
|
||||||
decodeUdfRequest(uvUdf->input.base, &request);
|
decodeUdfRequest(uvUdf->input.base, &request);
|
||||||
|
|
||||||
switch (request.type) {
|
switch (request.type) {
|
||||||
case UDF_TASK_SETUP: {
|
case UDF_TASK_SETUP: {
|
||||||
debugPrint("%s", "process setup request");
|
//TODO: tracable id from client. connect, setup, call, teardown
|
||||||
SUdf *udf = taosMemoryMalloc(sizeof(SUdf));
|
fnInfo("%"PRId64" setup request. udf name: %s", request.seqNum, request.setup.udfName);
|
||||||
udf->refCount = 0;
|
SUdfSetupRequest *setup = &request.setup;
|
||||||
SUdfSetupRequest *setup = &request.setup;
|
|
||||||
strcpy(udf->name, setup->udfName);
|
|
||||||
//TODO: retrive udf info from mnode
|
|
||||||
char* path = "libudf1.so";
|
|
||||||
int err = uv_dlopen(path, &udf->lib);
|
|
||||||
if (err != 0) {
|
|
||||||
debugPrint("can not load library %s. error: %s", path, uv_strerror(err));
|
|
||||||
//TODO set error
|
|
||||||
}
|
|
||||||
|
|
||||||
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
SUdf* udf = NULL;
|
||||||
strcpy(normalFuncName, setup->udfName);
|
uv_mutex_lock(&global.udfsMutex);
|
||||||
//TODO error, multi-thread, same udf, lock it
|
SUdf** udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN);
|
||||||
//TODO find all functions normal, init, destroy, normal, merge, finalize
|
if (*udfInHash) {
|
||||||
uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->scalarProcFunc));
|
++(*udfInHash)->refCount;
|
||||||
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
udf = *udfInHash;
|
||||||
char *freeSuffix = "_free";
|
uv_mutex_unlock(&global.udfsMutex);
|
||||||
strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
|
} else {
|
||||||
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
|
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
||||||
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
|
udfNew->refCount = 1;
|
||||||
|
udfNew->state = UDF_STATE_INIT;
|
||||||
|
|
||||||
SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle));
|
uv_mutex_init(&udfNew->lock);
|
||||||
handle->udf = udf;
|
uv_cond_init(&udfNew->condReady);
|
||||||
udf->refCount++;
|
udf = udfNew;
|
||||||
//TODO: allocate private structure and call init function and set it to handle
|
taosHashPut(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN, &udfNew, sizeof(&udfNew));
|
||||||
SUdfResponse rsp;
|
uv_mutex_unlock(&global.udfsMutex);
|
||||||
rsp.seqNum = request.seqNum;
|
}
|
||||||
rsp.type = request.type;
|
|
||||||
rsp.code = 0;
|
|
||||||
rsp.setupRsp.udfHandle = (int64_t) (handle);
|
|
||||||
int32_t len = encodeUdfResponse(NULL, &rsp);
|
|
||||||
rsp.msgLen = len;
|
|
||||||
void *bufBegin = taosMemoryMalloc(len);
|
|
||||||
void *buf = bufBegin;
|
|
||||||
encodeUdfResponse(&buf, &rsp);
|
|
||||||
|
|
||||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
uv_mutex_lock(&udf->lock);
|
||||||
|
if (udf->state == UDF_STATE_INIT) {
|
||||||
taosMemoryFree(uvUdf->input.base);
|
udf->state = UDF_STATE_LOADING;
|
||||||
break;
|
udfdLoadUdf(setup->udfName, udf);
|
||||||
|
udf->state = UDF_STATE_READY;
|
||||||
|
uv_cond_broadcast(&udf->condReady);
|
||||||
|
uv_mutex_unlock(&udf->lock);
|
||||||
|
} else {
|
||||||
|
while (udf->state != UDF_STATE_READY) {
|
||||||
|
uv_cond_wait(&udf->condReady, &udf->lock);
|
||||||
}
|
}
|
||||||
|
uv_mutex_unlock(&udf->lock);
|
||||||
|
}
|
||||||
|
SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle));
|
||||||
|
handle->udf = udf;
|
||||||
|
// TODO: allocate private structure and call init function and set it to handle
|
||||||
|
SUdfResponse rsp;
|
||||||
|
rsp.seqNum = request.seqNum;
|
||||||
|
rsp.type = request.type;
|
||||||
|
rsp.code = 0;
|
||||||
|
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
||||||
|
int32_t len = encodeUdfResponse(NULL, &rsp);
|
||||||
|
rsp.msgLen = len;
|
||||||
|
void *bufBegin = taosMemoryMalloc(len);
|
||||||
|
void *buf = bufBegin;
|
||||||
|
encodeUdfResponse(&buf, &rsp);
|
||||||
|
|
||||||
case UDF_TASK_CALL: {
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||||
debugPrint("%s", "process call request");
|
|
||||||
SUdfCallRequest *call = &request.call;
|
|
||||||
SUdfHandle *handle = (SUdfHandle *) (call->udfHandle);
|
|
||||||
SUdf *udf = handle->udf;
|
|
||||||
|
|
||||||
SUdfDataBlock input = {0};
|
|
||||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
|
||||||
SUdfColumn output = {0};
|
|
||||||
//TODO: call different functions according to call type, for now just calar
|
|
||||||
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
|
||||||
udf->scalarProcFunc(input, &output);
|
|
||||||
}
|
|
||||||
|
|
||||||
SUdfResponse response = {0};
|
|
||||||
SUdfResponse *rsp = &response;
|
|
||||||
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
|
||||||
rsp->seqNum = request.seqNum;
|
|
||||||
rsp->type = request.type;
|
|
||||||
rsp->code = 0;
|
|
||||||
SUdfCallResponse *subRsp = &rsp->callRsp;
|
|
||||||
subRsp->callType = call->callType;
|
|
||||||
convertUdfColumnToDataBlock(&output, &subRsp->resultData);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
|
||||||
rsp->msgLen = len;
|
|
||||||
void *bufBegin = taosMemoryMalloc(len);
|
|
||||||
void *buf = bufBegin;
|
|
||||||
encodeUdfResponse(&buf, rsp);
|
|
||||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
|
||||||
|
|
||||||
//TODO: free
|
|
||||||
udf->freeUdfColumn(&output);
|
|
||||||
|
|
||||||
taosMemoryFree(uvUdf->input.base);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case UDF_TASK_TEARDOWN: {
|
|
||||||
debugPrint("%s", "process teardown request");
|
|
||||||
|
|
||||||
SUdfTeardownRequest *teardown = &request.teardown;
|
|
||||||
SUdfHandle *handle = (SUdfHandle *) (teardown->udfHandle);
|
|
||||||
SUdf *udf = handle->udf;
|
|
||||||
udf->refCount--;
|
|
||||||
if (udf->refCount == 0) {
|
|
||||||
uv_dlclose(&udf->lib);
|
|
||||||
taosMemoryFree(udf);
|
|
||||||
}
|
|
||||||
//TODO: call destroy and free udf private
|
|
||||||
taosMemoryFree(handle);
|
|
||||||
|
|
||||||
SUdfResponse response;
|
|
||||||
SUdfResponse *rsp = &response;
|
|
||||||
rsp->seqNum = request.seqNum;
|
|
||||||
rsp->type = request.type;
|
|
||||||
rsp->code = 0;
|
|
||||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
|
||||||
rsp->msgLen = len;
|
|
||||||
void *bufBegin = taosMemoryMalloc(len);
|
|
||||||
void *buf = bufBegin;
|
|
||||||
encodeUdfResponse(&buf, rsp);
|
|
||||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
|
||||||
|
|
||||||
taosMemoryFree(uvUdf->input.base);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default: {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
taosMemoryFree(uvUdf->input.base);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case UDF_TASK_CALL: {
|
||||||
|
SUdfCallRequest *call = &request.call;
|
||||||
|
fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle);
|
||||||
|
SUdfHandle *handle = (SUdfHandle *)(call->udfHandle);
|
||||||
|
SUdf *udf = handle->udf;
|
||||||
|
|
||||||
|
SUdfDataBlock input = {0};
|
||||||
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||||
|
SUdfColumn output = {0};
|
||||||
|
// TODO: call different functions according to call type, for now just calar
|
||||||
|
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
||||||
|
udf->scalarProcFunc(input, &output);
|
||||||
|
}
|
||||||
|
|
||||||
|
SUdfResponse response = {0};
|
||||||
|
SUdfResponse *rsp = &response;
|
||||||
|
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
||||||
|
rsp->seqNum = request.seqNum;
|
||||||
|
rsp->type = request.type;
|
||||||
|
rsp->code = 0;
|
||||||
|
SUdfCallResponse *subRsp = &rsp->callRsp;
|
||||||
|
subRsp->callType = call->callType;
|
||||||
|
convertUdfColumnToDataBlock(&output, &subRsp->resultData);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||||
|
rsp->msgLen = len;
|
||||||
|
void *bufBegin = taosMemoryMalloc(len);
|
||||||
|
void *buf = bufBegin;
|
||||||
|
encodeUdfResponse(&buf, rsp);
|
||||||
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||||
|
|
||||||
|
// TODO: free udf column
|
||||||
|
udf->freeUdfColumn(&output);
|
||||||
|
|
||||||
|
taosMemoryFree(uvUdf->input.base);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case UDF_TASK_TEARDOWN: {
|
||||||
|
SUdfTeardownRequest *teardown = &request.teardown;
|
||||||
|
fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle)
|
||||||
|
SUdfHandle *handle = (SUdfHandle *)(teardown->udfHandle);
|
||||||
|
SUdf *udf = handle->udf;
|
||||||
|
bool unloadUdf = false;
|
||||||
|
uv_mutex_lock(&global.udfsMutex);
|
||||||
|
udf->refCount--;
|
||||||
|
if (udf->refCount == 0) {
|
||||||
|
unloadUdf = true;
|
||||||
|
taosHashRemove(global.udfsHash, udf->name, TSDB_FUNC_NAME_LEN);
|
||||||
|
}
|
||||||
|
uv_mutex_unlock(&global.udfsMutex);
|
||||||
|
if (unloadUdf) {
|
||||||
|
uv_cond_destroy(&udf->condReady);
|
||||||
|
uv_mutex_destroy(&udf->lock);
|
||||||
|
uv_dlclose(&udf->lib);
|
||||||
|
taosMemoryFree(udf);
|
||||||
|
}
|
||||||
|
// TODO: call destroy and free udf private
|
||||||
|
taosMemoryFree(handle);
|
||||||
|
|
||||||
|
SUdfResponse response;
|
||||||
|
SUdfResponse *rsp = &response;
|
||||||
|
rsp->seqNum = request.seqNum;
|
||||||
|
rsp->type = request.type;
|
||||||
|
rsp->code = 0;
|
||||||
|
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||||
|
rsp->msgLen = len;
|
||||||
|
void *bufBegin = taosMemoryMalloc(len);
|
||||||
|
void *buf = bufBegin;
|
||||||
|
encodeUdfResponse(&buf, rsp);
|
||||||
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||||
|
|
||||||
|
taosMemoryFree(uvUdf->input.base);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void udfdOnWrite(uv_write_t *req, int status) {
|
void udfdOnWrite(uv_write_t *req, int status) {
|
||||||
debugPrint("%s", "server after writing to pipe");
|
SUvUdfWork *work = (SUvUdfWork *)req->data;
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
debugPrint("Write error %s", uv_err_name(status));
|
//TODO:log error and process it.
|
||||||
}
|
}
|
||||||
SUvUdfWork *work = (SUvUdfWork *) req->data;
|
fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status));
|
||||||
debugPrint("\tlength: %zu", work->output.len);
|
taosMemoryFree(work->output.base);
|
||||||
taosMemoryFree(work->output.base);
|
taosMemoryFree(work);
|
||||||
taosMemoryFree(work);
|
taosMemoryFree(req);
|
||||||
taosMemoryFree(req);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void udfdSendResponse(uv_work_t *work, int status) {
|
void udfdSendResponse(uv_work_t *work, int status) {
|
||||||
debugPrint("%s", "send response");
|
SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
|
||||||
SUvUdfWork *udfWork = (SUvUdfWork *) (work->data);
|
|
||||||
|
|
||||||
uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
|
uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
|
||||||
write_req->data = udfWork;
|
write_req->data = udfWork;
|
||||||
uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite);
|
uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite);
|
||||||
|
|
||||||
taosMemoryFree(work);
|
taosMemoryFree(work);
|
||||||
}
|
}
|
||||||
|
|
||||||
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
|
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
|
||||||
debugPrint("%s", "server allocate buffer for read");
|
SUdfdUvConn *ctx = handle->data;
|
||||||
SUdfdUvConn *ctx = handle->data;
|
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
|
||||||
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
|
if (ctx->inputCap == 0) {
|
||||||
if (ctx->inputCap == 0) {
|
ctx->inputBuf = taosMemoryMalloc(msgHeadSize);
|
||||||
ctx->inputBuf = taosMemoryMalloc(msgHeadSize);
|
if (ctx->inputBuf) {
|
||||||
if (ctx->inputBuf) {
|
ctx->inputLen = 0;
|
||||||
ctx->inputLen = 0;
|
ctx->inputCap = msgHeadSize;
|
||||||
ctx->inputCap = msgHeadSize;
|
ctx->inputTotal = -1;
|
||||||
ctx->inputTotal = -1;
|
|
||||||
|
|
||||||
buf->base = ctx->inputBuf;
|
buf->base = ctx->inputBuf;
|
||||||
buf->len = ctx->inputCap;
|
buf->len = ctx->inputCap;
|
||||||
} else {
|
|
||||||
//TODO: log error
|
|
||||||
buf->base = NULL;
|
|
||||||
buf->len = 0;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
|
// TODO: log error
|
||||||
void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap);
|
buf->base = NULL;
|
||||||
if (inputBuf) {
|
buf->len = 0;
|
||||||
ctx->inputBuf = inputBuf;
|
|
||||||
buf->base = ctx->inputBuf + ctx->inputLen;
|
|
||||||
buf->len = ctx->inputCap - ctx->inputLen;
|
|
||||||
} else {
|
|
||||||
//TODO: log error
|
|
||||||
buf->base = NULL;
|
|
||||||
buf->len = 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
debugPrint("\tinput buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
|
} else {
|
||||||
|
ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
|
||||||
|
void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap);
|
||||||
|
if (inputBuf) {
|
||||||
|
ctx->inputBuf = inputBuf;
|
||||||
|
buf->base = ctx->inputBuf + ctx->inputLen;
|
||||||
|
buf->len = ctx->inputCap - ctx->inputLen;
|
||||||
|
} else {
|
||||||
|
// TODO: log error
|
||||||
|
buf->base = NULL;
|
||||||
|
buf->len = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fnDebug("allocate buf. input buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
|
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
|
||||||
if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) {
|
if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) {
|
||||||
pipe->inputTotal = *(int32_t *) (pipe->inputBuf);
|
pipe->inputTotal = *(int32_t *)(pipe->inputBuf);
|
||||||
}
|
}
|
||||||
if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) {
|
if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) {
|
||||||
return true;
|
fnDebug("receive request complete. length %d", pipe->inputLen);
|
||||||
}
|
return true;
|
||||||
return false;
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void udfdHandleRequest(SUdfdUvConn *conn) {
|
void udfdHandleRequest(SUdfdUvConn *conn) {
|
||||||
uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
|
uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
|
||||||
SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
|
SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
|
||||||
udfWork->client = conn->client;
|
udfWork->client = conn->client;
|
||||||
udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen);
|
udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen);
|
||||||
conn->inputBuf = NULL;
|
conn->inputBuf = NULL;
|
||||||
conn->inputLen = 0;
|
conn->inputLen = 0;
|
||||||
conn->inputCap = 0;
|
conn->inputCap = 0;
|
||||||
conn->inputTotal = -1;
|
conn->inputTotal = -1;
|
||||||
work->data = udfWork;
|
work->data = udfWork;
|
||||||
uv_queue_work(loop, work, udfdProcessRequest, udfdSendResponse);
|
uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse);
|
||||||
}
|
}
|
||||||
|
|
||||||
void udfdPipeCloseCb(uv_handle_t *pipe) {
|
void udfdPipeCloseCb(uv_handle_t *pipe) {
|
||||||
SUdfdUvConn *conn = pipe->data;
|
SUdfdUvConn *conn = pipe->data;
|
||||||
taosMemoryFree(conn->client);
|
taosMemoryFree(conn->client);
|
||||||
taosMemoryFree(conn->inputBuf);
|
taosMemoryFree(conn->inputBuf);
|
||||||
taosMemoryFree(conn);
|
taosMemoryFree(conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
void udfdUvHandleError(SUdfdUvConn *conn) {
|
void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
|
||||||
uv_close((uv_handle_t *) conn->client, udfdPipeCloseCb);
|
|
||||||
}
|
|
||||||
|
|
||||||
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
||||||
debugPrint("%s, nread: %zd", "read from pipe", nread);
|
fnDebug("udf read %zu bytes from client", nread);
|
||||||
|
if (nread == 0) return;
|
||||||
|
|
||||||
if (nread == 0) return;
|
SUdfdUvConn *conn = client->data;
|
||||||
|
|
||||||
SUdfdUvConn *conn = client->data;
|
if (nread > 0) {
|
||||||
|
conn->inputLen += nread;
|
||||||
if (nread > 0) {
|
if (isUdfdUvMsgComplete(conn)) {
|
||||||
conn->inputLen += nread;
|
udfdHandleRequest(conn);
|
||||||
if (isUdfdUvMsgComplete(conn)) {
|
} else {
|
||||||
udfdHandleRequest(conn);
|
// log error or continue;
|
||||||
} else {
|
|
||||||
//log error or continue;
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
debugPrint("Read error %s", uv_err_name(nread));
|
fnDebug("Receive error %s", uv_err_name(nread));
|
||||||
if (nread == UV_EOF) {
|
if (nread == UV_EOF) {
|
||||||
//TODO check more when close
|
// TODO check more when close
|
||||||
} else {
|
} else {
|
||||||
}
|
|
||||||
udfdUvHandleError(conn);
|
|
||||||
}
|
}
|
||||||
|
udfdUvHandleError(conn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void udfdOnNewConnection(uv_stream_t *server, int status) {
|
void udfdOnNewConnection(uv_stream_t *server, int status) {
|
||||||
debugPrint("%s", "on new connection");
|
fnDebug("new connection");
|
||||||
if (status < 0) {
|
if (status < 0) {
|
||||||
// TODO
|
// TODO
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
uv_pipe_t *client = (uv_pipe_t *) taosMemoryMalloc(sizeof(uv_pipe_t));
|
uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
|
||||||
uv_pipe_init(loop, client, 0);
|
uv_pipe_init(global.loop, client, 0);
|
||||||
if (uv_accept(server, (uv_stream_t *) client) == 0) {
|
if (uv_accept(server, (uv_stream_t *)client) == 0) {
|
||||||
SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
|
SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
|
||||||
ctx->client = (uv_stream_t *) client;
|
ctx->client = (uv_stream_t *)client;
|
||||||
ctx->inputBuf = 0;
|
ctx->inputBuf = 0;
|
||||||
ctx->inputLen = 0;
|
ctx->inputLen = 0;
|
||||||
ctx->inputCap = 0;
|
ctx->inputCap = 0;
|
||||||
client->data = ctx;
|
client->data = ctx;
|
||||||
ctx->client = (uv_stream_t *) client;
|
ctx->client = (uv_stream_t *)client;
|
||||||
uv_read_start((uv_stream_t *) client, udfdAllocBuffer, udfdPipeRead);
|
uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead);
|
||||||
} else {
|
} else {
|
||||||
uv_close((uv_handle_t *) client, NULL);
|
uv_close((uv_handle_t *)client, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void removeListeningPipe(int sig) {
|
void removeListeningPipe(int sig) {
|
||||||
uv_fs_t req;
|
uv_fs_t req;
|
||||||
uv_fs_unlink(loop, &req, "udf.sock", NULL);
|
uv_fs_unlink(global.loop, &req, "udf.sock", NULL);
|
||||||
exit(0);
|
exit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SServerContext {
|
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
|
||||||
void *clientRpc;
|
|
||||||
} SUdfdContext;
|
|
||||||
|
|
||||||
|
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char* udfName, SUdf* udf) {
|
||||||
void udfdProcessRpcRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t fetchUdfFuncInfo(void *clientRpc, SEpSet* pEpSet, char* udfNames[], int32_t numOfUdfs) {
|
|
||||||
SRetrieveFuncReq retrieveReq = {0};
|
SRetrieveFuncReq retrieveReq = {0};
|
||||||
retrieveReq.numOfFuncs = 1;
|
retrieveReq.numOfFuncs = 1;
|
||||||
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
|
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
|
||||||
for (int32_t i = 0; i < numOfUdfs; ++i) {
|
taosArrayPush(retrieveReq.pFuncNames, udfName);
|
||||||
taosArrayPush(retrieveReq.pFuncNames, udfNames[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
|
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
|
||||||
void* pReq = rpcMallocCont(contLen);
|
void *pReq = rpcMallocCont(contLen);
|
||||||
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
|
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
|
||||||
taosArrayDestroy(retrieveReq.pFuncNames);
|
taosArrayDestroy(retrieveReq.pFuncNames);
|
||||||
|
|
||||||
|
@ -368,66 +409,176 @@ int32_t fetchUdfFuncInfo(void *clientRpc, SEpSet* pEpSet, char* udfNames[], int3
|
||||||
SRetrieveFuncRsp retrieveRsp = {0};
|
SRetrieveFuncRsp retrieveRsp = {0};
|
||||||
tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);
|
tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);
|
||||||
|
|
||||||
SFuncInfo* pFuncInfo = (SFuncInfo*)taosArrayGet(retrieveRsp.pFuncInfos, 0);
|
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
|
||||||
|
|
||||||
|
char path[PATH_MAX] = {0};
|
||||||
|
taosGetTmpfilePath("/tmp", "libudf", path);
|
||||||
|
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
|
||||||
|
// TODO check for failure of flush to disk
|
||||||
|
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
|
||||||
|
taosCloseFile(&file);
|
||||||
|
strncpy(udf->path, path, strlen(path));
|
||||||
taosArrayDestroy(retrieveRsp.pFuncInfos);
|
taosArrayDestroy(retrieveRsp.pFuncInfos);
|
||||||
|
|
||||||
rpcFreeCont(rpcRsp.pCont);
|
rpcFreeCont(rpcRsp.pCont);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t openUdfdClientRpc(SUdfdContext *ctx) {
|
int32_t udfdOpenClientRpc() {
|
||||||
char *pass = "taosdata";
|
char *pass = "taosdata";
|
||||||
char *user = "root";
|
char *user = "root";
|
||||||
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
|
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
|
||||||
taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
|
taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
|
||||||
SRpcInit rpcInit = {0};
|
SRpcInit rpcInit = {0};
|
||||||
rpcInit.label = (char*)"UDFD";
|
rpcInit.label = (char *)"UDFD";
|
||||||
rpcInit.numOfThreads = 1;
|
rpcInit.numOfThreads = 1;
|
||||||
rpcInit.cfp = udfdProcessRpcRsp;
|
rpcInit.cfp = udfdProcessRpcRsp;
|
||||||
rpcInit.sessions = 1024;
|
rpcInit.sessions = 1024;
|
||||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
rpcInit.idleTime = 30 * 1000;
|
rpcInit.idleTime = 30 * 1000;
|
||||||
rpcInit.parent = ctx;
|
rpcInit.parent = &global;
|
||||||
|
|
||||||
rpcInit.user = (char*)user;
|
rpcInit.user = (char *)user;
|
||||||
rpcInit.ckey = (char*)"key";
|
rpcInit.ckey = (char *)"key";
|
||||||
rpcInit.secret = (char*)secretEncrypt;
|
rpcInit.secret = (char *)secretEncrypt;
|
||||||
rpcInit.spi = 1;
|
rpcInit.spi = 1;
|
||||||
|
|
||||||
ctx->clientRpc = rpcOpen(&rpcInit);
|
global.clientRpc = rpcOpen(&rpcInit);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t closeUdfdClientRpc(SUdfdContext *ctx) {
|
int32_t udfdCloseClientRpc() {
|
||||||
rpcClose(ctx->clientRpc);
|
rpcClose(global.clientRpc);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void udfdPrintVersion() {
|
||||||
|
#ifdef TD_ENTERPRISE
|
||||||
|
char *releaseName = "enterprise";
|
||||||
|
#else
|
||||||
|
char *releaseName = "community";
|
||||||
|
#endif
|
||||||
|
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
|
||||||
|
printf("gitinfo: %s\n", gitinfo);
|
||||||
|
printf("buildInfo: %s\n", buildinfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
|
||||||
|
for (int32_t i = 1; i < argc; ++i) {
|
||||||
|
if (strcmp(argv[i], "-c") == 0) {
|
||||||
|
if (i < argc - 1) {
|
||||||
|
if (strlen(argv[++i]) >= PATH_MAX) {
|
||||||
|
printf("config file path overflow");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tstrncpy(configDir, argv[i], PATH_MAX);
|
||||||
|
} else {
|
||||||
|
printf("'-c' requires a parameter, default is %s\n", configDir);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else if (strcmp(argv[i], "-V") == 0) {
|
||||||
|
global.printVersion = true;
|
||||||
|
} else {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int main() {
|
static int32_t udfdInitLog() {
|
||||||
debugPrint("libuv version: %x", UV_VERSION_HEX);
|
char logName[12] = {0};
|
||||||
|
snprintf(logName, sizeof(logName), "%slog", "udfd");
|
||||||
loop = uv_default_loop();
|
return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, 0);
|
||||||
uv_fs_t req;
|
}
|
||||||
uv_fs_unlink(loop, &req, "udf.sock", NULL);
|
|
||||||
|
static int32_t udfdUvInit() {
|
||||||
uv_pipe_t server;
|
uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
||||||
uv_pipe_init(loop, &server, 0);
|
if (loop) {
|
||||||
|
uv_loop_init(loop);
|
||||||
signal(SIGINT, removeListeningPipe);
|
}
|
||||||
|
global.loop = loop;
|
||||||
int r;
|
char dnodeId[8] = {0};
|
||||||
if ((r = uv_pipe_bind(&server, "udf.sock"))) {
|
size_t dnodeIdSize;
|
||||||
debugPrint("Bind error %s\n", uv_err_name(r));
|
uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize);
|
||||||
removeListeningPipe(0);
|
char listenPipeName[32] = {0};
|
||||||
return 1;
|
snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
|
||||||
}
|
strcpy(global.listenPipeName, listenPipeName);
|
||||||
if ((r = uv_listen((uv_stream_t *) &server, 128, udfdOnNewConnection))) {
|
|
||||||
debugPrint("Listen error %s", uv_err_name(r));
|
uv_fs_t req;
|
||||||
return 2;
|
uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
|
||||||
}
|
|
||||||
uv_run(loop, UV_RUN_DEFAULT);
|
uv_pipe_t server;
|
||||||
uv_loop_close(loop);
|
uv_pipe_init(global.loop, &server, 0);
|
||||||
|
|
||||||
|
signal(SIGINT, removeListeningPipe);
|
||||||
|
|
||||||
|
int r;
|
||||||
|
fnInfo("bind to pipe %s", global.listenPipeName);
|
||||||
|
if ((r = uv_pipe_bind(&server, listenPipeName))) {
|
||||||
|
fnError("Bind error %s", uv_err_name(r));
|
||||||
|
removeListeningPipe(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if ((r = uv_listen((uv_stream_t *)&server, 128, udfdOnNewConnection))) {
|
||||||
|
fnError("Listen error %s", uv_err_name(r));
|
||||||
|
removeListeningPipe(0);
|
||||||
|
return -2;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t udfdRun() {
|
||||||
|
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
uv_mutex_init(&global.udfsMutex);
|
||||||
|
|
||||||
|
//TOOD: client rpc to fetch udf function info from mnode
|
||||||
|
if (udfdOpenClientRpc() != 0) {
|
||||||
|
fnError("open rpc connection to mnode failure");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (udfdUvInit() != 0) {
|
||||||
|
fnError("uv init failure");
|
||||||
|
return -2;
|
||||||
|
}
|
||||||
|
|
||||||
|
fnInfo("start the udfd");
|
||||||
|
int code = uv_run(global.loop, UV_RUN_DEFAULT);
|
||||||
|
fnInfo("udfd stopped. result: %s", uv_err_name(code));
|
||||||
|
int codeClose = uv_loop_close(global.loop);
|
||||||
|
fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
|
||||||
|
udfdCloseClientRpc();
|
||||||
|
uv_mutex_destroy(&global.udfsMutex);
|
||||||
|
taosHashCleanup(global.udfsHash);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char* argv[]) {
|
||||||
|
if (!taosCheckSystemIsSmallEnd()) {
|
||||||
|
printf("failed to start since on non-small-end machines\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (udfdParseArgs(argc, argv) != 0) {
|
||||||
|
printf("failed to start since parse args error\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (global.printVersion) {
|
||||||
|
udfdPrintVersion();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (udfdInitLog() != 0) {
|
||||||
|
printf("failed to start since init log error\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosInitCfg(configDir, NULL, NULL, NULL, 0) != 0) {
|
||||||
|
fnError("failed to start since read config error");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return udfdRun();
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
startUdfService();
|
createUdfdProxy(1);
|
||||||
uv_sleep(1000);
|
uv_sleep(1000);
|
||||||
char path[256] = {0};
|
char path[256] = {0};
|
||||||
size_t cwdSize = 256;
|
size_t cwdSize = 256;
|
||||||
|
@ -53,5 +53,5 @@ int main(int argc, char *argv[]) {
|
||||||
}
|
}
|
||||||
teardownUdf(handle);
|
teardownUdf(handle);
|
||||||
|
|
||||||
stopUdfService();
|
destroyUdfdProxy(1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,6 +91,7 @@ int32_t sDebugFlag = 135;
|
||||||
int32_t tsdbDebugFlag = 131;
|
int32_t tsdbDebugFlag = 131;
|
||||||
int32_t tqDebugFlag = 135;
|
int32_t tqDebugFlag = 135;
|
||||||
int32_t fsDebugFlag = 135;
|
int32_t fsDebugFlag = 135;
|
||||||
|
int32_t fnDebugFlag = 135;
|
||||||
|
|
||||||
int64_t dbgEmptyW = 0;
|
int64_t dbgEmptyW = 0;
|
||||||
int64_t dbgWN = 0;
|
int64_t dbgWN = 0;
|
||||||
|
@ -752,6 +753,7 @@ void taosSetAllDebugFlag(int32_t flag) {
|
||||||
tsdbDebugFlag = flag;
|
tsdbDebugFlag = flag;
|
||||||
tqDebugFlag = flag;
|
tqDebugFlag = flag;
|
||||||
fsDebugFlag = flag;
|
fsDebugFlag = flag;
|
||||||
|
fnDebugFlag = flag;
|
||||||
|
|
||||||
uInfo("all debug flag are set to %d", flag);
|
uInfo("all debug flag are set to %d", flag);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue