diff --git a/include/libs/function/function.h b/include/libs/function/function.h index ef32533e5f..46e78d5c22 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -206,6 +206,8 @@ typedef struct SqlFunctionCtx { struct SDiskbasedBuf *pBuf; struct SSDataBlock *pSrcBlock; int32_t curBufPage; + + char* udfName[TSDB_FUNC_NAME_LEN]; } SqlFunctionCtx; enum { @@ -334,8 +336,6 @@ int32_t udfcOpen(); */ int32_t udfcClose(); -typedef void *UdfcFuncHandle; - #ifdef __cplusplus } #endif diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index d8e2354e8e..e68cfc4708 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -119,6 +119,9 @@ typedef struct SFunctionNode { int32_t funcId; int32_t funcType; SNodeList* pParameterList; + + int8_t udfFuncType; //TODO: fill by parser/planner + int32_t bufSize; //TODO: fill by parser/planner } SFunctionNode; typedef struct STableNode { diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index b72905b872..134ec8e2f7 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -97,6 +97,8 @@ typedef struct SUdfInterBuf { char* buf; } SUdfInterBuf; +typedef void *UdfcFuncHandle; + // output: interBuf int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); // input: block, state @@ -118,6 +120,10 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu */ int32_t teardownUdf(UdfcFuncHandle handle); +bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); +int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); +int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); // end API to taosd and qworker //============================================================================================================================= // begin API to UDF writer. diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 4e7178f7fd..8aedd59c33 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -43,6 +43,9 @@ typedef struct SUdfSetupRequest { typedef struct SUdfSetupResponse { int64_t udfHandle; + int8_t outputType; + int32_t outputLen; + int32_t bufSize; } SUdfSetupResponse; typedef struct SUdfCallRequest { diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index d44e3e251b..06b30dd123 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -21,6 +21,7 @@ #include "thash.h" #include "builtins.h" #include "catalog.h" +#include "tudf.h" typedef struct SFuncMgtService { SHashObj* pFuncNameHashTable; @@ -148,6 +149,14 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { return TSDB_CODE_SUCCESS; } +int32_t fmGetUdafExecFuncs(SFuncExecFuncs* pFpSet) { + pFpSet->getEnv = udfAggGetEnv; + pFpSet->init = udfAggInit; + pFpSet->process = udfAggProcess; + pFpSet->finalize = udfAggFinalize; + return TSDB_CODE_SUCCESS; +} + int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) { if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) { return TSDB_CODE_FAILED; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index f8a7e77814..f335c6bba8 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -19,6 +19,8 @@ #include "tudfInt.h" #include "tarray.h" #include "tdatablock.h" +#include "querynodes.h" +#include "builtinsimpl.h" //TODO: network error processing. //TODO: add unit test @@ -147,6 +149,10 @@ typedef struct SUdfUvSession { SUdfdProxy *udfc; int64_t severHandle; uv_pipe_t *udfSvcPipe; + + int8_t outputType; + int32_t outputLen; + int32_t bufSize; } SUdfUvSession; typedef struct SClientUvTaskNode { @@ -342,11 +348,17 @@ void* decodeUdfRequest(const void* buf, SUdfRequest* request) { int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) { int32_t len = 0; len += taosEncodeFixedI64(buf, setupRsp->udfHandle); + len += taosEncodeFixedI8(buf, setupRsp->outputType); + len += taosEncodeFixedI32(buf, setupRsp->outputLen); + len += taosEncodeFixedI32(buf, setupRsp->bufSize); return len; } void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) { buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle); + buf = taosDecodeFixedI8(buf, &setupRsp->outputType); + buf = taosDecodeFixedI32(buf, &setupRsp->outputLen); + buf = taosDecodeFixedI32(buf, &setupRsp->bufSize); return (void*)buf; } @@ -1049,6 +1061,9 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) { SUdfSetupResponse *rsp = &task->_setup.rsp; task->session->severHandle = rsp->udfHandle; + task->session->outputType = rsp->outputType; + task->session->outputLen = rsp->outputLen; + task->session->bufSize = rsp->bufSize; if (task->errCode != 0) { fnError("failed to setup udf. err: %d", task->errCode) } else { @@ -1197,3 +1212,94 @@ int32_t teardownUdf(UdfcFuncHandle handle) { return err; } + +//memory layout |---handle----|-----result-----|---buffer----| +bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + if (pFunc->udfFuncType == TSDB_FUNC_TYPE_SCALAR) { + return false; + } + pEnv->calcMemSize = sizeof(int64_t*) + pFunc->node.resType.bytes + pFunc->bufSize; + return true; +} + +bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) { + if (functionSetup(pCtx, pResultCellInfo) != true) { + return false; + } + UdfcFuncHandle handle; + if (setupUdf((char*)pCtx->udfName, &handle) != 0) { + return false; + } + SUdfUvSession *session = (SUdfUvSession *)handle; + char *udfRes = (char*)GET_ROWCELL_INTERBUF(pResultCellInfo); + int32_t envSize = sizeof(int64_t) + session->outputLen + session->bufSize; + memset(udfRes, 0, envSize); + *(int64_t*)(udfRes) = (int64_t)handle; + SUdfInterBuf buf = {0}; + if (callUdfAggInit(handle, &buf) != 0) { + return false; + } + memcpy(udfRes + sizeof(int64_t) + session->outputLen, buf.buf, buf.bufLen); + return true; +} + +int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { + + SInputColumnInfoData* pInput = &pCtx->input; + int32_t numOfCols = pInput->numOfInputCols; + + char* udfRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + // computing based on the true data block + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + // TODO: range [start, start+numOfRow) to generate colInfoData + + + UdfcFuncHandle handle =(UdfcFuncHandle)(*(int64_t*)(udfRes)); + SUdfUvSession *session = (SUdfUvSession *)handle; + + SSDataBlock input = {0}; + input.info.numOfCols = numOfCols; + input.info.rows = numOfRows; + input.info.uid = pInput->uid; + bool hasVarCol = false; + input.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData *col = pInput->pData[i]; + if (IS_VAR_DATA_TYPE(col->info.type)) { + hasVarCol = true; + } + taosArrayPush(input.pDataBlock, col); + } + + input.info.hasVarCol = hasVarCol; + SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen, .bufLen = session->bufSize}; + SUdfInterBuf newState = {0}; + callUdfAggProcess(handle, &input, &state, &newState); + + memcpy(state.buf, newState.buf, newState.bufLen); + + taosArrayDestroy(input.pDataBlock); + + taosMemoryFree(newState.buf); + return 0; +} + +int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { + char* udfRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + UdfcFuncHandle handle =(UdfcFuncHandle)(*(int64_t*)(udfRes)); + SUdfUvSession *session = (SUdfUvSession *)handle; + + SUdfInterBuf resultBuf = {.buf = udfRes + sizeof(int64_t), .bufLen = session->outputLen}; + SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen, .bufLen = session->bufSize}; + callUdfAggFinalize(handle, &state, &resultBuf); + + functionFinalize(pCtx, pBlock); + teardownUdf(handle); +} \ No newline at end of file diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index e4c4cb4893..d4e7c9b825 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -160,6 +160,9 @@ void udfdProcessRequest(uv_work_t *req) { rsp.type = request.type; rsp.code = 0; rsp.setupRsp.udfHandle = (int64_t)(handle); + rsp.setupRsp.outputType = udf->outputType; + rsp.setupRsp.outputLen = udf->outputLen; + rsp.setupRsp.bufSize = udf->bufSize; int32_t len = encodeUdfResponse(NULL, &rsp); rsp.msgLen = len; void *bufBegin = taosMemoryMalloc(len);