udaf integration first step
This commit is contained in:
parent
1952decc2f
commit
73a0ad7414
|
@ -206,6 +206,8 @@ typedef struct SqlFunctionCtx {
|
||||||
struct SDiskbasedBuf *pBuf;
|
struct SDiskbasedBuf *pBuf;
|
||||||
struct SSDataBlock *pSrcBlock;
|
struct SSDataBlock *pSrcBlock;
|
||||||
int32_t curBufPage;
|
int32_t curBufPage;
|
||||||
|
|
||||||
|
char* udfName[TSDB_FUNC_NAME_LEN];
|
||||||
} SqlFunctionCtx;
|
} SqlFunctionCtx;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -334,8 +336,6 @@ int32_t udfcOpen();
|
||||||
*/
|
*/
|
||||||
int32_t udfcClose();
|
int32_t udfcClose();
|
||||||
|
|
||||||
typedef void *UdfcFuncHandle;
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -119,6 +119,9 @@ typedef struct SFunctionNode {
|
||||||
int32_t funcId;
|
int32_t funcId;
|
||||||
int32_t funcType;
|
int32_t funcType;
|
||||||
SNodeList* pParameterList;
|
SNodeList* pParameterList;
|
||||||
|
|
||||||
|
int8_t udfFuncType; //TODO: fill by parser/planner
|
||||||
|
int32_t bufSize; //TODO: fill by parser/planner
|
||||||
} SFunctionNode;
|
} SFunctionNode;
|
||||||
|
|
||||||
typedef struct STableNode {
|
typedef struct STableNode {
|
||||||
|
|
|
@ -97,6 +97,8 @@ typedef struct SUdfInterBuf {
|
||||||
char* buf;
|
char* buf;
|
||||||
} SUdfInterBuf;
|
} SUdfInterBuf;
|
||||||
|
|
||||||
|
typedef void *UdfcFuncHandle;
|
||||||
|
|
||||||
// output: interBuf
|
// output: interBuf
|
||||||
int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
|
int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
|
||||||
// input: block, state
|
// input: block, state
|
||||||
|
@ -118,6 +120,10 @@ int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t nu
|
||||||
*/
|
*/
|
||||||
int32_t teardownUdf(UdfcFuncHandle handle);
|
int32_t teardownUdf(UdfcFuncHandle handle);
|
||||||
|
|
||||||
|
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
|
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
|
||||||
|
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
|
||||||
|
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
|
||||||
// end API to taosd and qworker
|
// end API to taosd and qworker
|
||||||
//=============================================================================================================================
|
//=============================================================================================================================
|
||||||
// begin API to UDF writer.
|
// begin API to UDF writer.
|
||||||
|
|
|
@ -43,6 +43,9 @@ typedef struct SUdfSetupRequest {
|
||||||
|
|
||||||
typedef struct SUdfSetupResponse {
|
typedef struct SUdfSetupResponse {
|
||||||
int64_t udfHandle;
|
int64_t udfHandle;
|
||||||
|
int8_t outputType;
|
||||||
|
int32_t outputLen;
|
||||||
|
int32_t bufSize;
|
||||||
} SUdfSetupResponse;
|
} SUdfSetupResponse;
|
||||||
|
|
||||||
typedef struct SUdfCallRequest {
|
typedef struct SUdfCallRequest {
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "builtins.h"
|
#include "builtins.h"
|
||||||
#include "catalog.h"
|
#include "catalog.h"
|
||||||
|
#include "tudf.h"
|
||||||
|
|
||||||
typedef struct SFuncMgtService {
|
typedef struct SFuncMgtService {
|
||||||
SHashObj* pFuncNameHashTable;
|
SHashObj* pFuncNameHashTable;
|
||||||
|
@ -148,6 +149,14 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t fmGetUdafExecFuncs(SFuncExecFuncs* pFpSet) {
|
||||||
|
pFpSet->getEnv = udfAggGetEnv;
|
||||||
|
pFpSet->init = udfAggInit;
|
||||||
|
pFpSet->process = udfAggProcess;
|
||||||
|
pFpSet->finalize = udfAggFinalize;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
|
int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet) {
|
||||||
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
#include "tudfInt.h"
|
#include "tudfInt.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
#include "querynodes.h"
|
||||||
|
#include "builtinsimpl.h"
|
||||||
|
|
||||||
//TODO: network error processing.
|
//TODO: network error processing.
|
||||||
//TODO: add unit test
|
//TODO: add unit test
|
||||||
|
@ -147,6 +149,10 @@ typedef struct SUdfUvSession {
|
||||||
SUdfdProxy *udfc;
|
SUdfdProxy *udfc;
|
||||||
int64_t severHandle;
|
int64_t severHandle;
|
||||||
uv_pipe_t *udfSvcPipe;
|
uv_pipe_t *udfSvcPipe;
|
||||||
|
|
||||||
|
int8_t outputType;
|
||||||
|
int32_t outputLen;
|
||||||
|
int32_t bufSize;
|
||||||
} SUdfUvSession;
|
} SUdfUvSession;
|
||||||
|
|
||||||
typedef struct SClientUvTaskNode {
|
typedef struct SClientUvTaskNode {
|
||||||
|
@ -342,11 +348,17 @@ void* decodeUdfRequest(const void* buf, SUdfRequest* request) {
|
||||||
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
|
int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
|
len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
|
||||||
|
len += taosEncodeFixedI8(buf, setupRsp->outputType);
|
||||||
|
len += taosEncodeFixedI32(buf, setupRsp->outputLen);
|
||||||
|
len += taosEncodeFixedI32(buf, setupRsp->bufSize);
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) {
|
void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) {
|
||||||
buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
|
buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
|
||||||
|
buf = taosDecodeFixedI8(buf, &setupRsp->outputType);
|
||||||
|
buf = taosDecodeFixedI32(buf, &setupRsp->outputLen);
|
||||||
|
buf = taosDecodeFixedI32(buf, &setupRsp->bufSize);
|
||||||
return (void*)buf;
|
return (void*)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1049,6 +1061,9 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||||
|
|
||||||
SUdfSetupResponse *rsp = &task->_setup.rsp;
|
SUdfSetupResponse *rsp = &task->_setup.rsp;
|
||||||
task->session->severHandle = rsp->udfHandle;
|
task->session->severHandle = rsp->udfHandle;
|
||||||
|
task->session->outputType = rsp->outputType;
|
||||||
|
task->session->outputLen = rsp->outputLen;
|
||||||
|
task->session->bufSize = rsp->bufSize;
|
||||||
if (task->errCode != 0) {
|
if (task->errCode != 0) {
|
||||||
fnError("failed to setup udf. err: %d", task->errCode)
|
fnError("failed to setup udf. err: %d", task->errCode)
|
||||||
} else {
|
} else {
|
||||||
|
@ -1197,3 +1212,94 @@ int32_t teardownUdf(UdfcFuncHandle handle) {
|
||||||
|
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//memory layout |---handle----|-----result-----|---buffer----|
|
||||||
|
bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
if (pFunc->udfFuncType == TSDB_FUNC_TYPE_SCALAR) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
pEnv->calcMemSize = sizeof(int64_t*) + pFunc->node.resType.bytes + pFunc->bufSize;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) {
|
||||||
|
if (functionSetup(pCtx, pResultCellInfo) != true) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
UdfcFuncHandle handle;
|
||||||
|
if (setupUdf((char*)pCtx->udfName, &handle) != 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
SUdfUvSession *session = (SUdfUvSession *)handle;
|
||||||
|
char *udfRes = (char*)GET_ROWCELL_INTERBUF(pResultCellInfo);
|
||||||
|
int32_t envSize = sizeof(int64_t) + session->outputLen + session->bufSize;
|
||||||
|
memset(udfRes, 0, envSize);
|
||||||
|
*(int64_t*)(udfRes) = (int64_t)handle;
|
||||||
|
SUdfInterBuf buf = {0};
|
||||||
|
if (callUdfAggInit(handle, &buf) != 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
memcpy(udfRes + sizeof(int64_t) + session->outputLen, buf.buf, buf.bufLen);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
int32_t numOfCols = pInput->numOfInputCols;
|
||||||
|
|
||||||
|
char* udfRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
|
// computing based on the true data block
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
int32_t numOfRows = pInput->numOfRows;
|
||||||
|
|
||||||
|
// TODO: range [start, start+numOfRow) to generate colInfoData
|
||||||
|
|
||||||
|
|
||||||
|
UdfcFuncHandle handle =(UdfcFuncHandle)(*(int64_t*)(udfRes));
|
||||||
|
SUdfUvSession *session = (SUdfUvSession *)handle;
|
||||||
|
|
||||||
|
SSDataBlock input = {0};
|
||||||
|
input.info.numOfCols = numOfCols;
|
||||||
|
input.info.rows = numOfRows;
|
||||||
|
input.info.uid = pInput->uid;
|
||||||
|
bool hasVarCol = false;
|
||||||
|
input.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData *col = pInput->pData[i];
|
||||||
|
if (IS_VAR_DATA_TYPE(col->info.type)) {
|
||||||
|
hasVarCol = true;
|
||||||
|
}
|
||||||
|
taosArrayPush(input.pDataBlock, col);
|
||||||
|
}
|
||||||
|
|
||||||
|
input.info.hasVarCol = hasVarCol;
|
||||||
|
SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen, .bufLen = session->bufSize};
|
||||||
|
SUdfInterBuf newState = {0};
|
||||||
|
callUdfAggProcess(handle, &input, &state, &newState);
|
||||||
|
|
||||||
|
memcpy(state.buf, newState.buf, newState.bufLen);
|
||||||
|
|
||||||
|
taosArrayDestroy(input.pDataBlock);
|
||||||
|
|
||||||
|
taosMemoryFree(newState.buf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
|
||||||
|
char* udfRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
|
UdfcFuncHandle handle =(UdfcFuncHandle)(*(int64_t*)(udfRes));
|
||||||
|
SUdfUvSession *session = (SUdfUvSession *)handle;
|
||||||
|
|
||||||
|
SUdfInterBuf resultBuf = {.buf = udfRes + sizeof(int64_t), .bufLen = session->outputLen};
|
||||||
|
SUdfInterBuf state = {.buf = udfRes + sizeof(int64_t) + session->outputLen, .bufLen = session->bufSize};
|
||||||
|
callUdfAggFinalize(handle, &state, &resultBuf);
|
||||||
|
|
||||||
|
functionFinalize(pCtx, pBlock);
|
||||||
|
teardownUdf(handle);
|
||||||
|
}
|
|
@ -160,6 +160,9 @@ void udfdProcessRequest(uv_work_t *req) {
|
||||||
rsp.type = request.type;
|
rsp.type = request.type;
|
||||||
rsp.code = 0;
|
rsp.code = 0;
|
||||||
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
||||||
|
rsp.setupRsp.outputType = udf->outputType;
|
||||||
|
rsp.setupRsp.outputLen = udf->outputLen;
|
||||||
|
rsp.setupRsp.bufSize = udf->bufSize;
|
||||||
int32_t len = encodeUdfResponse(NULL, &rsp);
|
int32_t len = encodeUdfResponse(NULL, &rsp);
|
||||||
rsp.msgLen = len;
|
rsp.msgLen = len;
|
||||||
void *bufBegin = taosMemoryMalloc(len);
|
void *bufBegin = taosMemoryMalloc(len);
|
||||||
|
|
Loading…
Reference in New Issue