Merge pull request #12252 from taosdata/feature/udf
feature(udf): enhance error process
This commit is contained in:
commit
e93376de9a
|
@ -1259,7 +1259,9 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
UdfcFuncHandle handle;
|
UdfcFuncHandle handle;
|
||||||
if (setupUdf((char*)pCtx->udfName, &handle) != 0) {
|
int32_t udfCode = 0;
|
||||||
|
if ((udfCode = setupUdf((char*)pCtx->udfName, &handle)) != 0) {
|
||||||
|
fnError("udfAggInit error. step setupUdf. udf code: %d", udfCode);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
SClientUdfUvSession *session = (SClientUdfUvSession *)handle;
|
SClientUdfUvSession *session = (SClientUdfUvSession *)handle;
|
||||||
|
@ -1272,7 +1274,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
|
||||||
|
|
||||||
udfRes->session = (SClientUdfUvSession *)handle;
|
udfRes->session = (SClientUdfUvSession *)handle;
|
||||||
SUdfInterBuf buf = {0};
|
SUdfInterBuf buf = {0};
|
||||||
if (callUdfAggInit(handle, &buf) != 0) {
|
if ((udfCode = callUdfAggInit(handle, &buf)) != 0) {
|
||||||
|
fnError("udfAggInit error. step callUdfAggInit. udf code: %d", udfCode);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
udfRes->interResNum = buf.numOfResult;
|
udfRes->interResNum = buf.numOfResult;
|
||||||
|
@ -1316,21 +1319,23 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||||
.numOfResult = udfRes->interResNum};
|
.numOfResult = udfRes->interResNum};
|
||||||
SUdfInterBuf newState = {0};
|
SUdfInterBuf newState = {0};
|
||||||
|
|
||||||
callUdfAggProcess(session, inputBlock, &state, &newState);
|
int32_t udfCode = callUdfAggProcess(session, inputBlock, &state, &newState);
|
||||||
|
if (udfCode != 0) {
|
||||||
|
fnError("udfAggProcess error. code: %d", udfCode);
|
||||||
|
newState.numOfResult = 0;
|
||||||
|
} else {
|
||||||
udfRes->interResNum = newState.numOfResult;
|
udfRes->interResNum = newState.numOfResult;
|
||||||
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
|
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
|
||||||
|
}
|
||||||
if (newState.numOfResult == 1 || state.numOfResult == 1) {
|
if (newState.numOfResult == 1 || state.numOfResult == 1) {
|
||||||
GET_RES_INFO(pCtx)->numOfRes = 1;
|
GET_RES_INFO(pCtx)->numOfRes = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataDestroy(inputBlock);
|
blockDataDestroy(inputBlock);
|
||||||
|
|
||||||
taosArrayDestroy(tempBlock.pDataBlock);
|
taosArrayDestroy(tempBlock.pDataBlock);
|
||||||
|
|
||||||
taosMemoryFree(newState.buf);
|
taosMemoryFree(newState.buf);
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
|
@ -1344,15 +1349,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
SUdfInterBuf state = {.buf = udfRes->interResBuf,
|
SUdfInterBuf state = {.buf = udfRes->interResBuf,
|
||||||
.bufLen = session->bufSize,
|
.bufLen = session->bufSize,
|
||||||
.numOfResult = udfRes->interResNum};
|
.numOfResult = udfRes->interResNum};
|
||||||
callUdfAggFinalize(session, &state, &resultBuf);
|
int32_t udfCallCode= 0;
|
||||||
|
udfCallCode= callUdfAggFinalize(session, &state, &resultBuf);
|
||||||
udfRes->finalResBuf = resultBuf.buf;
|
if (udfCallCode!= 0) {
|
||||||
|
fnError("udfAggFinalize error. callUdfAggFinalize step. udf code:%d", udfCallCode);
|
||||||
|
GET_RES_INFO(pCtx)->numOfRes = 0;
|
||||||
|
} else {
|
||||||
|
memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
|
||||||
udfRes->finalResNum = resultBuf.numOfResult;
|
udfRes->finalResNum = resultBuf.numOfResult;
|
||||||
|
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
|
||||||
teardownUdf(session);
|
|
||||||
|
|
||||||
if (resultBuf.numOfResult == 1) {
|
|
||||||
GET_RES_INFO(pCtx)->numOfRes = 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t code = teardownUdf(session);
|
||||||
|
if (code != 0) {
|
||||||
|
fnError("udfAggFinalize error. teardownUdf step. udf code: %d", code);
|
||||||
|
}
|
||||||
|
|
||||||
return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
|
return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
|
||||||
|
|
||||||
}
|
}
|
|
@ -347,10 +347,21 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
|
||||||
if (fmIsUserDefinedFunc(node->funcId)) {
|
if (fmIsUserDefinedFunc(node->funcId)) {
|
||||||
UdfcFuncHandle udfHandle = NULL;
|
UdfcFuncHandle udfHandle = NULL;
|
||||||
|
|
||||||
SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle));
|
code = setupUdf(node->functionName, &udfHandle);
|
||||||
|
if (code != 0) {
|
||||||
|
sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
|
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
|
||||||
teardownUdf(udfHandle);
|
if (code != 0) {
|
||||||
SCL_ERR_JRET(code);
|
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
code = teardownUdf(udfHandle);
|
||||||
|
if (code != 0) {
|
||||||
|
sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
SScalarFuncExecFuncs ffpSet = {0};
|
SScalarFuncExecFuncs ffpSet = {0};
|
||||||
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
|
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
|
||||||
|
|
Loading…
Reference in New Issue