fix: udf can return varchar
This commit is contained in:
parent
dab5e33b9b
commit
f611a6d467
|
@ -41,7 +41,7 @@ typedef struct SUdfSetupRequest {
|
||||||
typedef struct SUdfSetupResponse {
|
typedef struct SUdfSetupResponse {
|
||||||
int64_t udfHandle;
|
int64_t udfHandle;
|
||||||
int8_t outputType;
|
int8_t outputType;
|
||||||
int32_t outputLen;
|
int32_t bytes;
|
||||||
int32_t bufSize;
|
int32_t bufSize;
|
||||||
} SUdfSetupResponse;
|
} SUdfSetupResponse;
|
||||||
|
|
||||||
|
|
|
@ -376,7 +376,7 @@ typedef struct SUdfcUvSession {
|
||||||
uv_pipe_t *udfUvPipe;
|
uv_pipe_t *udfUvPipe;
|
||||||
|
|
||||||
int8_t outputType;
|
int8_t outputType;
|
||||||
int32_t outputLen;
|
int32_t bytes;
|
||||||
int32_t bufSize;
|
int32_t bufSize;
|
||||||
|
|
||||||
char udfName[TSDB_FUNC_NAME_LEN + 1];
|
char udfName[TSDB_FUNC_NAME_LEN + 1];
|
||||||
|
@ -614,7 +614,7 @@ int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
|
len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
|
||||||
len += taosEncodeFixedI8(buf, setupRsp->outputType);
|
len += taosEncodeFixedI8(buf, setupRsp->outputType);
|
||||||
len += taosEncodeFixedI32(buf, setupRsp->outputLen);
|
len += taosEncodeFixedI32(buf, setupRsp->bytes);
|
||||||
len += taosEncodeFixedI32(buf, setupRsp->bufSize);
|
len += taosEncodeFixedI32(buf, setupRsp->bufSize);
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
@ -622,7 +622,7 @@ int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
|
||||||
void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) {
|
void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) {
|
||||||
buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
|
buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
|
||||||
buf = taosDecodeFixedI8(buf, &setupRsp->outputType);
|
buf = taosDecodeFixedI8(buf, &setupRsp->outputType);
|
||||||
buf = taosDecodeFixedI32(buf, &setupRsp->outputLen);
|
buf = taosDecodeFixedI32(buf, &setupRsp->bytes);
|
||||||
buf = taosDecodeFixedI32(buf, &setupRsp->bufSize);
|
buf = taosDecodeFixedI32(buf, &setupRsp->bufSize);
|
||||||
return (void *)buf;
|
return (void *)buf;
|
||||||
}
|
}
|
||||||
|
@ -808,6 +808,26 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
|
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
|
||||||
|
SUdfColumnMeta* meta = &udfCol->colMeta;
|
||||||
|
|
||||||
|
SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1);
|
||||||
|
blockDataAppendColInfo(block, &colInfoData);
|
||||||
|
blockDataEnsureCapacity(block, udfCol->colData.numOfRows);
|
||||||
|
|
||||||
|
SColumnInfoData *col = bdGetColumnInfoData(block, 0);
|
||||||
|
for (int i = 0; i < udfCol->colData.numOfRows; ++i) {
|
||||||
|
if (udfColDataIsNull(udfCol, i)) {
|
||||||
|
colDataSetNULL(col, i);
|
||||||
|
} else {
|
||||||
|
char* data = udfColDataGetData(udfCol, i);
|
||||||
|
colDataSetVal(col, i, data, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
block->info.rows = udfCol->colData.numOfRows;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t convertUdfColumnToDataBlock2(SUdfColumn *udfCol, SSDataBlock *block) {
|
||||||
block->info.rows = udfCol->colData.numOfRows;
|
block->info.rows = udfCol->colData.numOfRows;
|
||||||
block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
|
block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
|
||||||
|
|
||||||
|
@ -1056,9 +1076,9 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
|
||||||
fnError("udfc scalar function calculate error. no column data");
|
fnError("udfc scalar function calculate error. no column data");
|
||||||
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
|
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
|
||||||
} else {
|
} else {
|
||||||
if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) {
|
if (session->outputType != output->columnData->info.type || session->bytes != output->columnData->info.bytes) {
|
||||||
fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)",
|
fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)",
|
||||||
session->outputType, session->outputLen, output->columnData->info.type, output->columnData->info.bytes);
|
session->outputType, session->bytes, output->columnData->info.type, output->columnData->info.bytes);
|
||||||
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
|
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1086,11 +1106,11 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
|
||||||
}
|
}
|
||||||
SUdfcUvSession *session = (SUdfcUvSession *)handle;
|
SUdfcUvSession *session = (SUdfcUvSession *)handle;
|
||||||
SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo);
|
SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo);
|
||||||
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
|
int32_t envSize = sizeof(SUdfAggRes) + session->bytes + session->bufSize;
|
||||||
memset(udfRes, 0, envSize);
|
memset(udfRes, 0, envSize);
|
||||||
|
|
||||||
udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
|
udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
|
||||||
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
|
||||||
|
|
||||||
SUdfInterBuf buf = {0};
|
SUdfInterBuf buf = {0};
|
||||||
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
|
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
|
||||||
|
@ -1123,7 +1143,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||||
SUdfcUvSession *session = handle;
|
SUdfcUvSession *session = handle;
|
||||||
SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
|
udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
|
||||||
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
|
||||||
|
|
||||||
SInputColumnInfoData *pInput = &pCtx->input;
|
SInputColumnInfoData *pInput = &pCtx->input;
|
||||||
int32_t numOfCols = pInput->numOfInputCols;
|
int32_t numOfCols = pInput->numOfInputCols;
|
||||||
|
@ -1180,7 +1200,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
|
||||||
SUdfcUvSession *session = handle;
|
SUdfcUvSession *session = handle;
|
||||||
SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
|
udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
|
||||||
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen;
|
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
|
||||||
|
|
||||||
SUdfInterBuf resultBuf = {0};
|
SUdfInterBuf resultBuf = {0};
|
||||||
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
|
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
|
||||||
|
@ -1190,12 +1210,12 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
|
||||||
fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
|
fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
|
||||||
GET_RES_INFO(pCtx)->numOfRes = 0;
|
GET_RES_INFO(pCtx)->numOfRes = 0;
|
||||||
} else {
|
} else {
|
||||||
if (resultBuf.bufLen <= session->outputLen) {
|
if (resultBuf.bufLen <= session->bytes) {
|
||||||
memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
|
memcpy(udfRes->finalResBuf, resultBuf.buf, session->bytes);
|
||||||
udfRes->finalResNum = resultBuf.numOfResult;
|
udfRes->finalResNum = resultBuf.numOfResult;
|
||||||
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
|
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
|
||||||
} else {
|
} else {
|
||||||
fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen);
|
fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes);
|
||||||
GET_RES_INFO(pCtx)->numOfRes = 0;
|
GET_RES_INFO(pCtx)->numOfRes = 0;
|
||||||
udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
|
udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
|
||||||
}
|
}
|
||||||
|
@ -1699,7 +1719,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
||||||
SUdfSetupResponse *rsp = &task->_setup.rsp;
|
SUdfSetupResponse *rsp = &task->_setup.rsp;
|
||||||
task->session->severHandle = rsp->udfHandle;
|
task->session->severHandle = rsp->udfHandle;
|
||||||
task->session->outputType = rsp->outputType;
|
task->session->outputType = rsp->outputType;
|
||||||
task->session->outputLen = rsp->outputLen;
|
task->session->bytes = rsp->bytes;
|
||||||
task->session->bufSize = rsp->bufSize;
|
task->session->bufSize = rsp->bufSize;
|
||||||
strncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN);
|
strncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN);
|
||||||
if (task->errCode != 0) {
|
if (task->errCode != 0) {
|
||||||
|
|
|
@ -621,7 +621,11 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
rsp.code = code;
|
rsp.code = code;
|
||||||
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
rsp.setupRsp.udfHandle = (int64_t)(handle);
|
||||||
rsp.setupRsp.outputType = udf->outputType;
|
rsp.setupRsp.outputType = udf->outputType;
|
||||||
rsp.setupRsp.outputLen = udf->outputLen;
|
if (!IS_VAR_DATA_TYPE(udf->outputType)) {
|
||||||
|
rsp.setupRsp.bytes = udf->outputLen;
|
||||||
|
} else {
|
||||||
|
rsp.setupRsp.bytes = udf->outputLen + VARSTR_HEADER_SIZE;
|
||||||
|
}
|
||||||
rsp.setupRsp.bufSize = udf->bufSize;
|
rsp.setupRsp.bufSize = udf->bufSize;
|
||||||
|
|
||||||
int32_t len = encodeUdfResponse(NULL, &rsp);
|
int32_t len = encodeUdfResponse(NULL, &rsp);
|
||||||
|
@ -650,7 +654,11 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
switch (call->callType) {
|
switch (call->callType) {
|
||||||
case TSDB_UDF_CALL_SCALA_PROC: {
|
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||||
SUdfColumn output = {0};
|
SUdfColumn output = {0};
|
||||||
output.colMeta.bytes = udf->outputLen;
|
if (IS_VAR_DATA_TYPE(udf->outputType)) {
|
||||||
|
output.colMeta.bytes = udf->outputLen + VARSTR_HEADER_SIZE;
|
||||||
|
} else {
|
||||||
|
output.colMeta.bytes = udf->outputLen;
|
||||||
|
}
|
||||||
output.colMeta.type = udf->outputType;
|
output.colMeta.type = udf->outputType;
|
||||||
output.colMeta.precision = 0;
|
output.colMeta.precision = 0;
|
||||||
output.colMeta.scale = 0;
|
output.colMeta.scale = 0;
|
||||||
|
|
Loading…
Reference in New Issue