From a284d8fc4a09eb4e25a92e48c79dfc2ed44317a4 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 21 Apr 2022 13:37:18 +0800 Subject: [PATCH 1/3] feature/qnode --- include/libs/catalog/catalog.h | 2 ++ include/libs/function/functionMgt.h | 8 ++++- source/libs/catalog/src/catalog.c | 47 +++++++++++++++++++++++++ source/libs/function/src/functionMgt.c | 26 +++++++++----- source/libs/parser/src/parTranslater.c | 3 +- source/libs/qcom/src/querymsg.c | 48 ++++++++++++++++++++++++++ 6 files changed, 124 insertions(+), 10 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 5abda69aa8..e21c90d2b1 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -224,6 +224,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); +int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo); + /** * Destroy catalog and relase all resources diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index cd49e88b2f..c44b925071 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -120,11 +120,17 @@ struct SqlFunctionCtx; struct SResultRowEntryInfo; struct STimeWindow; +typedef struct SFmGetFuncInfoParam { + SCatalog* pCtg; + void *pRpc; + const SEpSet* pMgmtEps; +} SFmGetFuncInfoParam; + int32_t fmFuncMgtInit(); void fmFuncMgtDestroy(); -int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType); +int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType); int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 21e0be40a4..defc1b7d5b 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -643,6 +643,43 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt return TSDB_CODE_SUCCESS; } +int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *funcName, SFuncInfo *out) { + char *msg = NULL; + int32_t msgLen = 0; + + ctgDebug("try to get udf info from mnode, funcName:%s", funcName); + + int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)]((void *)funcName, &msg, 0, &msgLen); + if (code) { + ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName); + CTG_ERR_RET(code); + } + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_RETRIEVE_FUNC, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + if (TSDB_CODE_SUCCESS != rpcRsp.code) { + ctgError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rpcRsp.code), funcName); + CTG_ERR_RET(rpcRsp.code); + } + + code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)](out, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + ctgError("Process get udf rsp failed, code:%x, funcName:%s", code, funcName); + CTG_ERR_RET(code); + } + + ctgDebug("Got udf from mnode, funcName:%s", funcName); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) { if (NULL == pCtg->dbCache) { @@ -2811,6 +2848,16 @@ int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, CTG_API_LEAVE(ctgGetIndexInfoFromMnode(pCtg, pRpc, pMgmtEps, indexName, pInfo)); } +int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == funcName || NULL == pInfo) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_API_LEAVE(ctgGetUdfInfoFromMnode(pCtg, pRpc, pMgmtEps, funcName, pInfo)); +} + void catalogDestroy(void) { qInfo("start to destroy catalog"); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 130b81a609..a372437b04 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -29,6 +29,7 @@ typedef struct SFuncMgtService { typedef struct SUdfInfo { SDataType outputDt; + int8_t funcType; } SUdfInfo; static SFuncMgtService gFunMgtService; @@ -52,30 +53,39 @@ static void doInitFunctionTable() { gFunMgtService.pUdfTable = NULL; } +static int8_t getUdfType(int32_t funcId) { + SUdfInfo* pUdf = taosArrayGet(gFunMgtService.pUdfTable, funcId - FUNC_UDF_ID_START_OFFSET_VAL - 1); + return pUdf->funcType; +} + static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) { + if (fmIsUserDefinedFunc(funcId)) { + return getUdfType(funcId); + } if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { return false; } return FUNC_MGT_TEST_MASK(funcMgtBuiltins[funcId].classification, classification); } -static int32_t getUdfId(const char* pFuncName) { - // todo: udf by call catalog - if (1) { +static int32_t getUdfId(SFmGetFuncInfoParam* pParam, const char* pFuncName) { + SFuncInfo* pInfo = NULL; + int32_t code = catalogGetUdfInfo(pParam->pCtg, pParam->pRpc, pParam->pMgmtEps, pFuncName, &pInfo); + if (TSDB_CODE_SUCCESS != code || NULL == pInfo) { return -1; } if (NULL == gFunMgtService.pUdfTable) { gFunMgtService.pUdfTable = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SUdfInfo)); } - SUdfInfo info = {0}; //todo + SUdfInfo info = { .outputDt.type = pInfo->outputType, .outputDt.byts = pInfo->outputLen, .funcType = pInfo->funcType }; taosArrayPush(gFunMgtService.pUdfTable, &info); return taosArrayGetSize(gFunMgtService.pUdfTable) + FUNC_UDF_ID_START_OFFSET_VAL; } -static int32_t getFuncId(const char* pFuncName) { +static int32_t getFuncId(SFmGetFuncInfoParam* pParam, const char* pFuncName) { void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, pFuncName, strlen(pFuncName)); if (NULL == pVal) { - return getUdfId(pFuncName); + return getUdfId(pParam, pFuncName); } return *(int32_t*)pVal; } @@ -91,8 +101,8 @@ int32_t fmFuncMgtInit() { return initFunctionCode; } -int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) { - *pFuncId = getFuncId(pFuncName); +int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType) { + *pFuncId = getFuncId(pParam, pFuncName); if (*pFuncId < 0) { return TSDB_CODE_FAILED; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 7a8db2df50..75bf861aae 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -568,7 +568,8 @@ static EDealRes haveAggFunction(SNode* pNode, void* pContext) { } static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) { - if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) { + SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet}; + if (TSDB_CODE_SUCCESS != fmGetFuncInfo(¶m, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName); } pCxt->errCode = fmGetFuncResultType(pFunc, pCxt->msgBuf.buf, pCxt->msgBuf.len); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 932107dfc1..9ad3da508b 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -157,6 +157,27 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } +int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SRetrieveFuncReq funcReq = {0}; + funcReq.numOfFuncs = 1; + funcReq.pFuncNames = taosArrayInit(1, strlen(input) + 1); + taosArrayPush(funcReq.pFuncNames, input); + + int32_t bufLen = tSerializeSRetrieveFuncReq(NULL, 0, &funcReq); + void *pBuf = rpcMallocCont(bufLen); + tSerializeSRetrieveFuncReq(pBuf, bufLen, &funcReq); + + taosArrayDestroy(funcReq.pFuncNames); + + *msg = pBuf; + *msgLen = bufLen; + + return TSDB_CODE_SUCCESS; +} int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { SUseDbOutput *pOut = output; @@ -379,6 +400,31 @@ int32_t queryProcessGetIndexRsp(void *output, char *msg, int32_t msgSize) { return TSDB_CODE_SUCCESS; } +int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) { + SRetrieveFuncRsp out = {0}; + + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + if (tDeserializeSRetrieveFuncRsp(msg, msgSize, &out) != 0) { + qError("tDeserializeSRetrieveFuncRsp failed, msgSize:%d", msgSize); + return TSDB_CODE_INVALID_MSG; + } + + if (1 != out.numOfFuncs) { + qError("invalid func num returned, numOfFuncs:%d", out.numOfFuncs); + return TSDB_CODE_INVALID_MSG; + } + + SFuncInfo * funcInfo = taosArrayGet(out.pFuncInfos, 0); + + memcpy(output, funcInfo, sizeof(*funcInfo)); + taosArrayDestroy(out.pFuncInfos); + + return TSDB_CODE_SUCCESS; +} + void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg; @@ -386,6 +432,7 @@ void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; @@ -393,6 +440,7 @@ void initQueryModuleMsgHandle() { queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp; } #pragma GCC diagnostic pop From 36a8382b8e77113e19366081775b4578d2809642 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 21 Apr 2022 15:33:07 +0800 Subject: [PATCH 2/3] feature/qnode --- include/client/taos.h | 1 + include/common/tmsg.h | 2 + include/libs/catalog/catalog.h | 2 +- include/libs/function/functionMgt.h | 3 +- source/client/inc/clientStmt.h | 1 + source/client/src/clientMain.c | 4 ++ source/client/src/clientStmt.c | 3 +- source/common/src/tmsg.c | 44 +++++++++++++++----- source/dnode/mnode/impl/src/mndFunc.c | 45 ++++++++++++--------- source/dnode/mnode/impl/src/mndInfoSchema.c | 2 +- source/libs/catalog/src/catalog.c | 28 +++++++++++-- source/libs/function/src/functionMgt.c | 4 +- source/libs/parser/src/parTranslater.c | 3 +- source/libs/qcom/src/querymsg.c | 1 + 14 files changed, 104 insertions(+), 39 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 526edae8af..55deee4fad 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -146,6 +146,7 @@ DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt); DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a06b102458..fac75e7fc6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -680,6 +680,7 @@ int32_t tDeserializeSDropFuncReq(void* buf, int32_t bufLen, SDropFuncReq* pReq); typedef struct { int32_t numOfFuncs; + bool ignoreCodeComment; SArray* pFuncNames; } SRetrieveFuncReq; @@ -708,6 +709,7 @@ typedef struct { int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); +void tFreeSFuncInfo(SFuncInfo *pInfo); void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp); typedef struct { diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index e21c90d2b1..f61073c9a5 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -224,7 +224,7 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); -int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo); +int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo); /** diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index c44b925071..2fff819f54 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -119,9 +119,10 @@ typedef enum EFunctionType { struct SqlFunctionCtx; struct SResultRowEntryInfo; struct STimeWindow; +struct SCatalog; typedef struct SFmGetFuncInfoParam { - SCatalog* pCtg; + struct SCatalog* pCtg; void *pRpc; const SEpSet* pMgmtEps; } SFmGetFuncInfoParam; diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index 219257ba74..04e9c3be9a 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -60,6 +60,7 @@ typedef struct SStmtBindInfo { } SStmtBindInfo; typedef struct SStmtExecInfo { + int32_t affectedRows; SRequestObj* pRequest; SHashObj* pVgHash; SHashObj* pBlockHash; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index e730f0a34a..99cbd208ee 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -622,6 +622,10 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { return stmtSetTbName(stmt, name); } +int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { + return taos_stmt_set_tbname(stmt, name); +} + int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind) { if (stmt == NULL || bind == NULL) { tscError("NULL parameter for %s", __FUNCTION__); diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 8a5134e02a..0972ff3477 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -486,7 +486,8 @@ int stmtExec(TAOS_STMT *stmt) { STMT_ERR_JRET(pStmt->exec.pRequest->code); - pStmt->affectedRows += taos_affected_rows(pStmt->exec.pRequest); + pStmt->exec.affectedRows = taos_affected_rows(pStmt->exec.pRequest); + pStmt->affectedRows += pStmt->exec.affectedRows; _return: diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8b48d7914a..aa6beb49ae 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1624,6 +1624,7 @@ int32_t tSerializeSRetrieveFuncReq(void *buf, int32_t bufLen, SRetrieveFuncReq * if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfFuncs) < 0) return -1; + if (tEncodeI8(&encoder, pReq->ignoreCodeComment) < 0) return -1; if (pReq->numOfFuncs != (int32_t)taosArrayGetSize(pReq->pFuncNames)) return -1; for (int32_t i = 0; i < pReq->numOfFuncs; ++i) { @@ -1644,6 +1645,7 @@ int32_t tDeserializeSRetrieveFuncReq(void *buf, int32_t bufLen, SRetrieveFuncReq if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfFuncs) < 0) return -1; + if (tDecodeI8(&decoder, (int8_t *)&pReq->ignoreCodeComment) < 0) return -1; pReq->pFuncNames = taosArrayInit(pReq->numOfFuncs, TSDB_FUNC_NAME_LEN); if (pReq->pFuncNames == NULL) return -1; @@ -1681,8 +1683,12 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp * if (tEncodeI64(&encoder, pInfo->signature) < 0) return -1; if (tEncodeI32(&encoder, pInfo->codeSize) < 0) return -1; if (tEncodeI32(&encoder, pInfo->commentSize) < 0) return -1; - if (tEncodeCStr(&encoder, pInfo->pCode) < 0) return -1; - if (tEncodeCStr(&encoder, pInfo->pComment) < 0) return -1; + if (pInfo->codeSize) { + if (tEncodeCStr(&encoder, pInfo->pCode) < 0) return -1; + } + if (pInfo->commentSize) { + if (tEncodeCStr(&encoder, pInfo->pComment) < 0) return -1; + } } tEndEncode(&encoder); @@ -1713,15 +1719,23 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp if (tDecodeI64(&decoder, &fInfo.signature) < 0) return -1; if (tDecodeI32(&decoder, &fInfo.codeSize) < 0) return -1; if (tDecodeI32(&decoder, &fInfo.commentSize) < 0) return -1; - fInfo.pCode = taosMemoryCalloc(1, fInfo.codeSize); - fInfo.pComment = taosMemoryCalloc(1, fInfo.commentSize); - if (fInfo.pCode == NULL || fInfo.pComment == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + if (fInfo.codeSize) { + fInfo.pCode = taosMemoryCalloc(1, fInfo.codeSize); + if (fInfo.pCode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (tDecodeCStrTo(&decoder, fInfo.pCode) < 0) return -1; + } + if (fInfo.commentSize) { + fInfo.pComment = taosMemoryCalloc(1, fInfo.commentSize); + if (fInfo.pComment == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (tDecodeCStrTo(&decoder, fInfo.pComment) < 0) return -1; } - if (tDecodeCStrTo(&decoder, fInfo.pCode) < 0) return -1; - if (tDecodeCStrTo(&decoder, fInfo.pComment) < 0) return -1; taosArrayPush(pRsp->pFuncInfos, &fInfo); } tEndDecode(&decoder); @@ -1730,12 +1744,20 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp return 0; } +void tFreeSFuncInfo(SFuncInfo *pInfo) { + if (NULL == pInfo) { + return; + } + + taosMemoryFree(pInfo->pCode); + taosMemoryFree(pInfo->pComment); +} + void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) { int32_t size = taosArrayGetSize(pRsp->pFuncInfos); for (int32_t i = 0; i < size; ++i) { SFuncInfo *pInfo = taosArrayGet(pRsp->pFuncInfos, i); - taosMemoryFree(pInfo->pCode); - taosMemoryFree(pInfo->pComment); + tFreeSFuncInfo(pInfo); } taosArrayDestroy(pRsp->pFuncInfos); } diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index c25561814b..156d894a44 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -427,7 +427,6 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) { SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName); if (pFunc == NULL) { - terrno = TSDB_CODE_MND_INVALID_FUNC; goto RETRIEVE_FUNC_OVER; } @@ -439,21 +438,26 @@ static int32_t mndProcessRetrieveFuncReq(SNodeMsg *pReq) { funcInfo.outputLen = pFunc->outputLen; funcInfo.bufSize = pFunc->bufSize; funcInfo.signature = pFunc->signature; - funcInfo.commentSize = pFunc->commentSize; - funcInfo.codeSize = pFunc->codeSize; - funcInfo.pCode = taosMemoryCalloc(1, funcInfo.codeSize); - if (funcInfo.pCode == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto RETRIEVE_FUNC_OVER; - } - memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize); - if (funcInfo.commentSize > 0) { - funcInfo.pComment = taosMemoryCalloc(1, funcInfo.commentSize); - if (funcInfo.pComment == NULL) { + if (retrieveReq.ignoreCodeComment) { + funcInfo.commentSize = 0; + funcInfo.codeSize = 0; + } else { + funcInfo.commentSize = pFunc->commentSize; + funcInfo.codeSize = pFunc->codeSize; + funcInfo.pCode = taosMemoryCalloc(1, funcInfo.codeSize); + if (funcInfo.pCode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto RETRIEVE_FUNC_OVER; } - memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize); + memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize); + if (funcInfo.commentSize > 0) { + funcInfo.pComment = taosMemoryCalloc(1, funcInfo.commentSize); + if (funcInfo.pComment == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto RETRIEVE_FUNC_OVER; + } + memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize); + } } taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo); mndReleaseFunc(pMnode, pFunc); @@ -518,11 +522,16 @@ static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)b1, false); - char *b2 = taosMemoryCalloc(1, pShow->bytes[cols]); - STR_WITH_MAXSIZE_TO_VARSTR(b2, pFunc->pComment, pShow->bytes[cols]); + if (pFunc->pComment) { + char *b2 = taosMemoryCalloc(1, pShow->bytes[cols]); + STR_WITH_MAXSIZE_TO_VARSTR(b2, pFunc->pComment, pShow->bytes[cols]); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)b2, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)b2, false); + } else { + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, NULL, true); + } int32_t isAgg = (pFunc->funcType == TSDB_FUNC_TYPE_AGGREGATE) ? 1 : 0; @@ -556,4 +565,4 @@ static int32_t mndRetrieveFuncs(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index 7c115c2e24..32c667faf9 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -98,7 +98,7 @@ static const SInfosTableSchema userFuncSchema[] = { {.name = "name", .bytes = TSDB_FUNC_NAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "comment", .bytes = PATH_MAX - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "aggregate", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "comment", .bytes = TSDB_TYPE_STR_MAX_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "output_type", .bytes = TSDB_TYPE_STR_MAX_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "code_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "bufsize", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index defc1b7d5b..410527c9e6 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -643,7 +643,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt return TSDB_CODE_SUCCESS; } -int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *funcName, SFuncInfo *out) { +int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *funcName, SFuncInfo **out) { char *msg = NULL; int32_t msgLen = 0; @@ -665,11 +665,17 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEp rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { + if (TSDB_CODE_MND_FUNC_NOT_EXIST == rpcRsp.code) { + ctgDebug("funcName %s not exist in mnode", funcName); + taosMemoryFreeClear(*out); + CTG_RET(TSDB_CODE_SUCCESS); + } + ctgError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rpcRsp.code), funcName); CTG_ERR_RET(rpcRsp.code); } - code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)](out, rpcRsp.pCont, rpcRsp.contLen); + code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)](*out, rpcRsp.pCont, rpcRsp.contLen); if (code) { ctgError("Process get udf rsp failed, code:%x, funcName:%s", code, funcName); CTG_ERR_RET(code); @@ -2848,14 +2854,28 @@ int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, CTG_API_LEAVE(ctgGetIndexInfoFromMnode(pCtg, pRpc, pMgmtEps, indexName, pInfo)); } -int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) { +int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo** pInfo) { CTG_API_ENTER(); if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == funcName || NULL == pInfo) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_API_LEAVE(ctgGetUdfInfoFromMnode(pCtg, pRpc, pMgmtEps, funcName, pInfo)); + int32_t code = 0; + *pInfo = taosMemoryMalloc(sizeof(SFuncInfo)); + if (NULL == *pInfo) { + CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY); + } + + CTG_ERR_JRET(ctgGetUdfInfoFromMnode(pCtg, pRpc, pMgmtEps, funcName, pInfo)); + +_return: + + if (code) { + taosMemoryFreeClear(*pInfo); + } + + CTG_API_LEAVE(code); } diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index a372437b04..d44e3e251b 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -77,8 +77,10 @@ static int32_t getUdfId(SFmGetFuncInfoParam* pParam, const char* pFuncName) { if (NULL == gFunMgtService.pUdfTable) { gFunMgtService.pUdfTable = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SUdfInfo)); } - SUdfInfo info = { .outputDt.type = pInfo->outputType, .outputDt.byts = pInfo->outputLen, .funcType = pInfo->funcType }; + SUdfInfo info = { .outputDt.type = pInfo->outputType, .outputDt.bytes = pInfo->outputLen, .funcType = pInfo->funcType }; taosArrayPush(gFunMgtService.pUdfTable, &info); + tFreeSFuncInfo(pInfo); + taosMemoryFree(pInfo); return taosArrayGetSize(gFunMgtService.pUdfTable) + FUNC_UDF_ID_START_OFFSET_VAL; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 75bf861aae..322dc004e2 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1838,7 +1838,8 @@ static int32_t checkTableRollupOption(STranslateContext* pCxt, SNodeList* pFuncs return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_ROLLUP_OPTION); } SFunctionNode* pFunc = nodesListGetNode(pFuncs, 0); - if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) { + SFmGetFuncInfoParam param = { .pCtg = pCxt->pParseCxt->pCatalog, .pRpc = pCxt->pParseCxt->pTransporter, .pMgmtEps = &pCxt->pParseCxt->mgmtEpSet}; + if (TSDB_CODE_SUCCESS != fmGetFuncInfo(¶m, pFunc->functionName, &pFunc->funcId, &pFunc->funcType)) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_FUNTION, pFunc->functionName); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 9ad3da508b..9e37784cab 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -164,6 +164,7 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3 SRetrieveFuncReq funcReq = {0}; funcReq.numOfFuncs = 1; + funcReq.ignoreCodeComment = true; funcReq.pFuncNames = taosArrayInit(1, strlen(input) + 1); taosArrayPush(funcReq.pFuncNames, input); From cefa203fc5f807f603957347db99064a57cebc12 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 21 Apr 2022 17:21:09 +0800 Subject: [PATCH 3/3] feature/qnode --- source/dnode/mnode/impl/test/func/func.cpp | 4 ++-- source/libs/planner/src/planOptimizer.c | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/test/func/func.cpp b/source/dnode/mnode/impl/test/func/func.cpp index 1569976105..4db9411e87 100644 --- a/source/dnode/mnode/impl/test/func/func.cpp +++ b/source/dnode/mnode/impl/test/func/func.cpp @@ -240,7 +240,7 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_FUNC_NOT_EXIST); } { @@ -371,7 +371,7 @@ TEST_F(MndTestFunc, 03_Retrieve_Func) { SRpcMsg* pRsp = test.SendReq(TDMT_MND_RETRIEVE_FUNC, pReq, contLen); ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_FUNC); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_FUNC_NOT_EXIST); } } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 34bee3fd0b..832135e90e 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -228,6 +228,8 @@ static int32_t cpdMergeConds(SNode** pDst, SNodeList** pSrc) { if (NULL == pLogicCond) { return TSDB_CODE_OUT_OF_MEMORY; } + pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL; + pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; pLogicCond->condType = LOGIC_COND_TYPE_AND; pLogicCond->pParameterList = *pSrc; *pDst = (SNode*)pLogicCond;