feat:udf refactoring

This commit is contained in:
shenglian zhou 2022-05-22 16:29:41 +08:00
parent 712217ab02
commit 7976b6abfd
1 changed files with 157 additions and 120 deletions

View File

@ -103,6 +103,42 @@ typedef struct SUdfdRpcSendRecvInfo {
uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo;
static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
static int32_t udfdConnectToMnode();
static int32_t udfdLoadUdf(char *udfName, SUdf *udf);
static bool udfdRpcRfp(int32_t code);
static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t udfdOpenClientRpc();
static int32_t udfdCloseClientRpc();
static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
static void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
static void udfdProcessRequest(uv_work_t *req);
static void udfdOnWrite(uv_write_t *req, int status);
static void udfdSendResponse(uv_work_t *work, int status);
static void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
static bool isUdfdUvMsgComplete(SUdfdUvConn *pipe);
static void udfdHandleRequest(SUdfdUvConn *conn);
static void udfdPipeCloseCb(uv_handle_t *pipe);
static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
static void udfdOnNewConnection(uv_stream_t *server, int status);
static void udfdIntrSignalHandler(uv_signal_t *handle, int signum);
static int32_t removeListeningPipe();
static void udfdPrintVersion();
static int32_t udfdParseArgs(int32_t argc, char *argv[]);
static int32_t udfdInitLog();
static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
static int32_t udfdUvInit();
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg);
static int32_t udfdRun();
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
ASSERT(pMsg->info.ahandle != NULL);
@ -144,11 +180,14 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
snprintf(path, sizeof(path), "%s/lib%s.so", tsTempDir, pFuncInfo->name);
TdFilePtr file =
taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
if (count != pFuncInfo->codeSize) {
fnError("udfd write udf shared library failed");
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
}
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
tFreeSFuncInfo(pFuncInfo);
@ -275,6 +314,104 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
}
return 0;
}
static bool udfdRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
return false;
}
}
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
pEpSet->version = 0;
// init mnode ip set
SEpSet *mgmtEpSet = &(pEpSet->epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
if (firstEp && firstEp[0] != 0) {
if (strlen(firstEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return terrno;
}
mgmtEpSet->numOfEps++;
}
if (secondEp && secondEp[0] != 0) {
if (strlen(secondEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
mgmtEpSet->numOfEps++;
}
if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
return 0;
}
int32_t udfdOpenClientRpc() {
SRpcInit rpcInit = {0};
rpcInit.label = "UDFD";
rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp;
global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
return -1;
}
return 0;
}
int32_t udfdCloseClientRpc() {
rpcClose(global.clientRpc);
return 0;
}
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0};
decodeUdfRequest(uvUdf->input.base, &request);
switch (request.type) {
case UDF_TASK_SETUP: {
udfdProcessSetupRequest(uvUdf, &request);
break;
}
case UDF_TASK_CALL: {
udfdProcessCallRequest(uvUdf, &request);
break;
}
case UDF_TASK_TEARDOWN: {
udfdProcessTeardownRequest(uvUdf, &request);
break;
}
default: {
break;
}
}
}
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
// TODO: tracable id from client. connect, setup, call, teardown
@ -471,31 +608,6 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
return;
}
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0};
decodeUdfRequest(uvUdf->input.base, &request);
switch (request.type) {
case UDF_TASK_SETUP: {
udfdProcessSetupRequest(uvUdf, &request);
break;
}
case UDF_TASK_CALL: {
udfdProcessCallRequest(uvUdf, &request);
break;
}
case UDF_TASK_TEARDOWN: {
udfdProcessTeardownRequest(uvUdf, &request);
break;
}
default: {
break;
}
}
}
void udfdOnWrite(uv_write_t *req, int status) {
SUvUdfWork *work = (SUvUdfWork *)req->data;
if (status < 0) {
@ -529,7 +641,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf->base = ctx->inputBuf;
buf->len = ctx->inputCap;
} else {
// TODO: log error
fnError("udfd can not allocate enough memory")
buf->base = NULL;
buf->len = 0;
}
@ -541,7 +653,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
buf->base = ctx->inputBuf + ctx->inputLen;
buf->len = ctx->inputCap - ctx->inputLen;
} else {
// TODO: log error
fnError("udfd can not allocate enough memory")
buf->base = NULL;
buf->len = 0;
}
@ -580,8 +692,6 @@ void udfdPipeCloseCb(uv_handle_t *pipe) {
taosMemoryFree(conn);
}
void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); }
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
fnDebug("udf read %zd bytes from client", nread);
if (nread == 0) return;
@ -638,91 +748,6 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
uv_stop(global.loop);
}
static bool udfdRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
return false;
}
}
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
pEpSet->version = 0;
// init mnode ip set
SEpSet *mgmtEpSet = &(pEpSet->epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
if (firstEp && firstEp[0] != 0) {
if (strlen(firstEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
if (code != TSDB_CODE_SUCCESS) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return terrno;
}
mgmtEpSet->numOfEps++;
}
if (secondEp && secondEp[0] != 0) {
if (strlen(secondEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
mgmtEpSet->numOfEps++;
}
if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
return 0;
}
int32_t udfdOpenClientRpc() {
SRpcInit rpcInit = {0};
rpcInit.label = "UDFD";
rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp;
global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
return -1;
}
return 0;
}
int32_t udfdCloseClientRpc() {
rpcClose(global.clientRpc);
return 0;
}
static void udfdPrintVersion() {
#ifdef TD_ENTERPRISE
char *releaseName = "enterprise";
#else
char *releaseName = "community";
#endif
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("buildInfo: %s\n", buildinfo);
}
static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
@ -745,6 +770,17 @@ static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
return 0;
}
static void udfdPrintVersion() {
#ifdef TD_ENTERPRISE
char *releaseName = "enterprise";
#else
char *releaseName = "community";
#endif
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("buildInfo: %s\n", buildinfo);
}
static int32_t udfdInitLog() {
char logName[12] = {0};
snprintf(logName, sizeof(logName), "%slog", "udfd");
@ -868,8 +904,8 @@ int main(int argc, char *argv[]) {
int32_t retryMnodeTimes = 0;
int32_t code = 0;
while (retryMnodeTimes++ < TSDB_MAX_REPLICA) {
uv_sleep(500 * (1 << retryMnodeTimes));
while (retryMnodeTimes++ <= TSDB_MAX_REPLICA) {
uv_sleep(100 * (1 << retryMnodeTimes));
code = udfdConnectToMnode();
if (code == 0) {
break;
@ -890,6 +926,7 @@ int main(int argc, char *argv[]) {
udfdRun();
removeListeningPipe();
udfdCloseClientRpc();
return 0;
}