From f611a6d467b86a43d9fb6f54a6bb85bad733325a Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 10 Mar 2023 12:36:58 +0800 Subject: [PATCH] fix: udf can return varchar --- source/libs/function/inc/tudfInt.h | 2 +- source/libs/function/src/tudf.c | 46 +++++++++++++++++++++--------- source/libs/function/src/udfd.c | 12 ++++++-- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index c69d19b8a6..27d3b7930f 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -41,7 +41,7 @@ typedef struct SUdfSetupRequest { typedef struct SUdfSetupResponse { int64_t udfHandle; int8_t outputType; - int32_t outputLen; + int32_t bytes; int32_t bufSize; } SUdfSetupResponse; diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index fdc3f027f5..c3d00346ee 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -376,7 +376,7 @@ typedef struct SUdfcUvSession { uv_pipe_t *udfUvPipe; int8_t outputType; - int32_t outputLen; + int32_t bytes; int32_t bufSize; char udfName[TSDB_FUNC_NAME_LEN + 1]; @@ -614,7 +614,7 @@ int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) { int32_t len = 0; len += taosEncodeFixedI64(buf, setupRsp->udfHandle); len += taosEncodeFixedI8(buf, setupRsp->outputType); - len += taosEncodeFixedI32(buf, setupRsp->outputLen); + len += taosEncodeFixedI32(buf, setupRsp->bytes); len += taosEncodeFixedI32(buf, setupRsp->bufSize); return len; } @@ -622,7 +622,7 @@ int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) { void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) { buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle); buf = taosDecodeFixedI8(buf, &setupRsp->outputType); - buf = taosDecodeFixedI32(buf, &setupRsp->outputLen); + buf = taosDecodeFixedI32(buf, &setupRsp->bytes); buf = taosDecodeFixedI32(buf, &setupRsp->bufSize); return (void *)buf; } @@ -808,6 +808,26 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo } int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { + SUdfColumnMeta* meta = &udfCol->colMeta; + + SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1); + blockDataAppendColInfo(block, &colInfoData); + blockDataEnsureCapacity(block, udfCol->colData.numOfRows); + + SColumnInfoData *col = bdGetColumnInfoData(block, 0); + for (int i = 0; i < udfCol->colData.numOfRows; ++i) { + if (udfColDataIsNull(udfCol, i)) { + colDataSetNULL(col, i); + } else { + char* data = udfColDataGetData(udfCol, i); + colDataSetVal(col, i, data, false); + } + } + block->info.rows = udfCol->colData.numOfRows; + return 0; +} + +int32_t convertUdfColumnToDataBlock2(SUdfColumn *udfCol, SSDataBlock *block) { block->info.rows = udfCol->colData.numOfRows; block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type); @@ -1056,9 +1076,9 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, fnError("udfc scalar function calculate error. no column data"); code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } else { - if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) { + if (session->outputType != output->columnData->info.type || session->bytes != output->columnData->info.bytes) { fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", - session->outputType, session->outputLen, output->columnData->info.type, output->columnData->info.bytes); + session->outputType, session->bytes, output->columnData->info.type, output->columnData->info.bytes); code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } } @@ -1086,11 +1106,11 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult } SUdfcUvSession *session = (SUdfcUvSession *)handle; SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo); - int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize; + int32_t envSize = sizeof(SUdfAggRes) + session->bytes + session->bufSize; memset(udfRes, 0, envSize); udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen; + udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes; SUdfInterBuf buf = {0}; if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { @@ -1123,7 +1143,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { SUdfcUvSession *session = handle; SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen; + udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes; SInputColumnInfoData *pInput = &pCtx->input; int32_t numOfCols = pInput->numOfInputCols; @@ -1180,7 +1200,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { SUdfcUvSession *session = handle; SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen; + udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes; SUdfInterBuf resultBuf = {0}; SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; @@ -1190,12 +1210,12 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode); GET_RES_INFO(pCtx)->numOfRes = 0; } else { - if (resultBuf.bufLen <= session->outputLen) { - memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen); + if (resultBuf.bufLen <= session->bytes) { + memcpy(udfRes->finalResBuf, resultBuf.buf, session->bytes); udfRes->finalResNum = resultBuf.numOfResult; GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; } else { - fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen); + fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes); GET_RES_INFO(pCtx)->numOfRes = 0; udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } @@ -1699,7 +1719,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { SUdfSetupResponse *rsp = &task->_setup.rsp; task->session->severHandle = rsp->udfHandle; task->session->outputType = rsp->outputType; - task->session->outputLen = rsp->outputLen; + task->session->bytes = rsp->bytes; task->session->bufSize = rsp->bufSize; strncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN); if (task->errCode != 0) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index fcc06787e1..1593f97105 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -621,7 +621,11 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { rsp.code = code; rsp.setupRsp.udfHandle = (int64_t)(handle); rsp.setupRsp.outputType = udf->outputType; - rsp.setupRsp.outputLen = udf->outputLen; + if (!IS_VAR_DATA_TYPE(udf->outputType)) { + rsp.setupRsp.bytes = udf->outputLen; + } else { + rsp.setupRsp.bytes = udf->outputLen + VARSTR_HEADER_SIZE; + } rsp.setupRsp.bufSize = udf->bufSize; int32_t len = encodeUdfResponse(NULL, &rsp); @@ -650,7 +654,11 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { switch (call->callType) { case TSDB_UDF_CALL_SCALA_PROC: { SUdfColumn output = {0}; - output.colMeta.bytes = udf->outputLen; + if (IS_VAR_DATA_TYPE(udf->outputType)) { + output.colMeta.bytes = udf->outputLen + VARSTR_HEADER_SIZE; + } else { + output.colMeta.bytes = udf->outputLen; + } output.colMeta.type = udf->outputType; output.colMeta.precision = 0; output.colMeta.scale = 0;