call udf for scalar and aggregate

This commit is contained in:
slzhou 2022-04-17 15:07:11 +08:00
parent 4817f54ae9
commit 18943125f4
4 changed files with 135 additions and 25 deletions

View File

@ -99,19 +99,20 @@ typedef struct SUdfInterBuf {
} SUdfInterBuf;
//TODO: translate these calls to callUdf
int32_t callUdfAggInit(SUdfInterBuf *interBuf);
// input: block, initFirst
// output: interbuf
int32_t callUdfAggProcess(SSDataBlock *block, SUdfInterBuf *interBuf);
// output: interBuf
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf);
// input: block, state
// output: newState
int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
// input: interBuf
// output: resultData
int32_t callUdfAggFinalize(SUdfInterBuf *interBuf, SSDataBlock *resultData);
int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
// input: interbuf1, interbuf2
// output: resultBuf
int32_t callUdfAggMerge(SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
// input: block
// output: resultData
int32_t callUdfScalaProcess(SSDataBlock *block, SSDataBlock *resultData);
int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData);
/**
* tearn down udf
@ -134,12 +135,12 @@ typedef int32_t (*TUdfTeardownFunc)();
//typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data);
//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data);
typedef int32_t (*TUdfFreeUdfColumnDataFunc)(SUdfColumnData* columnData);
typedef int32_t (*TUdfFreeUdfColumnDataFunc)(SUdfColumn* columnData);
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumnData *resultData);
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock block, SUdfInterBuf *interBuf);
typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf buf, SUdfColumnData *resultData);
typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf buf, SUdfInterBuf *resultData);
// end API to UDF writer

View File

@ -59,7 +59,7 @@ typedef struct SUdfCallRequest {
typedef struct SUdfCallResponse {
int8_t callType;
SSDataBlock resultData;
SUdfInterBuf interBuf;
SUdfInterBuf resultBuf;
} SUdfCallResponse;
@ -106,6 +106,9 @@ void freeUdfColumnData(SUdfColumnData *data);
void freeUdfColumn(SUdfColumn* col);
void freeUdfDataDataBlock(SUdfDataBlock *block);
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
#ifdef __cplusplus
}
#endif

View File

@ -18,6 +18,7 @@
#include "tudf.h"
#include "tudfInt.h"
#include "tarray.h"
#include "tdatablock.h"
//TODO: when startup, set thread poll size. add it to cfg
//TODO: test for udfd restart
@ -351,13 +352,13 @@ int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
len += tEncodeDataBlock(buf, &callRsp->resultData);
break;
case TSDB_UDF_CALL_AGG_INIT:
len += encodeUdfInterBuf(buf, &callRsp->interBuf);
len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_PROC:
len += encodeUdfInterBuf(buf, &callRsp->interBuf);
len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_MERGE:
len += encodeUdfInterBuf(buf, &callRsp->interBuf);
len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_FIN:
len += tEncodeDataBlock(buf, &callRsp->resultData);
@ -373,13 +374,13 @@ void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) {
buf = tDecodeDataBlock(buf, &callRsp->resultData);
break;
case TSDB_UDF_CALL_AGG_INIT:
buf = decodeUdfInterBuf(buf, &callRsp->interBuf);
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_PROC:
buf = decodeUdfInterBuf(buf, &callRsp->interBuf);
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_MERGE:
buf = decodeUdfInterBuf(buf, &callRsp->interBuf);
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_FIN:
buf = tDecodeDataBlock(buf, &callRsp->resultData);
@ -483,6 +484,70 @@ void freeUdfInterBuf(SUdfInterBuf *buf) {
buf->buf = NULL;
}
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
udfBlock->numOfRows = block->info.rows;
udfBlock->numOfCols = block->info.numOfCols;
udfBlock->udfCols = taosMemoryMalloc(sizeof(SUdfColumn*) * udfBlock->numOfCols);
for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
udfBlock->udfCols[i] = taosMemoryMalloc(sizeof(SUdfColumn));
SColumnInfoData *col= (SColumnInfoData*)taosArrayGet(block->pDataBlock, i);
SUdfColumn *udfCol = udfBlock->udfCols[i];
udfCol->colMeta.type = col->info.type;
udfCol->colMeta.bytes = col->info.bytes;
udfCol->colMeta.scale = col->info.scale;
udfCol->colMeta.precision = col->info.precision;
udfCol->colData.numOfRows = udfBlock->numOfRows;
udfCol->colData.varLengthColumn = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
if (udfCol->colData.varLengthColumn) {
udfCol->colData.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
udfCol->colData.varOffsets = taosMemoryMalloc(udfCol->colData.varOffsetsLen);
memcpy(udfCol->colData.varOffsets, col->varmeta.offset, udfCol->colData.varOffsetsLen);
udfCol->colData.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
udfCol->colData.payload = taosMemoryMalloc(udfCol->colData.payloadLen);
memcpy(udfCol->colData.payload, col->pData, udfCol->colData.payloadLen);
} else {
udfCol->colData.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
udfCol->colData.nullBitmap = taosMemoryMalloc(udfCol->colData.nullBitmapLen);
memcpy(udfCol->colData.nullBitmap, col->nullbitmap, udfCol->colData.nullBitmapLen);
udfCol->colData.dataLen = colDataGetLength(col, udfBlock->numOfRows);
udfCol->colData.data = taosMemoryMalloc(udfCol->colData.dataLen);
memcpy(udfCol->colData.data, col->pData, udfCol->colData.dataLen);
}
}
return 0;
}
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
block->info.numOfCols = 1;
block->info.rows = udfCol->colData.numOfRows;
block->info.hasVarCol = udfCol->colData.varLengthColumn;
block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
taosArraySetSize(block->pDataBlock, 1);
SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0);
SUdfColumnMeta *meta = &udfCol->colMeta;
col->info.precision = meta->precision;
col->info.bytes = meta->bytes;
col->info.scale = meta->scale;
col->info.type = meta->type;
SUdfColumnData *data = &udfCol->colData;
if (!IS_VAR_DATA_TYPE(meta->type)) {
col->nullbitmap = taosMemoryMalloc(data->nullBitmapLen);
memcpy(col->nullbitmap, data->nullBitmap, data->nullBitmapLen);
col->pData = taosMemoryMalloc(data->dataLen);
memcpy(col->pData, data->payload, data->dataLen);
} else {
col->varmeta.offset = taosMemoryMalloc(data->varOffsetsLen);
memcpy(col->varmeta.offset, data->varOffsets, data->varOffsetsLen);
col->pData = taosMemoryMalloc(data->payloadLen);
memcpy(col->pData, data->payload, data->payloadLen);
}
return 0;
}
void onUdfcPipeClose(uv_handle_t *handle) {
SClientUvConn *conn = handle->data;
if (!QUEUE_EMPTY(&conn->taskQueue)) {
@ -962,7 +1027,7 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) {
return err;
}
int32_t callUdf(UdfHandle handle, int8_t callType, int8_t initFirst, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState) {
debugPrint("%s", "client call udf");
@ -1002,19 +1067,19 @@ int32_t callUdf(UdfHandle handle, int8_t callType, int8_t initFirst, SSDataBlock
SUdfCallResponse *rsp = &task->_call.rsp;
switch (callType) {
case TSDB_UDF_CALL_AGG_INIT: {
*newState = rsp->interBuf;
*newState = rsp->resultBuf;
break;
}
case TSDB_UDF_CALL_AGG_PROC: {
*newState = rsp->interBuf;
*newState = rsp->resultBuf;
break;
}
case TSDB_UDF_CALL_AGG_MERGE: {
*newState = rsp->interBuf;
*newState = rsp->resultBuf;
break;
}
case TSDB_UDF_CALL_AGG_FIN: {
*output = rsp->resultData;
*newState = rsp->resultBuf;
break;
}
case TSDB_UDF_CALL_SCALA_PROC: {
@ -1027,6 +1092,47 @@ int32_t callUdf(UdfHandle handle, int8_t callType, int8_t initFirst, SSDataBlock
return task->errCode;
}
//TODO: translate these calls to callUdf
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_INIT;
int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf);
return err;
}
// input: block, state
// output: interbuf,
int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC;
int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
return err;
}
// input: interbuf1, interbuf2
// output: resultBuf
int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_MERGE;
int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
return err;
}
// input: interBuf
// output: resultData
int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC;
int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
return err;
}
// input: block
// output: resultData
int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData) {
int8_t callType = TSDB_UDF_CALL_SCALA_PROC;
int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL);
return err;
}
int32_t teardownUdf(UdfHandle handle) {
debugPrint("%s", "client teardown udf");

View File

@ -112,8 +112,8 @@ void udfdProcessRequest(uv_work_t *req) {
SUdf *udf = handle->udf;
SUdfDataBlock input = {0};
//TODO: convertSDataBlockToUdfDataBlock(call->block, &input);
SUdfColumnData output;
convertDataBlockToUdfDataBlock(&call->block, &input);
SUdfColumn output;
//TODO: call different functions according to call type, for now just calar
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
udf->scalarProcFunc(input, &output);
@ -126,7 +126,7 @@ void udfdProcessRequest(uv_work_t *req) {
rsp->type = request.type;
rsp->code = 0;
SUdfCallResponse *subRsp = &rsp->callRsp;
//TODO: convertSUdfColumnDataToSSDataBlock(output, &subRsp->resultData);
convertUdfColumnToDataBlock(&output, &subRsp->resultData);
}
int32_t len = encodeUdfResponse(NULL, rsp);