feat: enhance udf scalar function calls
This commit is contained in:
parent
a6900a7e96
commit
e8690dabef
|
@ -27,11 +27,13 @@ typedef struct SScalarCtx {
|
||||||
SArray *pBlockList; /* element is SSDataBlock* */
|
SArray *pBlockList; /* element is SSDataBlock* */
|
||||||
SHashObj *pRes; /* element is SScalarParam */
|
SHashObj *pRes; /* element is SScalarParam */
|
||||||
void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values
|
void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values
|
||||||
|
SHashObj *udf2Handle;
|
||||||
} SScalarCtx;
|
} SScalarCtx;
|
||||||
|
|
||||||
|
|
||||||
#define SCL_DATA_TYPE_DUMMY_HASH 9000
|
#define SCL_DATA_TYPE_DUMMY_HASH 9000
|
||||||
#define SCL_DEFAULT_OP_NUM 10
|
#define SCL_DEFAULT_OP_NUM 10
|
||||||
|
#define SCL_DEFAULT_UDF_NUM 8
|
||||||
|
|
||||||
#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type))
|
#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type))
|
||||||
#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList)
|
#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList)
|
||||||
|
|
|
@ -153,6 +153,18 @@ void sclFreeRes(SHashObj *res) {
|
||||||
taosHashCleanup(res);
|
taosHashCleanup(res);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void sclFreeUdfHandles(SHashObj *udf2handle) {
|
||||||
|
void *pIter = taosHashIterate(udf2handle, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
UdfcFuncHandle *handle = (UdfcFuncHandle *)pIter;
|
||||||
|
if (handle) {
|
||||||
|
teardownUdf(*handle);
|
||||||
|
}
|
||||||
|
pIter = taosHashIterate(udf2handle, pIter);
|
||||||
|
}
|
||||||
|
taosHashCleanup(udf2handle);
|
||||||
|
}
|
||||||
|
|
||||||
void sclFreeParam(SScalarParam *param) {
|
void sclFreeParam(SScalarParam *param) {
|
||||||
if (param->columnData != NULL) {
|
if (param->columnData != NULL) {
|
||||||
colDataDestroy(param->columnData);
|
colDataDestroy(param->columnData);
|
||||||
|
@ -362,22 +374,28 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
|
||||||
|
|
||||||
if (fmIsUserDefinedFunc(node->funcId)) {
|
if (fmIsUserDefinedFunc(node->funcId)) {
|
||||||
UdfcFuncHandle udfHandle = NULL;
|
UdfcFuncHandle udfHandle = NULL;
|
||||||
|
char* udfName = node->functionName;
|
||||||
code = setupUdf(node->functionName, &udfHandle);
|
if (ctx->udf2Handle) {
|
||||||
|
UdfcFuncHandle *pHandle = taosHashGet(ctx->udf2Handle, udfName, strlen(udfName));
|
||||||
|
if (pHandle) {
|
||||||
|
udfHandle = *pHandle;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (udfHandle == NULL) {
|
||||||
|
code = setupUdf(udfName, &udfHandle);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code);
|
sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", udfName, code);
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
if (ctx->udf2Handle) {
|
||||||
|
taosHashPut(ctx->udf2Handle, udfName, strlen(udfName), &udfHandle, sizeof(UdfcFuncHandle));
|
||||||
|
}
|
||||||
|
}
|
||||||
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
|
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
|
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
code = teardownUdf(udfHandle);
|
|
||||||
if (code != 0) {
|
|
||||||
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
SScalarFuncExecFuncs ffpSet = {0};
|
SScalarFuncExecFuncs ffpSet = {0};
|
||||||
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
|
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
|
||||||
|
@ -891,15 +909,20 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
|
||||||
SScalarCtx ctx = {0};
|
SScalarCtx ctx = {0};
|
||||||
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
if (NULL == ctx.pRes) {
|
if (NULL == ctx.pRes) {
|
||||||
sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
|
sclError("taosHashInit result map failed, num:%d", SCL_DEFAULT_OP_NUM);
|
||||||
|
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
ctx.udf2Handle = taosHashInit(SCL_DEFAULT_UDF_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
if (NULL == ctx.udf2Handle) {
|
||||||
|
sclError("taosHashInit udf to handle map failed, num:%d", SCL_DEFAULT_OP_NUM);
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
|
nodesRewriteExprPostOrder(&pNode, sclConstantsRewriter, (void *)&ctx);
|
||||||
SCL_ERR_JRET(ctx.code);
|
SCL_ERR_JRET(ctx.code);
|
||||||
*pRes = pNode;
|
*pRes = pNode;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
sclFreeUdfHandles(ctx.udf2Handle);
|
||||||
sclFreeRes(ctx.pRes);
|
sclFreeRes(ctx.pRes);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -915,10 +938,14 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
|
||||||
// TODO: OPT performance
|
// TODO: OPT performance
|
||||||
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
if (NULL == ctx.pRes) {
|
if (NULL == ctx.pRes) {
|
||||||
sclError("taosHashInit failed, num:%d", SCL_DEFAULT_OP_NUM);
|
sclError("taosHashInit result map failed, num:%d", SCL_DEFAULT_OP_NUM);
|
||||||
|
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
ctx.udf2Handle = taosHashInit(SCL_DEFAULT_UDF_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
if (NULL == ctx.udf2Handle) {
|
||||||
|
sclError("taosHashInit udf to handle map failed, num:%d", SCL_DEFAULT_OP_NUM);
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
|
nodesWalkExprPostOrder(pNode, sclCalcWalker, (void *)&ctx);
|
||||||
SCL_ERR_JRET(ctx.code);
|
SCL_ERR_JRET(ctx.code);
|
||||||
|
|
||||||
|
@ -936,6 +963,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
//nodesDestroyNode(pNode);
|
//nodesDestroyNode(pNode);
|
||||||
|
sclFreeUdfHandles(ctx.udf2Handle);
|
||||||
sclFreeRes(ctx.pRes);
|
sclFreeRes(ctx.pRes);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue