diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 544af9b6ee..71be1a5014 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -70,7 +70,7 @@ typedef uint16_t tmsg_t; #define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_STATE 7 -enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__MAX }; +enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__UDFD, CONN_TYPE__MAX }; enum { HEARTBEAT_KEY_USER_AUTHINFO = 1, diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 807234a1b1..c25d74911c 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -105,7 +105,12 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx); bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t sampleFunction(SqlFunctionCtx* pCtx); -int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +//int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + +bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t tailFunction(SqlFunctionCtx* pCtx); +int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index fc93008312..e41e3c7c39 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -386,6 +386,35 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return TSDB_CODE_SUCCESS; } +static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + if (2 != numOfParams && 3 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN != nodeType(pPara)) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "The input parameter of TAIL function can only be column"); + } + + for (int32_t i = 1; i < numOfParams; ++i) { + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type; + if (!IS_INTEGER_TYPE(paraType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + } + + SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); + uint8_t colType = pCol->resType.type; + if (IS_VAR_DATA_TYPE(colType)) { + pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType}; + } else { + pFunc->node.resType = (SDataType){.bytes = tDataTypes[colType].bytes, .type = colType}; + } + return TSDB_CODE_SUCCESS; +} + static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // todo return TSDB_CODE_SUCCESS; @@ -850,6 +879,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = sampleFunction, .finalizeFunc = NULL }, + { + .name = "tail", + .type = FUNCTION_TYPE_TAIL, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateTail, + .getEnvFunc = getTailFuncEnv, + .initFunc = tailFunctionSetup, + .processFunc = tailFunction, + .finalizeFunc = tailFinalize + }, { .name = "abs", .type = FUNCTION_TYPE_ABS, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index af86eb4e90..479a11ca4a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -18,12 +18,15 @@ #include "function.h" #include "querynodes.h" #include "taggfunction.h" +#include "tcompare.h" #include "tdatablock.h" #include "tpercentile.h" #define HISTOGRAM_MAX_BINS_NUM 1000 #define MAVG_MAX_POINTS_NUM 1000 #define SAMPLE_MAX_POINTS_NUM 1000 +#define TAIL_MAX_POINTS_NUM 100 +#define TAIL_MAX_OFFSET 100 typedef struct SSumRes { union { @@ -161,6 +164,21 @@ typedef struct SSampleInfo { int64_t *timestamp; } SSampleInfo; +typedef struct STailItem { + int64_t timestamp; + bool isNull; + char data[]; +} STailItem; + +typedef struct STailInfo { + int32_t numOfPoints; + int32_t numAdded; + int32_t offset; + uint8_t colType; + int16_t colBytes; + STailItem **pItems; +} STailInfo; + #define SET_VAL(_info, numOfElem, res) \ do { \ if ((numOfElem) <= 0) { \ @@ -3107,7 +3125,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { int32_t startOffset = pCtx->offset; for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { - if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + if (colDataIsNull_s(pInputCol, i)) { //colDataAppendNULL(pOutput, i); continue; } @@ -3141,3 +3159,132 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { // // return pResInfo->numOfRes; //} + + +bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); + SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); + int32_t numOfPoints = pVal->datum.i; + pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailItem) + pCol->node.resType.bytes); + return true; +} + +bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + STailInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo); + pInfo->numAdded = 0; + pInfo->numOfPoints = pCtx->param[1].param.i; + if (pCtx->numOfParams == 4) { + pInfo->offset = pCtx->param[2].param.i; + } else { + pInfo->offset = 0; + } + pInfo->colType = pCtx->resDataInfo.type; + pInfo->colBytes = pCtx->resDataInfo.bytes; + if ((pInfo->numOfPoints < 1 || pInfo->numOfPoints > TAIL_MAX_POINTS_NUM) || + (pInfo->numOfPoints < 0 || pInfo->numOfPoints > TAIL_MAX_OFFSET)) { + return false; + } + + pInfo->pItems = (STailItem **)((char *)pInfo + sizeof(STailInfo)); + char *pItem = (char *)pInfo->pItems + pInfo->numOfPoints * POINTER_BYTES; + + size_t unitSize = sizeof(STailItem) + pInfo->colBytes; + for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { + pInfo->pItems[i] = (STailItem *)(pItem + i * unitSize); + pInfo->pItems[i]->isNull = false; + } + + return true; +} + +static void tailAssignResult(STailItem* pItem, char *data, int32_t colBytes, TSKEY ts, bool isNull) { + pItem->timestamp = ts; + if (isNull) { + pItem->isNull = true; + } else { + memcpy(pItem->data, data, colBytes); + } +} + +static int32_t tailCompFn(const void *p1, const void *p2, const void *param) { + STailItem *d1 = *(STailItem **)p1; + STailItem *d2 = *(STailItem **)p2; + return compareInt64Val(&d1->timestamp, &d2->timestamp); +} + +static void doTailAdd(STailInfo* pInfo, char *data, TSKEY ts, bool isNull) { + STailItem **pList = pInfo->pItems; + if (pInfo->numAdded < pInfo->numOfPoints) { + tailAssignResult(pList[pInfo->numAdded], data, pInfo->colBytes, ts, isNull); + taosheapsort((void *)pList, sizeof(STailItem **), pInfo->numAdded + 1, NULL, tailCompFn, 0); + pInfo->numAdded++; + } else if (pList[0]->timestamp < ts) { + tailAssignResult(pList[0], data, pInfo->colBytes, ts, isNull); + taosheapadjust((void *)pList, sizeof(STailItem **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0); + } +} + +int32_t tailFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + STailInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + int32_t startOffset = pCtx->offset; + if (pInfo->offset >= pInput->numOfRows) { + return 0; + } else { + pInfo->numOfPoints = MIN(pInfo->numOfPoints, pInput->numOfRows - pInfo->offset); + } + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex - pInfo->offset; i += 1) { + + char* data = colDataGetData(pInputCol, i); + doTailAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i)); + } + + taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn); + + for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { + int32_t pos = startOffset + i; + STailItem *pItem = pInfo->pItems[i]; + if (pItem->isNull) { + colDataAppendNULL(pOutput, pos); + } else { + colDataAppend(pOutput, pos, pItem->data, false); + } + } + + return pInfo->numOfPoints; +} + +int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + STailInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo); + pEntryInfo->complete = true; + + int32_t type = pCtx->input.pData[0]->info.type; + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + // todo assign the tag value and the corresponding row data + int32_t currentRow = pBlock->info.rows; + for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { + STailItem *pItem = pInfo->pItems[i]; + colDataAppend(pCol, currentRow, pItem->data, false); + + //setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow); + currentRow += 1; + } + + return pEntryInfo->numOfRes; +} diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 4841e05267..388ec28b76 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -312,6 +312,7 @@ typedef struct SUdfcFuncStub { char udfName[TSDB_FUNC_NAME_LEN]; UdfcFuncHandle handle; int32_t refCount; + int64_t lastRefTime; } SUdfcFuncStub; typedef struct SUdfcProxy { @@ -1446,6 +1447,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); *pHandle = foundStub->handle; ++foundStub->refCount; + foundStub->lastRefTime = taosGetTimestampUs(); return 0; } *pHandle = NULL; @@ -1455,6 +1457,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { strcpy(stub.udfName, udfName); stub.handle = *pHandle; ++stub.refCount; + stub.lastRefTime = taosGetTimestampUs(); taosArrayPush(gUdfdProxy.udfStubs, &stub); taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); } else { @@ -1662,7 +1665,8 @@ int32_t cleanUpUdfs() { fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); doTeardownUdf(stub->handle); } else { - fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle); + fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p", + stub->udfName, stub->refCount, stub->lastRefTime, stub->handle); taosArrayPush(udfStubs, stub); } ++i; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 34681dc6cd..706bf28be0 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -485,7 +485,146 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { uv_stop(global.loop); } -void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } +typedef enum EUdfdRpcReqRspType { + UDFD_RPC_MNODE_CONNECT = 0, + UDFD_RPC_RETRIVE_FUNC, +} EUdfdRpcReqRspType; + +typedef struct SUdfdRpcSendRecvInfo { + EUdfdRpcReqRspType rpcType; + int32_t code; + void* param; + uv_sem_t resultSem; +} SUdfdRpcSendRecvInfo; + + +void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle; + ASSERT(pMsg->ahandle != NULL); + + if (pEpSet) { + if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) { + updateEpSet_s(&global.mgmtEp, pEpSet); + } + } + + if (pMsg->code != TSDB_CODE_SUCCESS) { + fnError("udfd rpc error. code: %s", tstrerror(pMsg->code)); + msgInfo->code = pMsg->code; + goto _return; + } + + if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { + SConnectRsp connectRsp = {0}; + tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); + if (connectRsp.epSet.numOfEps == 0) { + msgInfo->code = TSDB_CODE_MND_APP_ERROR; + goto _return; + } + + if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) { + updateEpSet_s(&global.mgmtEp, &connectRsp.epSet); + } + msgInfo->code = 0; + } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { + SRetrieveFuncRsp retrieveRsp = {0}; + tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); + + SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); + SUdf* udf = msgInfo->param; + udf->funcType = pFuncInfo->funcType; + udf->scriptType = pFuncInfo->scriptType; + udf->outputType = pFuncInfo->funcType; + udf->outputLen = pFuncInfo->outputLen; + udf->bufSize = pFuncInfo->bufSize; + + char path[PATH_MAX] = {0}; + snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name); + TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); + // TODO check for failure of flush to disk + taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); + taosCloseFile(&file); + strncpy(udf->path, path, strlen(path)); + taosArrayDestroy(retrieveRsp.pFuncInfos); + msgInfo->code = 0; + } + +_return: + rpcFreeCont(pMsg->pCont); + uv_sem_post(&msgInfo->resultSem); + return; +} + +int32_t udfdConnectToMNode() { + SConnectReq connReq = {0}; + connReq.connType = CONN_TYPE__UDFD; + tstrncpy(connReq.app, "udfd",sizeof(connReq.app)); + tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user)); + char pass[TSDB_PASSWORD_LEN + 1] = {0}; + taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); + tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); + connReq.pid = htonl(taosGetPId()); + connReq.startTime = htobe64(taosGetTimestampMs()); + + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connReq); + + SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); + msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT; + uv_sem_init(&msgInfo->resultSem, 0); + + SRpcMsg rpcMsg = {0}; + rpcMsg.msgType = TDMT_MND_CONNECT; + rpcMsg.pCont = pReq; + rpcMsg.contLen = contLen; + rpcMsg.ahandle = msgInfo; + rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + + uv_sem_wait(&msgInfo->resultSem); + int32_t code = msgInfo->code; + uv_sem_destroy(&msgInfo->resultSem); + taosMemoryFree(msgInfo); + return code; +} + +int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { + SRetrieveFuncReq retrieveReq = {0}; + retrieveReq.numOfFuncs = 1; + retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); + taosArrayPush(retrieveReq.pFuncNames, udfName); + + int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); + void *pReq = rpcMallocCont(contLen); + tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); + taosArrayDestroy(retrieveReq.pFuncNames); + + SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); + msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; + msgInfo->param = udf; + uv_sem_init(&msgInfo->resultSem, 0); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = contLen; + rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; + rpcMsg.ahandle = msgInfo; + rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + + uv_sem_wait(&msgInfo->resultSem); + uv_sem_destroy(&msgInfo->resultSem); + int32_t code = msgInfo->code; + taosMemoryFree(msgInfo); + return code; +} + +static bool udfdRpcRfp(int32_t code) { + if (code == TSDB_CODE_RPC_REDIRECT) { + return true; + } else { + return false; + } +} int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) { pEpSet->version = 0; @@ -528,69 +667,30 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe return 0; } -int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { - SRetrieveFuncReq retrieveReq = {0}; - retrieveReq.numOfFuncs = 1; - retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, udfName); - - int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); - void *pReq = rpcMallocCont(contLen); - tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); - taosArrayDestroy(retrieveReq.pFuncNames); - - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pReq; - rpcMsg.contLen = contLen; - rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; - - SRpcMsg rpcRsp = {0}; - rpcSendRecv(clientRpc, &global.mgmtEp.epSet, &rpcMsg, &rpcRsp); - SRetrieveFuncRsp retrieveRsp = {0}; - tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp); - - SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - - udf->funcType = pFuncInfo->funcType; - udf->scriptType = pFuncInfo->scriptType; - udf->outputType = pFuncInfo->funcType; - udf->outputLen = pFuncInfo->outputLen; - udf->bufSize = pFuncInfo->bufSize; - - char path[PATH_MAX] = {0}; - snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", udfName); - TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); - // TODO check for failure of flush to disk - taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); - taosCloseFile(&file); - strncpy(udf->path, path, strlen(path)); - taosArrayDestroy(retrieveRsp.pFuncInfos); - - rpcFreeCont(rpcRsp.pCont); - return 0; -} int32_t udfdOpenClientRpc() { - char *pass = "taosdata"; - char *user = "root"; - char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0}; - taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt); SRpcInit rpcInit = {0}; - rpcInit.label = (char *)"UDFD"; + rpcInit.label = "UDFD"; rpcInit.numOfThreads = 1; - rpcInit.cfp = udfdProcessRpcRsp; + rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.idleTime = 30 * 1000; - rpcInit.parent = &global; - - rpcInit.user = (char *)user; - rpcInit.ckey = (char *)"key"; - rpcInit.secret = (char *)secretEncrypt; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.user = TSDB_DEFAULT_USER; + rpcInit.ckey = "key"; rpcInit.spi = 1; + rpcInit.parent = &global; + rpcInit.rfp = udfdRpcRfp; + + char pass[TSDB_PASSWORD_LEN + 1] = {0}; + taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); + rpcInit.secret = pass; global.clientRpc = rpcOpen(&rpcInit); - + if (global.clientRpc == NULL) { + fnError("failed to init dnode rpc client"); + return -1; + } return 0; } @@ -700,12 +800,6 @@ static int32_t udfdRun() { global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); uv_mutex_init(&global.udfsMutex); - // TOOD: client rpc to fetch udf function info from mnode - if (udfdOpenClientRpc() != 0) { - fnError("open rpc connection to mnode failure"); - return -1; - } - if (udfdUvInit() != 0) { fnError("uv init failure"); return -2; @@ -717,7 +811,6 @@ static int32_t udfdRun() { int codeClose = uv_loop_close(global.loop); fnDebug("uv loop close. result: %s", uv_err_name(codeClose)); removeListeningPipe(); - udfdCloseClientRpc(); uv_mutex_destroy(&global.udfsMutex); taosHashCleanup(global.udfsHash); return 0; @@ -746,9 +839,22 @@ int main(int argc, char *argv[]) { if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { fnError("failed to start since read config error"); - return -1; + return -2; } initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp); - return udfdRun(); + if (udfdOpenClientRpc() != 0) { + fnError("open rpc connection to mnode failure"); + return -3; + } + + if (udfdConnectToMNode() != 0) { + fnError("failed to start since can not connect to mnode"); + return -4; + } + + udfdRun(); + + udfdCloseClientRpc(); + } diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim index b02ca79ed4..24ddcc1b75 100644 --- a/tests/script/tsim/query/udf.sim +++ b/tests/script/tsim/query/udf.sim @@ -7,7 +7,7 @@ system sh/cfg.sh -n dnode1 -c udf -v 1 print ========= start dnode1 as LEADER system sh/exec.sh -n dnode1 -s start -sleep 2000 +sleep 1000 sql connect print ======== step1 udf