diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index b71d50d43c..6b15833917 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -111,6 +111,12 @@ int32_t udfStartUdfd(int32_t startDnodeId); */ int32_t udfStopUdfd(); +/** + * get udfd pid + * + */ + int32_t udfGetUdfdPid(int32_t* pUdfdPid); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 0bfab227c4..ced52058e1 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -232,7 +232,6 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { } code = -1; - taosIp2String(pReq->info.conn.clientIp, ip); if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONNECT) != 0) { mGError("user:%s, failed to login from %s since %s", pReq->info.conn.user, ip, terrstr()); @@ -244,7 +243,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { mGError("user:%s, failed to login from %s while acquire user since %s", pReq->info.conn.user, ip, terrstr()); goto _OVER; } - + if (strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1) != 0) { mGError("user:%s, failed to login from %s since invalid pass, input:%s", pReq->info.conn.user, ip, connReq.passwd); code = TSDB_CODE_MND_AUTH_FAILURE; @@ -270,6 +269,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { } } +_CONNECT: pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp, pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime); if (pConn == NULL) { diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 31a7dfdbc5..5b9f44c812 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -255,6 +255,18 @@ int32_t udfStopUdfd() { return 0; } +int32_t udfGetUdfdPid(int32_t* pUdfdPid) { + SUdfdData *pData = &udfdGlobal; + if (pData->spawnErr) { + return pData->spawnErr; + } + uv_pid_t pid = uv_process_get_pid(&pData->process); + if (pUdfdPid) { + *pUdfdPid = (int32_t)pid; + } + return TSDB_CODE_SUCCESS; +} + //============================================================================================== /* Copyright (c) 2013, Ben Noordhuis * The QUEUE is copied from queue.h under libuv diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 3b827a2f99..93259924d5 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -965,40 +965,6 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { return code; } -int32_t udfdConnectToMnode() { - SConnectReq connReq = {0}; - connReq.connType = CONN_TYPE__UDFD; - tstrncpy(connReq.app, "udfd", sizeof(connReq.app)); - tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user)); - char pass[TSDB_PASSWORD_LEN + 1] = {0}; - taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); - tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); - connReq.pid = taosGetPId(); - connReq.startTime = taosGetTimestampMs(); - strcpy(connReq.sVer, version); - - int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); - void *pReq = rpcMallocCont(contLen); - tSerializeSConnectReq(pReq, contLen, &connReq); - - SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); - msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT; - uv_sem_init(&msgInfo->resultSem, 0); - - SRpcMsg rpcMsg = {0}; - rpcMsg.msgType = TDMT_MND_CONNECT; - rpcMsg.pCont = pReq; - rpcMsg.contLen = contLen; - rpcMsg.info.ahandle = msgInfo; - rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); - - uv_sem_wait(&msgInfo->resultSem); - int32_t code = msgInfo->code; - uv_sem_destroy(&msgInfo->resultSem); - taosMemoryFree(msgInfo); - return code; -} - static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING || @@ -1378,23 +1344,6 @@ static int32_t udfdRun() { return 0; } -void udfdConnectMnodeThreadFunc(void *args) { - int32_t retryMnodeTimes = 0; - int32_t code = 0; - while (retryMnodeTimes++ <= TSDB_MAX_REPLICA) { - uv_sleep(100 * (1 << retryMnodeTimes)); - code = udfdConnectToMnode(); - if (code == 0) { - break; - } - fnError("udfd can not connect to mnode, code: %s. retry", tstrerror(code)); - } - - if (code != 0) { - fnError("udfd can not connect to mnode"); - } -} - int32_t udfdInitResidentFuncs() { if (strlen(tsUdfdResFuncs) == 0) { return TSDB_CODE_SUCCESS; @@ -1497,9 +1446,6 @@ int main(int argc, char *argv[]) { udfdInitResidentFuncs(); - uv_thread_t mnodeConnectThread; - uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL); - udfdRun(); removeListeningPipe(); diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim index e539f11531..fbf9d50c25 100644 --- a/tests/script/tsim/query/udf.sim +++ b/tests/script/tsim/query/udf.sim @@ -8,6 +8,9 @@ system sh/deploy.sh -n dnode1 -i 1 system sh/cfg.sh -n dnode1 -c udf -v 1 system sh/exec.sh -n dnode1 -s start sql connect +sql alter user root pass 'taosdata2' +system sh/exec.sh -n dnode1 -s stop +system sh/exec.sh -n dnode1 -s start print ======== step1 udf system sh/compile_udf.sh