Merge pull request #11897 from taosdata/feature/udf
feature(udf): aggregate function
This commit is contained in:
commit
d5ae5fc667
|
@ -206,6 +206,8 @@ typedef struct SqlFunctionCtx {
|
|||
struct SDiskbasedBuf *pBuf;
|
||||
struct SSDataBlock *pSrcBlock;
|
||||
int32_t curBufPage;
|
||||
|
||||
char* udfName[TSDB_FUNC_NAME_LEN];
|
||||
} SqlFunctionCtx;
|
||||
|
||||
enum {
|
||||
|
@ -334,8 +336,6 @@ int32_t udfcOpen();
|
|||
*/
|
||||
int32_t udfcClose();
|
||||
|
||||
typedef void *UdfcFuncHandle;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -42,8 +42,7 @@ enum {
|
|||
UDFC_CODE_INVALID_STATE = -5
|
||||
};
|
||||
|
||||
|
||||
|
||||
typedef void *UdfcFuncHandle;
|
||||
|
||||
/**
|
||||
* setup udf
|
||||
|
@ -95,6 +94,7 @@ typedef struct SUdfDataBlock {
|
|||
typedef struct SUdfInterBuf {
|
||||
int32_t bufLen;
|
||||
char* buf;
|
||||
int8_t numOfResult; //zero or one
|
||||
} SUdfInterBuf;
|
||||
|
||||
// output: interBuf
|
||||
|
@ -118,6 +118,10 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu
|
|||
*/
|
||||
int32_t teardownUdf(UdfcFuncHandle handle);
|
||||
|
||||
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
|
||||
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
|
||||
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
||||
// end API to taosd and qworker
|
||||
//=============================================================================================================================
|
||||
// begin API to UDF writer.
|
||||
|
@ -133,11 +137,11 @@ typedef int32_t (*TUdfTeardownFunc)();
|
|||
//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data);
|
||||
|
||||
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
|
||||
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
|
||||
|
||||
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol);
|
||||
typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf);
|
||||
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock block, SUdfInterBuf *interBuf);
|
||||
typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf buf, SUdfInterBuf *resultData);
|
||||
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
|
||||
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf);
|
||||
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
|
||||
|
||||
|
||||
// end API to UDF writer
|
|
@ -6,5 +6,5 @@ target_include_directories(
|
|||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||
)
|
||||
target_link_libraries(
|
||||
sdb os common util
|
||||
sdb os common util wal
|
||||
)
|
|
@ -16,6 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "sdbInt.h"
|
||||
#include "tchecksum.h"
|
||||
#include "wal.h"
|
||||
|
||||
#define SDB_TABLE_SIZE 24
|
||||
#define SDB_RESERVE_SIZE 512
|
||||
|
@ -137,7 +138,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
|
|||
int32_t readLen = 0;
|
||||
int64_t ret = 0;
|
||||
|
||||
SSdbRaw *pRaw = taosMemoryMalloc(SDB_MAX_SIZE);
|
||||
SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("failed read file since %s", terrstr());
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include "qndInt.h"
|
||||
#include "query.h"
|
||||
#include "qworker.h"
|
||||
//#include "tudf.h"
|
||||
#include "libs/function/function.h"
|
||||
|
||||
SQnode *qndOpen(const SQnodeOpt *pOption) {
|
||||
SQnode *pQnode = taosMemoryCalloc(1, sizeof(SQnode));
|
||||
|
@ -26,7 +26,9 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
//udfcOpen();
|
||||
if (udfcOpen() != 0) {
|
||||
qError("qnode can not open udfc");
|
||||
}
|
||||
|
||||
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
|
||||
taosMemoryFreeClear(pQnode);
|
||||
|
@ -40,7 +42,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
|
|||
void qndClose(SQnode *pQnode) {
|
||||
qWorkerDestroy((void **)&pQnode->pQuery);
|
||||
|
||||
//udfcClose();
|
||||
udfcClose();
|
||||
|
||||
taosMemoryFree(pQnode);
|
||||
}
|
||||
|
|
|
@ -51,6 +51,21 @@ target_link_libraries(
|
|||
udf1 PUBLIC os
|
||||
)
|
||||
|
||||
add_library(udf2 MODULE test/udf2.c)
|
||||
target_include_directories(
|
||||
udf2
|
||||
PUBLIC
|
||||
"${TD_SOURCE_DIR}/include/libs/function"
|
||||
"${TD_SOURCE_DIR}/include/util"
|
||||
"${TD_SOURCE_DIR}/include/common"
|
||||
"${TD_SOURCE_DIR}/include/client"
|
||||
"${TD_SOURCE_DIR}/include/os"
|
||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||
)
|
||||
target_link_libraries(
|
||||
udf2 PUBLIC os
|
||||
)
|
||||
|
||||
#SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin)
|
||||
add_executable(udfd src/udfd.c)
|
||||
target_include_directories(
|
||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
|||
|
||||
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
|
||||
|
||||
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
|
|
|
@ -43,6 +43,9 @@ typedef struct SUdfSetupRequest {
|
|||
|
||||
typedef struct SUdfSetupResponse {
|
||||
int64_t udfHandle;
|
||||
int8_t outputType;
|
||||
int32_t outputLen;
|
||||
int32_t bufSize;
|
||||
} SUdfSetupResponse;
|
||||
|
||||
typedef struct SUdfCallRequest {
|
||||
|
|
|
@ -139,6 +139,20 @@ int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) {
|
||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
pResInfo->isNullRes = (pResInfo->numOfRes == 0)? 1:0;
|
||||
cleanupResultRowEntry(pResInfo);
|
||||
|
||||
char* in = finalResult;
|
||||
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
|
||||
|
||||
return pResInfo->numOfRes;
|
||||
}
|
||||
|
||||
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) {
|
||||
|
|
|
@ -21,6 +21,10 @@
|
|||
#include "taos.h"
|
||||
#include "taoserror.h"
|
||||
#include "thash.h"
|
||||
#include "builtins.h"
|
||||
#include "catalog.h"
|
||||
#include "tudf.h"
|
||||
|
||||
|
||||
typedef struct SFuncMgtService {
|
||||
SHashObj* pFuncNameHashTable;
|
||||
|
@ -120,6 +124,14 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t fmGetUdafExecFuncs(SFuncExecFuncs* pFpSet) {
|
||||
pFpSet->getEnv = udfAggGetEnv;
|
||||
pFpSet->init = udfAggInit;
|
||||
pFpSet->process = udfAggProcess;
|
||||
pFpSet->finalize = udfAggFinalize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
|
||||
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||
return TSDB_CODE_FAILED;
|
||||
|
|
|
@ -19,6 +19,9 @@
|
|||
#include "tudfInt.h"
|
||||
#include "tarray.h"
|
||||
#include "tdatablock.h"
|
||||
#include "querynodes.h"
|
||||
#include "builtinsimpl.h"
|
||||
#include "functionMgt.h"
|
||||
|
||||
//TODO: network error processing.
|
||||
//TODO: add unit test
|
||||
|
@ -147,6 +150,10 @@ typedef struct SUdfUvSession {
|
|||
SUdfdProxy *udfc;
|
||||
int64_t severHandle;
|
||||
uv_pipe_t *udfSvcPipe;
|
||||
|
||||
int8_t outputType;
|
||||
int32_t outputLen;
|
||||
int32_t bufSize;
|
||||
} SUdfUvSession;
|
||||
|
||||
typedef struct SClientUvTaskNode {
|
||||
|
@ -235,12 +242,14 @@ void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) {
|
|||
|
||||
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) {
|
||||
int32_t len = 0;
|
||||
len += taosEncodeFixedI8(buf, state->numOfResult);
|
||||
len += taosEncodeFixedI32(buf, state->bufLen);
|
||||
len += taosEncodeBinary(buf, state->buf, state->bufLen);
|
||||
return len;
|
||||
}
|
||||
|
||||
void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) {
|
||||
buf = taosDecodeFixedI8(buf, &state->numOfResult);
|
||||
buf = taosDecodeFixedI32(buf, &state->bufLen);
|
||||
buf = taosDecodeBinary(buf, (void**)&state->buf, state->bufLen);
|
||||
return (void*)buf;
|
||||
|
@ -342,11 +351,17 @@ void* decodeUdfRequest(const void* buf, SUdfRequest* request) {
|
|||
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
|
||||
int32_t len = 0;
|
||||
len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
|
||||
len += taosEncodeFixedI8(buf, setupRsp->outputType);
|
||||
len += taosEncodeFixedI32(buf, setupRsp->outputLen);
|
||||
len += taosEncodeFixedI32(buf, setupRsp->bufSize);
|
||||
return len;
|
||||
}
|
||||
|
||||
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) {
|
||||
buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
|
||||
buf = taosDecodeFixedI8(buf, &setupRsp->outputType);
|
||||
buf = taosDecodeFixedI32(buf, &setupRsp->outputLen);
|
||||
buf = taosDecodeFixedI32(buf, &setupRsp->bufSize);
|
||||
return (void*)buf;
|
||||
}
|
||||
|
||||
|
@ -1049,6 +1064,9 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
|||
|
||||
SUdfSetupResponse *rsp = &task->_setup.rsp;
|
||||
task->session->severHandle = rsp->udfHandle;
|
||||
task->session->outputType = rsp->outputType;
|
||||
task->session->outputLen = rsp->outputLen;
|
||||
task->session->bufSize = rsp->bufSize;
|
||||
if (task->errCode != 0) {
|
||||
fnError("failed to setup udf. err: %d", task->errCode)
|
||||
} else {
|
||||
|
@ -1197,3 +1215,122 @@ int32_t teardownUdf(UdfcFuncHandle handle) {
|
|||
|
||||
return err;
|
||||
}
|
||||
|
||||
//memory layout |---SUdfAggRes----|-----final result-----|---inter result----|
|
||||
typedef struct SUdfAggRes {
|
||||
SUdfUvSession *session;
|
||||
int8_t finalResNum;
|
||||
int8_t interResNum;
|
||||
char* finalResBuf;
|
||||
char* interResBuf;
|
||||
} SUdfAggRes;
|
||||
|
||||
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
if (fmIsScalarFunc(pFunc->funcId)) {
|
||||
return false;
|
||||
}
|
||||
pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) {
|
||||
if (functionSetup(pCtx, pResultCellInfo) != true) {
|
||||
return false;
|
||||
}
|
||||
UdfcFuncHandle handle;
|
||||
if (setupUdf((char*)pCtx->udfName, &handle) != 0) {
|
||||
return false;
|
||||
}
|
||||
SUdfUvSession *session = (SUdfUvSession *)handle;
|
||||
SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo);
|
||||
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||
|
||||
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
|
||||
memset(udfRes, 0, envSize);
|
||||
|
||||
udfRes->session = (SUdfUvSession *)handle;
|
||||
SUdfInterBuf buf = {0};
|
||||
if (callUdfAggInit(handle, &buf) != 0) {
|
||||
return false;
|
||||
}
|
||||
udfRes->interResNum = buf.numOfResult;
|
||||
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
int32_t numOfCols = pInput->numOfInputCols;
|
||||
|
||||
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
SUdfUvSession *session = udfRes->session;
|
||||
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
|
||||
SSDataBlock tempBlock = {0};
|
||||
tempBlock.info.numOfCols = numOfCols;
|
||||
tempBlock.info.rows = numOfRows;
|
||||
tempBlock.info.uid = pInput->uid;
|
||||
bool hasVarCol = false;
|
||||
tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData *col = pInput->pData[i];
|
||||
if (IS_VAR_DATA_TYPE(col->info.type)) {
|
||||
hasVarCol = true;
|
||||
}
|
||||
taosArrayPush(tempBlock.pDataBlock, col);
|
||||
}
|
||||
tempBlock.info.hasVarCol = hasVarCol;
|
||||
|
||||
SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows);
|
||||
|
||||
SUdfInterBuf state = {.buf = udfRes->interResBuf,
|
||||
.bufLen = session->bufSize,
|
||||
.numOfResult = udfRes->interResNum};
|
||||
SUdfInterBuf newState = {0};
|
||||
|
||||
callUdfAggProcess(session, inputBlock, &state, &newState);
|
||||
|
||||
udfRes->interResNum = newState.numOfResult;
|
||||
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
|
||||
|
||||
if (newState.numOfResult == 1 || state.numOfResult == 1) {
|
||||
GET_RES_INFO(pCtx)->numOfRes = 1;
|
||||
}
|
||||
|
||||
blockDataDestroy(inputBlock);
|
||||
|
||||
taosArrayDestroy(tempBlock.pDataBlock);
|
||||
|
||||
taosMemoryFree(newState.buf);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||
SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
SUdfUvSession *session = udfRes->session;
|
||||
udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes);
|
||||
udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
||||
|
||||
|
||||
SUdfInterBuf resultBuf = {.buf = udfRes->finalResBuf,
|
||||
.bufLen = session->outputLen,
|
||||
.numOfResult = udfRes->finalResNum};
|
||||
SUdfInterBuf state = {.buf = udfRes->interResBuf,
|
||||
.bufLen = session->bufSize,
|
||||
.numOfResult = udfRes->interResNum};
|
||||
callUdfAggFinalize(session, &state, &resultBuf);
|
||||
teardownUdf(session);
|
||||
|
||||
if (resultBuf.numOfResult == 1) {
|
||||
GET_RES_INFO(pCtx)->numOfRes = 1;
|
||||
}
|
||||
return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
|
||||
}
|
|
@ -77,6 +77,10 @@ typedef struct SUdf {
|
|||
uv_lib_t lib;
|
||||
TUdfScalarProcFunc scalarProcFunc;
|
||||
TUdfFreeUdfColumnFunc freeUdfColumn;
|
||||
|
||||
TUdfAggStartFunc aggStartFunc;
|
||||
TUdfAggProcessFunc aggProcFunc;
|
||||
TUdfAggFinishFunc aggFinishFunc;
|
||||
} SUdf;
|
||||
|
||||
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
|
||||
|
@ -97,15 +101,32 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
|||
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
||||
return UDFC_CODE_LOAD_UDF_FAILURE;
|
||||
}
|
||||
// TODO: find all the functions
|
||||
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
||||
strcpy(normalFuncName, udfName);
|
||||
uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc));
|
||||
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
||||
char *freeSuffix = "_free";
|
||||
strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
|
||||
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
|
||||
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
|
||||
//TODO: init and destroy function
|
||||
if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
|
||||
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
||||
strcpy(processFuncName, udfName);
|
||||
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc));
|
||||
char freeFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
|
||||
char *freeSuffix = "_free";
|
||||
strncpy(freeFuncName, processFuncName, strlen(processFuncName));
|
||||
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
|
||||
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
|
||||
} else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
|
||||
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
||||
strcpy(processFuncName, udfName);
|
||||
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc));
|
||||
char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
||||
char *startSuffix = "_start";
|
||||
strncpy(startFuncName, processFuncName, strlen(processFuncName));
|
||||
strncat(startFuncName, startSuffix, strlen(startSuffix));
|
||||
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc));
|
||||
char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
|
||||
char *finishSuffix = "_finish";
|
||||
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
|
||||
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
|
||||
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggFinishFunc));
|
||||
//TODO: merge
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -160,6 +181,9 @@ void udfdProcessRequest(uv_work_t *req) {
|
|||
rsp.type = request.type;
|
||||
rsp.code = 0;
|
||||
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
||||
rsp.setupRsp.outputType = udf->outputType;
|
||||
rsp.setupRsp.outputLen = udf->outputLen;
|
||||
rsp.setupRsp.bufSize = udf->bufSize;
|
||||
int32_t len = encodeUdfResponse(NULL, &rsp);
|
||||
rsp.msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
|
@ -178,26 +202,58 @@ void udfdProcessRequest(uv_work_t *req) {
|
|||
call->udfHandle);
|
||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
|
||||
SUdf *udf = handle->udf;
|
||||
|
||||
SUdfDataBlock input = {0};
|
||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
SUdfColumn output = {0};
|
||||
// TODO: call different functions according to call type, for now just calar
|
||||
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
||||
udf->scalarProcFunc(input, &output);
|
||||
}
|
||||
|
||||
SUdfResponse response = {0};
|
||||
SUdfResponse *rsp = &response;
|
||||
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
||||
rsp->seqNum = request.seqNum;
|
||||
rsp->type = request.type;
|
||||
rsp->code = 0;
|
||||
SUdfCallResponse *subRsp = &rsp->callRsp;
|
||||
subRsp->callType = call->callType;
|
||||
convertUdfColumnToDataBlock(&output, &subRsp->resultData);
|
||||
SUdfCallResponse *subRsp = &rsp->callRsp;
|
||||
|
||||
switch(call->callType) {
|
||||
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||
SUdfColumn output = {0};
|
||||
|
||||
SUdfDataBlock input = {0};
|
||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
udf->scalarProcFunc(&input, &output);
|
||||
|
||||
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
|
||||
udf->freeUdfColumn(&output);
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_INIT: {
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||
.bufLen= udf->bufSize,
|
||||
.numOfResult = 0};
|
||||
udf->aggStartFunc(&outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_PROC: {
|
||||
SUdfDataBlock input = {0};
|
||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||
.bufLen= udf->bufSize,
|
||||
.numOfResult = 0};
|
||||
udf->aggProcFunc(&input, &outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
|
||||
break;
|
||||
}
|
||||
case TSDB_UDF_CALL_AGG_FIN: {
|
||||
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||
.bufLen= udf->bufSize,
|
||||
.numOfResult = 0};
|
||||
udf->aggFinishFunc(&call->interBuf, &outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
rsp->seqNum = request.seqNum;
|
||||
rsp->type = request.type;
|
||||
rsp->code = 0;
|
||||
subRsp->callType = call->callType;
|
||||
|
||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||
rsp->msgLen = len;
|
||||
void *bufBegin = taosMemoryMalloc(len);
|
||||
|
@ -205,9 +261,6 @@ void udfdProcessRequest(uv_work_t *req) {
|
|||
encodeUdfResponse(&buf, rsp);
|
||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||
|
||||
// TODO: free udf column
|
||||
udf->freeUdfColumn(&output);
|
||||
|
||||
taosMemoryFree(uvUdf->input.base);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -9,18 +9,18 @@
|
|||
#undef free
|
||||
#define free free
|
||||
|
||||
int32_t udf1_setup() {
|
||||
int32_t udf1_init() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t udf1_teardown() {
|
||||
int32_t udf1_destroy() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t udf1(SUdfDataBlock block, SUdfColumn *resultCol) {
|
||||
int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) {
|
||||
SUdfColumnData *resultData = &resultCol->colData;
|
||||
resultData->numOfRows = block.numOfRows;
|
||||
SUdfColumnData *srcData = &block.udfCols[0]->colData;
|
||||
resultData->numOfRows = block->numOfRows;
|
||||
SUdfColumnData *srcData = &block->udfCols[0]->colData;
|
||||
resultData->varLengthColumn = srcData->varLengthColumn;
|
||||
|
||||
if (resultData->varLengthColumn) {
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
|
||||
#include "tudf.h"
|
||||
|
||||
#undef malloc
|
||||
#define malloc malloc
|
||||
#undef free
|
||||
#define free free
|
||||
|
||||
int32_t udf2_init() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t udf2_destroy() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t udf2_start(SUdfInterBuf *buf) {
|
||||
*(int64_t*)(buf->buf) = 0;
|
||||
buf->bufLen = sizeof(int64_t);
|
||||
buf->numOfResult = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) {
|
||||
int64_t sumSquares = *(int64_t*)interBuf->buf;
|
||||
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||
for (int32_t j = 0; j < block->numOfRows; ++i) {
|
||||
SUdfColumn* col = block->udfCols[i];
|
||||
//TODO: check the bitmap for null value
|
||||
int32_t* rows = (int32_t*)col->colData.fixLenCol.data;
|
||||
sumSquares += rows[j] * rows[j];
|
||||
}
|
||||
}
|
||||
|
||||
*(int64_t*)interBuf = sumSquares;
|
||||
interBuf->bufLen = sizeof(int64_t);
|
||||
//TODO: if all null value, numOfResult = 0;
|
||||
interBuf->numOfResult = 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
|
||||
//TODO: check numOfResults;
|
||||
int64_t sumSquares = *(int64_t*)(buf->buf);
|
||||
*(double*)(resultData->buf) = sqrt(sumSquares);
|
||||
resultData->bufLen = sizeof(double);
|
||||
resultData->numOfResult = 1;
|
||||
return 0;
|
||||
}
|
|
@ -7,6 +7,7 @@
|
|||
#include "tcommon.h"
|
||||
#include "tdatablock.h"
|
||||
#include "scalar.h"
|
||||
#include "tudf.h"
|
||||
|
||||
int32_t scalarGetOperatorParamNum(EOperatorType type) {
|
||||
if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type
|
||||
|
@ -336,14 +337,12 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
|
|||
SCL_ERR_RET(sclInitParamList(¶ms, node->pParameterList, ctx, ¶mNum, &rowNum));
|
||||
|
||||
if (fmIsUserDefinedFunc(node->funcId)) {
|
||||
#if 0
|
||||
UdfcFuncHandle udfHandle = NULL;
|
||||
|
||||
SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle));
|
||||
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
|
||||
teardownUdf(udfHandle);
|
||||
SCL_ERR_JRET(code);
|
||||
#endif
|
||||
} else {
|
||||
SScalarFuncExecFuncs ffpSet = {0};
|
||||
code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
|
||||
|
|
Loading…
Reference in New Issue