947 lines
28 KiB
C
947 lines
28 KiB
C
/*
|
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
*
|
|
* This program is free software: you can use, redistribute, and/or modify
|
|
* it under the terms of the GNU Affero General Public License, version 3
|
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
*
|
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
#include "uv.h"
|
|
#include "os.h"
|
|
#include "fnLog.h"
|
|
#include "thash.h"
|
|
|
|
#include "tudf.h"
|
|
#include "tudfInt.h"
|
|
|
|
#include "tdatablock.h"
|
|
#include "tdataformat.h"
|
|
#include "tglobal.h"
|
|
#include "tmsg.h"
|
|
#include "trpc.h"
|
|
|
|
typedef struct SUdfdContext {
|
|
uv_loop_t * loop;
|
|
uv_pipe_t ctrlPipe;
|
|
uv_signal_t intrSignal;
|
|
char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
|
|
uv_pipe_t listeningPipe;
|
|
|
|
void * clientRpc;
|
|
SCorEpSet mgmtEp;
|
|
uv_mutex_t udfsMutex;
|
|
SHashObj * udfsHash;
|
|
|
|
bool printVersion;
|
|
} SUdfdContext;
|
|
|
|
SUdfdContext global;
|
|
|
|
typedef struct SUdfdUvConn {
|
|
uv_stream_t *client;
|
|
char * inputBuf;
|
|
int32_t inputLen;
|
|
int32_t inputCap;
|
|
int32_t inputTotal;
|
|
} SUdfdUvConn;
|
|
|
|
typedef struct SUvUdfWork {
|
|
uv_stream_t *client;
|
|
uv_buf_t input;
|
|
uv_buf_t output;
|
|
} SUvUdfWork;
|
|
|
|
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState;
|
|
|
|
typedef struct SUdf {
|
|
int32_t refCount;
|
|
EUdfState state;
|
|
uv_mutex_t lock;
|
|
uv_cond_t condReady;
|
|
|
|
char name[TSDB_FUNC_NAME_LEN];
|
|
int8_t funcType;
|
|
int8_t scriptType;
|
|
int8_t outputType;
|
|
int32_t outputLen;
|
|
int32_t bufSize;
|
|
|
|
char path[PATH_MAX];
|
|
|
|
uv_lib_t lib;
|
|
|
|
TUdfScalarProcFunc scalarProcFunc;
|
|
|
|
TUdfAggStartFunc aggStartFunc;
|
|
TUdfAggProcessFunc aggProcFunc;
|
|
TUdfAggFinishFunc aggFinishFunc;
|
|
|
|
TUdfInitFunc initFunc;
|
|
TUdfDestroyFunc destroyFunc;
|
|
} SUdf;
|
|
|
|
// TODO: add private udf structure.
|
|
typedef struct SUdfcFuncHandle {
|
|
SUdf *udf;
|
|
} SUdfcFuncHandle;
|
|
|
|
typedef enum EUdfdRpcReqRspType {
|
|
UDFD_RPC_MNODE_CONNECT = 0,
|
|
UDFD_RPC_RETRIVE_FUNC,
|
|
} EUdfdRpcReqRspType;
|
|
|
|
typedef struct SUdfdRpcSendRecvInfo {
|
|
EUdfdRpcReqRspType rpcType;
|
|
int32_t code;
|
|
void * param;
|
|
uv_sem_t resultSem;
|
|
} SUdfdRpcSendRecvInfo;
|
|
|
|
static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
|
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
|
|
static int32_t udfdConnectToMnode();
|
|
static int32_t udfdLoadUdf(char *udfName, SUdf *udf);
|
|
static bool udfdRpcRfp(int32_t code);
|
|
static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
|
|
static int32_t udfdOpenClientRpc();
|
|
static int32_t udfdCloseClientRpc();
|
|
|
|
static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
|
|
static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
|
|
static void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
|
|
static void udfdProcessRequest(uv_work_t *req);
|
|
static void udfdOnWrite(uv_write_t *req, int status);
|
|
static void udfdSendResponse(uv_work_t *work, int status);
|
|
static void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
|
|
static bool isUdfdUvMsgComplete(SUdfdUvConn *pipe);
|
|
static void udfdHandleRequest(SUdfdUvConn *conn);
|
|
static void udfdPipeCloseCb(uv_handle_t *pipe);
|
|
static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
|
|
static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
|
|
static void udfdOnNewConnection(uv_stream_t *server, int status);
|
|
|
|
static void udfdIntrSignalHandler(uv_signal_t *handle, int signum);
|
|
static int32_t removeListeningPipe();
|
|
|
|
static void udfdPrintVersion();
|
|
static int32_t udfdParseArgs(int32_t argc, char *argv[]);
|
|
static int32_t udfdInitLog();
|
|
|
|
static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
|
|
static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
|
|
static int32_t udfdUvInit();
|
|
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg);
|
|
static int32_t udfdRun();
|
|
static void udfdConnectMnodeThreadFunc(void* args);
|
|
|
|
void udfdProcessRequest(uv_work_t *req) {
|
|
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
|
|
SUdfRequest request = {0};
|
|
decodeUdfRequest(uvUdf->input.base, &request);
|
|
|
|
switch (request.type) {
|
|
case UDF_TASK_SETUP: {
|
|
udfdProcessSetupRequest(uvUdf, &request);
|
|
break;
|
|
}
|
|
|
|
case UDF_TASK_CALL: {
|
|
udfdProcessCallRequest(uvUdf, &request);
|
|
break;
|
|
}
|
|
case UDF_TASK_TEARDOWN: {
|
|
udfdProcessTeardownRequest(uvUdf, &request);
|
|
break;
|
|
}
|
|
default: {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
|
// TODO: tracable id from client. connect, setup, call, teardown
|
|
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
|
|
SUdfSetupRequest *setup = &request->setup;
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
SUdf * udf = NULL;
|
|
uv_mutex_lock(&global.udfsMutex);
|
|
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
|
|
if (udfInHash) {
|
|
++(*udfInHash)->refCount;
|
|
udf = *udfInHash;
|
|
uv_mutex_unlock(&global.udfsMutex);
|
|
} else {
|
|
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
|
udfNew->refCount = 1;
|
|
udfNew->state = UDF_STATE_INIT;
|
|
|
|
uv_mutex_init(&udfNew->lock);
|
|
uv_cond_init(&udfNew->condReady);
|
|
udf = udfNew;
|
|
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew));
|
|
uv_mutex_unlock(&global.udfsMutex);
|
|
}
|
|
|
|
uv_mutex_lock(&udf->lock);
|
|
if (udf->state == UDF_STATE_INIT) {
|
|
udf->state = UDF_STATE_LOADING;
|
|
code = udfdLoadUdf(setup->udfName, udf);
|
|
if (udf->initFunc) {
|
|
udf->initFunc();
|
|
}
|
|
udf->state = UDF_STATE_READY;
|
|
uv_cond_broadcast(&udf->condReady);
|
|
uv_mutex_unlock(&udf->lock);
|
|
} else {
|
|
while (udf->state != UDF_STATE_READY) {
|
|
uv_cond_wait(&udf->condReady, &udf->lock);
|
|
}
|
|
uv_mutex_unlock(&udf->lock);
|
|
}
|
|
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
|
|
handle->udf = udf;
|
|
|
|
SUdfResponse rsp;
|
|
rsp.seqNum = request->seqNum;
|
|
rsp.type = request->type;
|
|
rsp.code = code;
|
|
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
|
rsp.setupRsp.outputType = udf->outputType;
|
|
rsp.setupRsp.outputLen = udf->outputLen;
|
|
rsp.setupRsp.bufSize = udf->bufSize;
|
|
|
|
int32_t len = encodeUdfResponse(NULL, &rsp);
|
|
rsp.msgLen = len;
|
|
void *bufBegin = taosMemoryMalloc(len);
|
|
void *buf = bufBegin;
|
|
encodeUdfResponse(&buf, &rsp);
|
|
|
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
|
|
|
taosMemoryFree(uvUdf->input.base);
|
|
return;
|
|
}
|
|
|
|
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
|
SUdfCallRequest *call = &request->call;
|
|
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, call->udfHandle);
|
|
SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle);
|
|
SUdf * udf = handle->udf;
|
|
SUdfResponse response = {0};
|
|
SUdfResponse * rsp = &response;
|
|
SUdfCallResponse *subRsp = &rsp->callRsp;
|
|
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
switch (call->callType) {
|
|
case TSDB_UDF_CALL_SCALA_PROC: {
|
|
SUdfColumn output = {0};
|
|
|
|
SUdfDataBlock input = {0};
|
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
|
code = udf->scalarProcFunc(&input, &output);
|
|
freeUdfDataDataBlock(&input);
|
|
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
|
|
freeUdfColumn(&output);
|
|
break;
|
|
}
|
|
case TSDB_UDF_CALL_AGG_INIT: {
|
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
|
udf->aggStartFunc(&outBuf);
|
|
subRsp->resultBuf = outBuf;
|
|
break;
|
|
}
|
|
case TSDB_UDF_CALL_AGG_PROC: {
|
|
SUdfDataBlock input = {0};
|
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
|
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
|
|
freeUdfInterBuf(&call->interBuf);
|
|
freeUdfDataDataBlock(&input);
|
|
subRsp->resultBuf = outBuf;
|
|
|
|
break;
|
|
}
|
|
case TSDB_UDF_CALL_AGG_FIN: {
|
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
|
|
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
|
|
freeUdfInterBuf(&call->interBuf);
|
|
subRsp->resultBuf = outBuf;
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
rsp->seqNum = request->seqNum;
|
|
rsp->type = request->type;
|
|
rsp->code = code;
|
|
subRsp->callType = call->callType;
|
|
|
|
int32_t len = encodeUdfResponse(NULL, rsp);
|
|
rsp->msgLen = len;
|
|
void *bufBegin = taosMemoryMalloc(len);
|
|
void *buf = bufBegin;
|
|
encodeUdfResponse(&buf, rsp);
|
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
|
|
|
switch (call->callType) {
|
|
case TSDB_UDF_CALL_SCALA_PROC: {
|
|
tDeleteSSDataBlock(&call->block);
|
|
tDeleteSSDataBlock(&subRsp->resultData);
|
|
break;
|
|
}
|
|
case TSDB_UDF_CALL_AGG_INIT: {
|
|
freeUdfInterBuf(&subRsp->resultBuf);
|
|
break;
|
|
}
|
|
case TSDB_UDF_CALL_AGG_PROC: {
|
|
tDeleteSSDataBlock(&call->block);
|
|
freeUdfInterBuf(&subRsp->resultBuf);
|
|
break;
|
|
}
|
|
case TSDB_UDF_CALL_AGG_FIN: {
|
|
freeUdfInterBuf(&subRsp->resultBuf);
|
|
break;
|
|
}
|
|
default:
|
|
break;
|
|
}
|
|
|
|
taosMemoryFree(uvUdf->input.base);
|
|
return;
|
|
}
|
|
|
|
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
|
SUdfTeardownRequest *teardown = &request->teardown;
|
|
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
|
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
|
SUdf * udf = handle->udf;
|
|
bool unloadUdf = false;
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
|
|
uv_mutex_lock(&global.udfsMutex);
|
|
udf->refCount--;
|
|
if (udf->refCount == 0) {
|
|
unloadUdf = true;
|
|
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
|
|
}
|
|
uv_mutex_unlock(&global.udfsMutex);
|
|
if (unloadUdf) {
|
|
uv_cond_destroy(&udf->condReady);
|
|
uv_mutex_destroy(&udf->lock);
|
|
if (udf->destroyFunc) {
|
|
(udf->destroyFunc)();
|
|
}
|
|
uv_dlclose(&udf->lib);
|
|
taosMemoryFree(udf);
|
|
}
|
|
taosMemoryFree(handle);
|
|
|
|
SUdfResponse response;
|
|
SUdfResponse *rsp = &response;
|
|
rsp->seqNum = request->seqNum;
|
|
rsp->type = request->type;
|
|
rsp->code = code;
|
|
int32_t len = encodeUdfResponse(NULL, rsp);
|
|
rsp->msgLen = len;
|
|
void *bufBegin = taosMemoryMalloc(len);
|
|
void *buf = bufBegin;
|
|
encodeUdfResponse(&buf, rsp);
|
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
|
|
|
taosMemoryFree(uvUdf->input.base);
|
|
return;
|
|
}
|
|
|
|
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
|
|
ASSERT(pMsg->info.ahandle != NULL);
|
|
|
|
if (pEpSet) {
|
|
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
|
|
updateEpSet_s(&global.mgmtEp, pEpSet);
|
|
}
|
|
}
|
|
|
|
if (pMsg->code != TSDB_CODE_SUCCESS) {
|
|
fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
|
|
msgInfo->code = pMsg->code;
|
|
goto _return;
|
|
}
|
|
|
|
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
|
|
SConnectRsp connectRsp = {0};
|
|
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
|
|
if (connectRsp.epSet.numOfEps == 0) {
|
|
msgInfo->code = TSDB_CODE_MND_APP_ERROR;
|
|
goto _return;
|
|
}
|
|
|
|
if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
|
|
updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
|
|
}
|
|
msgInfo->code = 0;
|
|
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
|
|
SRetrieveFuncRsp retrieveRsp = {0};
|
|
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
|
|
|
|
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
|
|
SUdf * udf = msgInfo->param;
|
|
udf->funcType = pFuncInfo->funcType;
|
|
udf->scriptType = pFuncInfo->scriptType;
|
|
udf->outputType = pFuncInfo->outputType;
|
|
udf->outputLen = pFuncInfo->outputLen;
|
|
udf->bufSize = pFuncInfo->bufSize;
|
|
|
|
char path[PATH_MAX] = {0};
|
|
#ifdef WINDOWS
|
|
snprintf(path, sizeof(path), "%s%s.dll", TD_TMP_DIR_PATH, pFuncInfo->name);
|
|
#else
|
|
snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name);
|
|
#endif
|
|
TdFilePtr file =
|
|
taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
|
if (file == NULL) {
|
|
fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
|
|
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
|
|
}
|
|
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
|
|
if (count != pFuncInfo->codeSize) {
|
|
fnError("udfd write udf shared library failed");
|
|
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
|
|
}
|
|
taosCloseFile(&file);
|
|
strncpy(udf->path, path, strlen(path));
|
|
tFreeSFuncInfo(pFuncInfo);
|
|
taosArrayDestroy(retrieveRsp.pFuncInfos);
|
|
msgInfo->code = 0;
|
|
}
|
|
|
|
_return:
|
|
rpcFreeCont(pMsg->pCont);
|
|
uv_sem_post(&msgInfo->resultSem);
|
|
return;
|
|
}
|
|
|
|
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
|
|
SRetrieveFuncReq retrieveReq = {0};
|
|
retrieveReq.numOfFuncs = 1;
|
|
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
|
|
taosArrayPush(retrieveReq.pFuncNames, udfName);
|
|
|
|
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
|
|
void * pReq = rpcMallocCont(contLen);
|
|
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
|
|
taosArrayDestroy(retrieveReq.pFuncNames);
|
|
|
|
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
|
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
|
|
msgInfo->param = udf;
|
|
uv_sem_init(&msgInfo->resultSem, 0);
|
|
|
|
SRpcMsg rpcMsg = {0};
|
|
rpcMsg.pCont = pReq;
|
|
rpcMsg.contLen = contLen;
|
|
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
|
|
rpcMsg.info.ahandle = msgInfo;
|
|
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
|
|
|
uv_sem_wait(&msgInfo->resultSem);
|
|
uv_sem_destroy(&msgInfo->resultSem);
|
|
int32_t code = msgInfo->code;
|
|
taosMemoryFree(msgInfo);
|
|
return code;
|
|
}
|
|
|
|
int32_t udfdConnectToMnode() {
|
|
SConnectReq connReq = {0};
|
|
connReq.connType = CONN_TYPE__UDFD;
|
|
tstrncpy(connReq.app, "udfd", sizeof(connReq.app));
|
|
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
|
|
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
|
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
|
|
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
|
|
connReq.pid = htonl(taosGetPId());
|
|
connReq.startTime = htobe64(taosGetTimestampMs());
|
|
|
|
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
|
|
void * pReq = rpcMallocCont(contLen);
|
|
tSerializeSConnectReq(pReq, contLen, &connReq);
|
|
|
|
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
|
msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
|
|
uv_sem_init(&msgInfo->resultSem, 0);
|
|
|
|
SRpcMsg rpcMsg = {0};
|
|
rpcMsg.msgType = TDMT_MND_CONNECT;
|
|
rpcMsg.pCont = pReq;
|
|
rpcMsg.contLen = contLen;
|
|
rpcMsg.info.ahandle = msgInfo;
|
|
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
|
|
|
|
uv_sem_wait(&msgInfo->resultSem);
|
|
int32_t code = msgInfo->code;
|
|
uv_sem_destroy(&msgInfo->resultSem);
|
|
taosMemoryFree(msgInfo);
|
|
return code;
|
|
}
|
|
|
|
int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
|
strcpy(udf->name, udfName);
|
|
int32_t err = 0;
|
|
|
|
err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
|
|
if (err != 0) {
|
|
fnError("can not retrieve udf from mnode. udf name %s", udfName);
|
|
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
|
|
}
|
|
|
|
err = uv_dlopen(udf->path, &udf->lib);
|
|
if (err != 0) {
|
|
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
|
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
|
|
}
|
|
|
|
char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
|
|
char *initSuffix = "_init";
|
|
strcpy(initFuncName, udfName);
|
|
strncat(initFuncName, initSuffix, strlen(initSuffix));
|
|
uv_dlsym(&udf->lib, initFuncName, (void **)(&udf->initFunc));
|
|
|
|
char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
|
|
char *destroySuffix = "_destroy";
|
|
strcpy(destroyFuncName, udfName);
|
|
strncat(destroyFuncName, destroySuffix, strlen(destroySuffix));
|
|
uv_dlsym(&udf->lib, destroyFuncName, (void **)(&udf->destroyFunc));
|
|
|
|
if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
|
|
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
|
strcpy(processFuncName, udfName);
|
|
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc));
|
|
} else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
|
|
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
|
strcpy(processFuncName, udfName);
|
|
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc));
|
|
char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
|
char *startSuffix = "_start";
|
|
strncpy(startFuncName, processFuncName, strlen(processFuncName));
|
|
strncat(startFuncName, startSuffix, strlen(startSuffix));
|
|
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc));
|
|
char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
|
|
char *finishSuffix = "_finish";
|
|
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
|
|
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
|
|
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
|
|
// TODO: merge
|
|
}
|
|
return 0;
|
|
}
|
|
static bool udfdRpcRfp(int32_t code) {
|
|
if (code == TSDB_CODE_RPC_REDIRECT) {
|
|
return true;
|
|
} else {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
|
|
pEpSet->version = 0;
|
|
|
|
// init mnode ip set
|
|
SEpSet *mgmtEpSet = &(pEpSet->epSet);
|
|
mgmtEpSet->numOfEps = 0;
|
|
mgmtEpSet->inUse = 0;
|
|
|
|
if (firstEp && firstEp[0] != 0) {
|
|
if (strlen(firstEp) >= TSDB_EP_LEN) {
|
|
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
|
return -1;
|
|
}
|
|
|
|
int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
|
return terrno;
|
|
}
|
|
|
|
mgmtEpSet->numOfEps++;
|
|
}
|
|
|
|
if (secondEp && secondEp[0] != 0) {
|
|
if (strlen(secondEp) >= TSDB_EP_LEN) {
|
|
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
|
return -1;
|
|
}
|
|
|
|
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
|
|
mgmtEpSet->numOfEps++;
|
|
}
|
|
|
|
if (mgmtEpSet->numOfEps == 0) {
|
|
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int32_t udfdOpenClientRpc() {
|
|
SRpcInit rpcInit = {0};
|
|
rpcInit.label = "UDFD";
|
|
rpcInit.numOfThreads = 1;
|
|
rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
|
|
rpcInit.sessions = 1024;
|
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
|
rpcInit.user = TSDB_DEFAULT_USER;
|
|
rpcInit.parent = &global;
|
|
rpcInit.rfp = udfdRpcRfp;
|
|
|
|
global.clientRpc = rpcOpen(&rpcInit);
|
|
if (global.clientRpc == NULL) {
|
|
fnError("failed to init dnode rpc client");
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int32_t udfdCloseClientRpc() {
|
|
rpcClose(global.clientRpc);
|
|
return 0;
|
|
}
|
|
|
|
void udfdOnWrite(uv_write_t *req, int status) {
|
|
SUvUdfWork *work = (SUvUdfWork *)req->data;
|
|
if (status < 0) {
|
|
fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status));
|
|
}
|
|
taosMemoryFree(work->output.base);
|
|
taosMemoryFree(work);
|
|
taosMemoryFree(req);
|
|
}
|
|
|
|
void udfdSendResponse(uv_work_t *work, int status) {
|
|
SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
|
|
|
|
uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
|
|
write_req->data = udfWork;
|
|
uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite);
|
|
|
|
taosMemoryFree(work);
|
|
}
|
|
|
|
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
|
|
SUdfdUvConn *ctx = handle->data;
|
|
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
|
|
if (ctx->inputCap == 0) {
|
|
ctx->inputBuf = taosMemoryMalloc(msgHeadSize);
|
|
if (ctx->inputBuf) {
|
|
ctx->inputLen = 0;
|
|
ctx->inputCap = msgHeadSize;
|
|
ctx->inputTotal = -1;
|
|
|
|
buf->base = ctx->inputBuf;
|
|
buf->len = ctx->inputCap;
|
|
} else {
|
|
fnError("udfd can not allocate enough memory")
|
|
buf->base = NULL;
|
|
buf->len = 0;
|
|
}
|
|
} else {
|
|
ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
|
|
void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap);
|
|
if (inputBuf) {
|
|
ctx->inputBuf = inputBuf;
|
|
buf->base = ctx->inputBuf + ctx->inputLen;
|
|
buf->len = ctx->inputCap - ctx->inputLen;
|
|
} else {
|
|
fnError("udfd can not allocate enough memory")
|
|
buf->base = NULL;
|
|
buf->len = 0;
|
|
}
|
|
}
|
|
fnDebug("allocate buf. input buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal);
|
|
}
|
|
|
|
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
|
|
if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) {
|
|
pipe->inputTotal = *(int32_t *)(pipe->inputBuf);
|
|
}
|
|
if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) {
|
|
fnDebug("receive request complete. length %d", pipe->inputLen);
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
void udfdHandleRequest(SUdfdUvConn *conn) {
|
|
uv_work_t * work = taosMemoryMalloc(sizeof(uv_work_t));
|
|
SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork));
|
|
udfWork->client = conn->client;
|
|
udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen);
|
|
conn->inputBuf = NULL;
|
|
conn->inputLen = 0;
|
|
conn->inputCap = 0;
|
|
conn->inputTotal = -1;
|
|
work->data = udfWork;
|
|
uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse);
|
|
}
|
|
|
|
void udfdPipeCloseCb(uv_handle_t *pipe) {
|
|
SUdfdUvConn *conn = pipe->data;
|
|
taosMemoryFree(conn->client);
|
|
taosMemoryFree(conn->inputBuf);
|
|
taosMemoryFree(conn);
|
|
}
|
|
|
|
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
|
fnDebug("udf read %zd bytes from client", nread);
|
|
if (nread == 0) return;
|
|
|
|
SUdfdUvConn *conn = client->data;
|
|
|
|
if (nread > 0) {
|
|
conn->inputLen += nread;
|
|
if (isUdfdUvMsgComplete(conn)) {
|
|
udfdHandleRequest(conn);
|
|
} else {
|
|
// log error or continue;
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (nread < 0) {
|
|
fnError("Receive error %s", uv_err_name(nread));
|
|
if (nread == UV_EOF) {
|
|
// TODO check more when close
|
|
} else {
|
|
}
|
|
udfdUvHandleError(conn);
|
|
}
|
|
}
|
|
|
|
void udfdOnNewConnection(uv_stream_t *server, int status) {
|
|
if (status < 0) {
|
|
fnError("udfd new connection error. code: %s", uv_strerror(status));
|
|
return;
|
|
}
|
|
|
|
uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
|
|
uv_pipe_init(global.loop, client, 0);
|
|
if (uv_accept(server, (uv_stream_t *)client) == 0) {
|
|
SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
|
|
ctx->client = (uv_stream_t *)client;
|
|
ctx->inputBuf = 0;
|
|
ctx->inputLen = 0;
|
|
ctx->inputCap = 0;
|
|
client->data = ctx;
|
|
ctx->client = (uv_stream_t *)client;
|
|
uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead);
|
|
} else {
|
|
uv_close((uv_handle_t *)client, NULL);
|
|
}
|
|
}
|
|
|
|
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
|
|
fnInfo("udfd signal received: %d\n", signum);
|
|
uv_fs_t req;
|
|
uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
|
|
uv_signal_stop(handle);
|
|
uv_stop(global.loop);
|
|
}
|
|
|
|
static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
|
|
for (int32_t i = 1; i < argc; ++i) {
|
|
if (strcmp(argv[i], "-c") == 0) {
|
|
if (i < argc - 1) {
|
|
if (strlen(argv[++i]) >= PATH_MAX) {
|
|
printf("config file path overflow");
|
|
return -1;
|
|
}
|
|
tstrncpy(configDir, argv[i], PATH_MAX);
|
|
} else {
|
|
printf("'-c' requires a parameter, default is %s\n", configDir);
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-V") == 0) {
|
|
global.printVersion = true;
|
|
} else {
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void udfdPrintVersion() {
|
|
#ifdef TD_ENTERPRISE
|
|
char *releaseName = "enterprise";
|
|
#else
|
|
char *releaseName = "community";
|
|
#endif
|
|
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
|
|
printf("gitinfo: %s\n", gitinfo);
|
|
printf("buildInfo: %s\n", buildinfo);
|
|
}
|
|
|
|
static int32_t udfdInitLog() {
|
|
char logName[12] = {0};
|
|
snprintf(logName, sizeof(logName), "%slog", "udfd");
|
|
return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0);
|
|
}
|
|
|
|
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
|
|
buf->base = taosMemoryMalloc(suggested_size);
|
|
buf->len = suggested_size;
|
|
}
|
|
|
|
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
|
|
if (nread < 0) {
|
|
fnError("udfd ctrl pipe read error. %s", uv_err_name(nread));
|
|
uv_close((uv_handle_t *)q, NULL);
|
|
uv_stop(global.loop);
|
|
return;
|
|
}
|
|
fnError("udfd ctrl pipe read %zu bytes", nread);
|
|
taosMemoryFree(buf->base);
|
|
}
|
|
|
|
static int32_t removeListeningPipe() {
|
|
uv_fs_t req;
|
|
int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
|
|
uv_fs_req_cleanup(&req);
|
|
return err;
|
|
}
|
|
|
|
static int32_t udfdUvInit() {
|
|
uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
|
if (loop) {
|
|
uv_loop_init(loop);
|
|
}
|
|
global.loop = loop;
|
|
|
|
uv_pipe_init(global.loop, &global.ctrlPipe, 1);
|
|
uv_pipe_open(&global.ctrlPipe, 0);
|
|
uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
|
|
|
|
getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName));
|
|
|
|
removeListeningPipe();
|
|
|
|
uv_pipe_init(global.loop, &global.listeningPipe, 0);
|
|
|
|
uv_signal_init(global.loop, &global.intrSignal);
|
|
uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT);
|
|
|
|
int r;
|
|
fnInfo("bind to pipe %s", global.listenPipeName);
|
|
if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) {
|
|
fnError("Bind error %s", uv_err_name(r));
|
|
removeListeningPipe();
|
|
return -1;
|
|
}
|
|
if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) {
|
|
fnError("Listen error %s", uv_err_name(r));
|
|
removeListeningPipe();
|
|
return -2;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
|
|
if (!uv_is_closing(handle)) {
|
|
uv_close(handle, NULL);
|
|
}
|
|
}
|
|
|
|
static int32_t udfdRun() {
|
|
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
|
uv_mutex_init(&global.udfsMutex);
|
|
|
|
fnInfo("start udfd event loop");
|
|
uv_run(global.loop, UV_RUN_DEFAULT);
|
|
fnInfo("udfd event loop stopped.");
|
|
|
|
uv_loop_close(global.loop);
|
|
|
|
uv_walk(global.loop, udfdCloseWalkCb, NULL);
|
|
uv_run(global.loop, UV_RUN_DEFAULT);
|
|
uv_loop_close(global.loop);
|
|
|
|
uv_mutex_destroy(&global.udfsMutex);
|
|
taosHashCleanup(global.udfsHash);
|
|
return 0;
|
|
}
|
|
|
|
void udfdConnectMnodeThreadFunc(void* args) {
|
|
int32_t retryMnodeTimes = 0;
|
|
int32_t code = 0;
|
|
while (retryMnodeTimes++ <= TSDB_MAX_REPLICA) {
|
|
uv_sleep(100 * (1 << retryMnodeTimes));
|
|
code = udfdConnectToMnode();
|
|
if (code == 0) {
|
|
break;
|
|
}
|
|
fnError("udfd can not connect to mnode, code: %s. retry", tstrerror(code));
|
|
}
|
|
|
|
if (code != 0) {
|
|
fnError("udfd can not connect to mnode");
|
|
}
|
|
}
|
|
|
|
int main(int argc, char *argv[]) {
|
|
if (!taosCheckSystemIsSmallEnd()) {
|
|
printf("failed to start since on non-small-end machines\n");
|
|
return -1;
|
|
}
|
|
|
|
if (udfdParseArgs(argc, argv) != 0) {
|
|
printf("failed to start since parse args error\n");
|
|
return -1;
|
|
}
|
|
|
|
if (global.printVersion) {
|
|
udfdPrintVersion();
|
|
return 0;
|
|
}
|
|
|
|
if (udfdInitLog() != 0) {
|
|
printf("failed to start since init log error\n");
|
|
return -1;
|
|
}
|
|
|
|
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
|
|
fnError("failed to start since read config error");
|
|
return -2;
|
|
}
|
|
|
|
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
|
|
if (udfdOpenClientRpc() != 0) {
|
|
fnError("open rpc connection to mnode failure");
|
|
return -3;
|
|
}
|
|
|
|
if (udfdUvInit() != 0) {
|
|
fnError("uv init failure");
|
|
return -5;
|
|
}
|
|
|
|
uv_thread_t mnodeConnectThread;
|
|
uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL);
|
|
|
|
udfdRun();
|
|
|
|
removeListeningPipe();
|
|
uv_thread_join(&mnodeConnectThread);
|
|
udfdCloseClientRpc();
|
|
|
|
return 0;
|
|
}
|