diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index e3d7621fb0..907fddaec2 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -17,6 +17,7 @@ #include "qndInt.h" #include "query.h" #include "qworker.h" +//#include "tudf.h" SQnode *qndOpen(const SQnodeOpt *pOption) { SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode)); @@ -25,6 +26,8 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { return NULL; } + //udfcOpen(); + if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) { taosMemoryFreeClear(pQnode); return NULL; @@ -37,6 +40,8 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { void qndClose(SQnode *pQnode) { qWorkerDestroy((void **)&pQnode->pQuery); + //udfcClose(); + taosMemoryFree(pQnode); } diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 6b2e7c17d3..820a4894b5 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -329,28 +329,40 @@ _return: } int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *output) { - SScalarFuncExecFuncs ffpSet = {0}; - int32_t code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); - if (code) { - sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); - SCL_ERR_RET(code); - } - SScalarParam *params = NULL; int32_t rowNum = 0; int32_t paramNum = 0; + int32_t code = 0; SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum)); - output->columnData = createColumnInfoData(&node->node.resType, rowNum); - if (output->columnData == NULL) { - sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes)); - SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - code = (*ffpSet.process)(params, paramNum, output); - if (code) { - sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + if (fmIsUserDefinedFunc(node->funcId)) { +#if 0 + UdfcFuncHandle udfHandle = NULL; + + SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle)); + code = callUdfScalarFunc(udfHandle, params, paramNum, output); + teardownUdf(udfHandle); SCL_ERR_JRET(code); +#endif + } else { + SScalarFuncExecFuncs ffpSet = {0}; + code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet); + if (code) { + sclError("fmGetFuncExecFuncs failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + SCL_ERR_JRET(code); + } + + output->columnData = createColumnInfoData(&node->node.resType, rowNum); + if (output->columnData == NULL) { + sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes)); + SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + code = (*ffpSet.process)(params, paramNum, output); + if (code) { + sclError("scalar function exec failed, funcId:%d, code:%s", node->funcId, tstrerror(code)); + SCL_ERR_JRET(code); + } } _return: