Merge pull request #11996 from taosdata/feature/udf

feat(udf): agg function refinement
This commit is contained in:
shenglian-zhou 2022-04-28 15:05:49 +08:00 committed by GitHub
commit 34675b2cdd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 20 additions and 9 deletions

View File

@ -207,7 +207,7 @@ typedef struct SqlFunctionCtx {
struct SSDataBlock *pSrcBlock; struct SSDataBlock *pSrcBlock;
int32_t curBufPage; int32_t curBufPage;
char* udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
} SqlFunctionCtx; } SqlFunctionCtx;
enum { enum {

View File

@ -162,6 +162,7 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet); int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet);
int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -140,7 +140,7 @@ typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf); typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf); typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData); typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);

View File

@ -1897,7 +1897,14 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId; pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
if (fmIsAggFunc(pCtx->functionId) || fmIsNonstandardSQLFunc(pCtx->functionId)) { if (fmIsAggFunc(pCtx->functionId) || fmIsNonstandardSQLFunc(pCtx->functionId)) {
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
if (!isUdaf) {
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
} else {
char *udfName = pExpr->pExpr->_function.pFunctNode->functionName;
strncpy(pCtx->udfName, udfName, strlen(udfName));
fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
}
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
} else { } else {
fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp); fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);

View File

@ -124,7 +124,10 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t fmGetUdafExecFuncs(SFuncExecFuncs* pFpSet) { int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
if (!fmIsUserDefinedFunc(funcId)) {
return TSDB_CODE_FAILED;
}
pFpSet->getEnv = udfAggGetEnv; pFpSet->getEnv = udfAggGetEnv;
pFpSet->init = udfAggInit; pFpSet->init = udfAggInit;
pFpSet->process = udfAggProcess; pFpSet->process = udfAggProcess;

View File

@ -232,7 +232,7 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
.bufLen= udf->bufSize, .bufLen= udf->bufSize,
.numOfResult = 0}; .numOfResult = 0};
udf->aggProcFunc(&input, &outBuf); udf->aggProcFunc(&input, &call->interBuf, &outBuf);
subRsp->resultBuf = outBuf; subRsp->resultBuf = outBuf;
break; break;

View File

@ -24,7 +24,7 @@ int32_t udf2_start(SUdfInterBuf *buf) {
return 0; return 0;
} }
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) { int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
int64_t sumSquares = *(int64_t*)interBuf->buf; int64_t sumSquares = *(int64_t*)interBuf->buf;
for (int32_t i = 0; i < block->numOfCols; ++i) { for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++i) { for (int32_t j = 0; j < block->numOfRows; ++i) {
@ -35,10 +35,10 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) {
} }
} }
*(int64_t*)interBuf = sumSquares; *(int64_t*)newInterBuf = sumSquares;
interBuf->bufLen = sizeof(int64_t); newInterBuf->bufLen = sizeof(int64_t);
//TODO: if all null value, numOfResult = 0; //TODO: if all null value, numOfResult = 0;
interBuf->numOfResult = 1; newInterBuf->numOfResult = 1;
return 0; return 0;
} }