aggregate function call from udfd
This commit is contained in:
parent
974c679c84
commit
0119054b0b
|
@ -139,9 +139,9 @@ typedef int32_t (*TUdfTeardownFunc)();
|
||||||
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
|
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
|
||||||
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
|
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
|
||||||
|
|
||||||
typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf);
|
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
|
||||||
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf);
|
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf);
|
||||||
typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
|
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
|
||||||
|
|
||||||
|
|
||||||
// end API to UDF writer
|
// end API to UDF writer
|
||||||
|
|
|
@ -77,6 +77,10 @@ typedef struct SUdf {
|
||||||
uv_lib_t lib;
|
uv_lib_t lib;
|
||||||
TUdfScalarProcFunc scalarProcFunc;
|
TUdfScalarProcFunc scalarProcFunc;
|
||||||
TUdfFreeUdfColumnFunc freeUdfColumn;
|
TUdfFreeUdfColumnFunc freeUdfColumn;
|
||||||
|
|
||||||
|
TUdfAggStartFunc aggStartFunc;
|
||||||
|
TUdfAggProcessFunc aggProcFunc;
|
||||||
|
TUdfAggFinishFunc aggFinishFunc;
|
||||||
} SUdf;
|
} SUdf;
|
||||||
|
|
||||||
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
|
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
|
||||||
|
@ -97,15 +101,32 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
||||||
return UDFC_CODE_LOAD_UDF_FAILURE;
|
return UDFC_CODE_LOAD_UDF_FAILURE;
|
||||||
}
|
}
|
||||||
// TODO: find all the functions
|
//TODO: init and destroy function
|
||||||
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
|
||||||
strcpy(normalFuncName, udfName);
|
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
||||||
uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc));
|
strcpy(processFuncName, udfName);
|
||||||
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc));
|
||||||
|
char freeFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
|
||||||
char *freeSuffix = "_free";
|
char *freeSuffix = "_free";
|
||||||
strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
|
strncpy(freeFuncName, processFuncName, strlen(processFuncName));
|
||||||
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
|
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
|
||||||
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
|
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
|
||||||
|
} else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
|
||||||
|
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
||||||
|
strcpy(processFuncName, udfName);
|
||||||
|
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc));
|
||||||
|
char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
||||||
|
char *startSuffix = "_start";
|
||||||
|
strncpy(startFuncName, processFuncName, strlen(processFuncName));
|
||||||
|
strncat(startFuncName, startSuffix, strlen(startSuffix));
|
||||||
|
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc));
|
||||||
|
char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
|
||||||
|
char *finishSuffix = "_finish";
|
||||||
|
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
|
||||||
|
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
|
||||||
|
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggFinishFunc));
|
||||||
|
//TODO: merge
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,25 +202,57 @@ void udfdProcessRequest(uv_work_t *req) {
|
||||||
call->udfHandle);
|
call->udfHandle);
|
||||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
|
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
|
||||||
SUdf *udf = handle->udf;
|
SUdf *udf = handle->udf;
|
||||||
|
SUdfResponse response = {0};
|
||||||
|
SUdfResponse *rsp = &response;
|
||||||
|
SUdfCallResponse *subRsp = &rsp->callRsp;
|
||||||
|
|
||||||
|
switch(call->callType) {
|
||||||
|
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||||
|
SUdfColumn output = {0};
|
||||||
|
|
||||||
SUdfDataBlock input = {0};
|
SUdfDataBlock input = {0};
|
||||||
convertDataBlockToUdfDataBlock(&call->block, &input);
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||||
SUdfColumn output = {0};
|
|
||||||
// TODO: call different functions according to call type, for now just calar
|
|
||||||
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
|
||||||
udf->scalarProcFunc(&input, &output);
|
udf->scalarProcFunc(&input, &output);
|
||||||
|
|
||||||
|
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
|
||||||
|
udf->freeUdfColumn(&output);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_UDF_CALL_AGG_INIT: {
|
||||||
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||||
|
.bufLen= udf->bufSize,
|
||||||
|
.numOfResult = 0};
|
||||||
|
udf->aggStartFunc(&outBuf);
|
||||||
|
subRsp->resultBuf = outBuf;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_UDF_CALL_AGG_PROC: {
|
||||||
|
SUdfDataBlock input = {0};
|
||||||
|
convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||||
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||||
|
.bufLen= udf->bufSize,
|
||||||
|
.numOfResult = 0};
|
||||||
|
udf->aggProcFunc(&input, &outBuf);
|
||||||
|
subRsp->resultBuf = outBuf;
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_UDF_CALL_AGG_FIN: {
|
||||||
|
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize),
|
||||||
|
.bufLen= udf->bufSize,
|
||||||
|
.numOfResult = 0};
|
||||||
|
udf->aggFinishFunc(&call->interBuf, &outBuf);
|
||||||
|
subRsp->resultBuf = outBuf;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
SUdfResponse response = {0};
|
|
||||||
SUdfResponse *rsp = &response;
|
|
||||||
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
|
|
||||||
rsp->seqNum = request.seqNum;
|
rsp->seqNum = request.seqNum;
|
||||||
rsp->type = request.type;
|
rsp->type = request.type;
|
||||||
rsp->code = 0;
|
rsp->code = 0;
|
||||||
SUdfCallResponse *subRsp = &rsp->callRsp;
|
|
||||||
subRsp->callType = call->callType;
|
subRsp->callType = call->callType;
|
||||||
convertUdfColumnToDataBlock(&output, &subRsp->resultData);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t len = encodeUdfResponse(NULL, rsp);
|
int32_t len = encodeUdfResponse(NULL, rsp);
|
||||||
rsp->msgLen = len;
|
rsp->msgLen = len;
|
||||||
|
@ -208,9 +261,6 @@ void udfdProcessRequest(uv_work_t *req) {
|
||||||
encodeUdfResponse(&buf, rsp);
|
encodeUdfResponse(&buf, rsp);
|
||||||
uvUdf->output = uv_buf_init(bufBegin, len);
|
uvUdf->output = uv_buf_init(bufBegin, len);
|
||||||
|
|
||||||
// TODO: free udf column
|
|
||||||
udf->freeUdfColumn(&output);
|
|
||||||
|
|
||||||
taosMemoryFree(uvUdf->input.base);
|
taosMemoryFree(uvUdf->input.base);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue