From 946b565e992503fd538bb60af03ce4c6a95202ad Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 14 Apr 2022 19:29:41 +0800 Subject: [PATCH] sync home and office --- source/libs/function/CMakeLists.txt | 4 ++ source/libs/function/inc/tudf.h | 59 ++++++--------- source/libs/function/inc/tudfInt.h | 12 ++-- source/libs/function/src/tudf.c | 107 +++++++++++++++++----------- source/libs/function/src/udfd.c | 98 ++++++++++++------------- source/libs/function/test/udf1.c | 60 ++++++++++++---- 6 files changed, 191 insertions(+), 149 deletions(-) diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index aa909361ea..50900120a9 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -5,6 +5,7 @@ target_include_directories( function PUBLIC "${TD_SOURCE_DIR}/include/libs/function" + "${TD_SOURCE_DIR}/inlcude/util" "${TD_SOURCE_DIR}/contrib/libuv/include" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) @@ -21,6 +22,7 @@ target_include_directories( PUBLIC "${TD_SOURCE_DIR}/include/libs/function" "${TD_SOURCE_DIR}/contrib/libuv/include" + "${TD_SOURCE_DIR}/inlcude/util" "${TD_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) @@ -35,6 +37,7 @@ target_include_directories( udf1 PUBLIC "${TD_SOURCE_DIR}/include/libs/function" + "${TD_SOURCE_DIR}/include/util" "${TD_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) @@ -46,6 +49,7 @@ target_include_directories( PUBLIC "${TD_SOURCE_DIR}/include/libs/function" "${TD_SOURCE_DIR}/contrib/libuv/include" + "${TD_SOURCE_DIR}/inlcude/util" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index 41183d5533..86a55d296c 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -16,13 +16,17 @@ #ifndef TDENGINE_TUDF_H #define TDENGINE_TUDF_H + +#include +#include +#include "tmsg.h" + #ifdef __cplusplus extern "C" { #endif //====================================================================================== //begin API to taosd and qworker -#define TSDB_UDF_MAX_COLUMNS 4 enum { UDFC_CODE_STOPPING = -1, @@ -41,35 +45,6 @@ int32_t startUdfService(); */ int32_t stopUdfService(); -enum { - TSDB_UDF_TYPE_SCALAR = 0, - TSDB_UDF_TYPE_AGGREGATE = 1 -}; - -enum { - TSDB_UDF_SCRIPT_BIN_LIB = 0, - TSDB_UDF_SCRIPT_LUA = 1, -}; - -typedef struct SUdfColumnMeta { - int16_t type; - int32_t bytes; // <0 var length, others fixed length bytes - uint8_t precision; - uint8_t scale; -} SUdfColumnMeta; - -typedef struct SUdfInfo { - char *udfName; // function name - int32_t udfType; // scalar function or aggregate function - int8_t scriptType; - char *path; - - // known info between qworker and udf - // struct SUdfColumnMeta resultMeta; - // int32_t bufSize; //interbuf size - -} SUdfInfo; - typedef void *UdfHandle; /** @@ -78,8 +53,14 @@ typedef void *UdfHandle; * @param handle, out * @return error code */ -int32_t setupUdf(SUdfInfo* udf, UdfHandle *handle); +int32_t setupUdf(char udfName[], SEpSet epSet, UdfHandle *handle); +typedef struct SUdfColumnMeta { + int16_t type; + int32_t bytes; // <0 var length, others fixed length bytes + uint8_t precision; + uint8_t scale; +} SUdfColumnMeta; typedef struct SUdfColumnData { int32_t numOfRows; @@ -108,7 +89,7 @@ typedef struct SUdfColumn { typedef struct SUdfDataBlock { int32_t numOfRows; int32_t numOfCols; - SUdfColumn udfCols[TSDB_UDF_MAX_COLUMNS]; + SUdfColumn **udfCols; } SUdfDataBlock; typedef struct SUdfInterBuf { @@ -138,13 +119,17 @@ int32_t teardownUdf(UdfHandle handle); // begin API to UDF writer. // dynamic lib init and destroy -typedef int32_t (*TUdfInitFunc)(); -typedef int32_t (*TUdfDestroyFunc)(); +typedef int32_t (*TUdfSetupFunc)(); +typedef int32_t (*TUdfTeardownFunc)(); + +typedef int32_t (*TUdfFreeUdfColumnDataFunc)(SUdfColumnData* columnData); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumnData *resultData); -typedef int32_t (*TUdfAggInit)(SUdfInterBuf *buf); -typedef int32_t (*TUdfAggProcess)(SUdfDataBlock block, SUdfInterBuf *interBuf); -typedef int32_t (*TUdfAggFinalize)(SUdfInterBuf buf, SUdfColumnData *resultData); +typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf); +typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock block, SUdfInterBuf *interBuf); +typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf buf, SUdfColumnData *resultData); + + // end API to UDF writer //======================================================================================================================= diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 2e0b405916..d765a8d2dc 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -15,7 +15,6 @@ #ifndef TDENGINE_TUDF_INT_H #define TDENGINE_TUDF_INT_H - #ifdef __cplusplus extern "C" { #endif @@ -37,11 +36,8 @@ enum { }; typedef struct SUdfSetupRequest { - char udfName[16]; // - int8_t scriptType; // 0:c, 1: lua, 2:js - int8_t udfType; //udaf, udf - int16_t pathSize; - char *path; + char udfName[TSDB_FUNC_NAME_LEN]; + SEpSet epSet; } SUdfSetupRequest; typedef struct SUdfSetupResponse { @@ -101,6 +97,10 @@ int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request); int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse *pResponse); int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response); +void freeUdfColumnData(SUdfColumnData *data); +void freeUdfColumn(SUdfColumn* col); +void freeUdfDataDataBlock(SUdfDataBlock *block); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index d5089ca0eb..0acb7210ae 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -254,25 +254,30 @@ int32_t deserializeUdfColumnData(SUdfColumnData* pData, char* buf) { pData->nullBitmapLen = *(int32_t*)buf; buf += sizeof(int32_t); - pData->nullBitmap = buf; + //TODO: optimize for less memory copy + pData->nullBitmap = taosMemoryMalloc(pData->nullBitmapLen); + memcpy(pData->nullBitmap, buf, pData->nullBitmapLen); buf += pData->nullBitmapLen; pData->dataLen = *(int32_t*)buf; buf += sizeof(int32_t); - pData->data = buf; + pData->data = taosMemoryMalloc(pData->dataLen); + memcpy(pData->data, buf, pData->dataLen); buf += pData->dataLen; } else { pData->varOffsetsLen = *(int32_t*)buf; buf += sizeof(int32_t); - pData->varOffsets = buf; + pData->varOffsets = taosMemoryMalloc(pData->varOffsetsLen); + memcpy(pData->varOffsets, buf, pData->varOffsetsLen); buf += pData->varOffsetsLen; pData->payloadLen = *(int32_t*)buf; buf += sizeof(int32_t); - pData->payload = buf; + pData->payload = taosMemoryMalloc(pData->payloadLen); + memcpy(pData->payload, buf, pData->payloadLen); buf += pData->payloadLen; } int32_t len = buf - bufBeg; @@ -333,7 +338,7 @@ int32_t serializeUdfDataBlock(SUdfDataBlock *block, char *pBuf) { pBuf += sizeof(int32_t); for (int32_t i = 0; i < block->numOfCols; ++i) { - SUdfColumn* col = block->udfCols + i; + SUdfColumn* col = block->udfCols[i]; int32_t l = serializeUdfColumn(col, pBuf); pBuf += l; } @@ -342,7 +347,7 @@ int32_t serializeUdfDataBlock(SUdfDataBlock *block, char *pBuf) { return totalLen; } -int32_t deserailizeUdfDataBlock(SUdfDataBlock *pBlock, char *buf) { +int32_t deserializeUdfDataBlock(SUdfDataBlock *pBlock, char *buf) { char *bufBegin = buf; pBlock->numOfRows = *(int32_t*)buf; @@ -351,8 +356,10 @@ int32_t deserailizeUdfDataBlock(SUdfDataBlock *pBlock, char *buf) { pBlock->numOfCols = *(int32_t*)buf; buf += sizeof(int32_t); + pBlock->udfCols = taosMemoryMalloc(sizeof(SUdfColumn*) * pBlock->numOfCols); for (int32_t i = 0; i < pBlock->numOfCols; ++i) { - int32_t l = deserializeUdfColumn(pBlock->udfCols + i, buf); + pBlock->udfCols[i] = taosMemoryMalloc(sizeof(SUdfColumn)); + int32_t l = deserializeUdfColumn(pBlock->udfCols[i], buf); buf += l; } @@ -369,14 +376,16 @@ int32_t serializeUdfInterBuf(SUdfInterBuf *state, char *pBuf) { memcpy(pBuf, state->buf, state->bufLen); pBuf += state->bufLen; - return pBuf-bufBegin; + return pBuf - bufBegin; } int32_t deserializeUdfInterBuf(SUdfInterBuf *pState, char *buf) { char* bufBegin = buf; pState->bufLen = *(int32_t*)buf; buf += sizeof(int32_t); - pState->buf = buf; + + pState->buf = taosMemoryMalloc(pState->bufLen); + memcpy(pState->buf, buf, pState->bufLen); buf += pState->bufLen; return buf - bufBegin; } @@ -384,16 +393,11 @@ int32_t deserializeUdfInterBuf(SUdfInterBuf *pState, char *buf) { int32_t serializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) { char *bufBegin = buf; - memcpy(buf, setup->udfName, 16); - buf += 16; - *(int8_t *) buf = setup->scriptType; - buf += sizeof(int8_t); - *(int8_t *) buf = setup->udfType; - buf += sizeof(int8_t); - *(int16_t *) buf = setup->pathSize; - buf += sizeof(int16_t); - memcpy(buf, setup->path, setup->pathSize); - buf += setup->pathSize; + memcpy(buf, setup->udfName, TSDB_FUNC_NAME_LEN); + buf += TSDB_FUNC_NAME_LEN; + + memcpy(buf, &setup->epSet, sizeof(SEpSet)); + buf += sizeof(SEpSet); return buf - bufBegin; }; @@ -401,17 +405,11 @@ int32_t serializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) { int32_t deserializeUdfSetupRequest(SUdfSetupRequest *setup, char *buf) { char* bufBegin = buf; - memcpy(setup->udfName, buf, 16); - buf += 16; - setup->scriptType = *(int8_t *) buf; - buf += sizeof(int8_t); - setup->udfType = *(int8_t *) buf; - buf += sizeof(int8_t); - setup->pathSize = *(int16_t *) buf; - buf += sizeof(int16_t); - setup->path = buf; - buf += setup->pathSize; + memcpy(setup->udfName, buf, TSDB_FUNC_NAME_LEN); + buf += TSDB_FUNC_NAME_LEN; + memcpy(&setup->epSet, buf, sizeof(SEpSet)); + buf += sizeof(SEpSet); return buf - bufBegin; } @@ -479,7 +477,7 @@ int32_t deserializeUdfCallRequest(SUdfCallRequest *call, char *buf) { call->callType = *(int8_t *) buf; buf += sizeof(int8_t); int32_t l = 0; - l = deserailizeUdfDataBlock(&call->block, buf); + l = deserializeUdfDataBlock(&call->block, buf); buf += l; l = deserializeUdfInterBuf(&call->interBuf, buf); buf += l; @@ -633,11 +631,11 @@ int32_t deserializeUdfResponse(SUdfResponse *rsp, char *buf) { int32_t estimateUdfRequestLen(SUdfRequest *request) { // a larger estimated is generated int32_t size = sizeof(SUdfRequest); - if (request->type == UDF_TASK_SETUP) { - size += request->setup.pathSize; - } else if (request->type == UDF_TASK_CALL) { + if (request->type == UDF_TASK_CALL) { + size += request->call.block.numOfCols * sizeof(SUdfColumn*); for (int32_t i = 0; i < request->call.block.numOfCols; ++i) { - SUdfColumn* col = request->call.block.udfCols + i; + size += sizeof(SUdfColumn); + SUdfColumn* col = request->call.block.udfCols[i]; if (col->colData.varLengthColumn) { size += col->colData.varOffsetsLen; size += col->colData.payloadLen; @@ -717,6 +715,39 @@ int32_t decodeResponse(char *bufMsg, int32_t bufLen, SUdfResponse *rsp) { return 0; } +void freeUdfColumnData(SUdfColumnData *data) { + if (data->varLengthColumn) { + taosMemoryFree(data->varOffsets); + data->varOffsets = NULL; + taosMemoryFree(data->payload); + data->payload = NULL; + } else { + taosMemoryFree(data->nullBitmap); + data->nullBitmap = NULL; + taosMemoryFree(data->data); + data->data = NULL; + } +} + +void freeUdfColumn(SUdfColumn* col) { + freeUdfColumnData(&col->colData); +} + +void freeUdfDataDataBlock(SUdfDataBlock *block) { + for (int32_t i = 0; i < block->numOfCols; ++i) { + freeUdfColumn(block->udfCols[i]); + taosMemoryFree(block->udfCols[i]); + block->udfCols[i] = NULL; + } + taosMemoryFree(block->udfCols); + block->udfCols = NULL; +} + +void freeUdfInterBuf(SUdfInterBuf *buf) { + taosMemoryFree(buf->buf); + buf->buf = NULL; +} + void onUdfcPipeClose(uv_handle_t *handle) { SClientUvConn *conn = handle->data; if (!QUEUE_EMPTY(&conn->taskQueue)) { @@ -1168,7 +1199,7 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) { return task->errCode; } -int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) { +int32_t setupUdf(char udfName[TSDB_FUNC_NAME_LEN], UdfHandle *handle) { debugPrint("%s", "client setup udf"); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); task->errCode = 0; @@ -1176,11 +1207,7 @@ int32_t setupUdf(SUdfInfo *udfInfo, UdfHandle *handle) { task->type = UDF_TASK_SETUP; SUdfSetupRequest *req = &task->_setup.req; - memcpy(req->udfName, udfInfo->udfName, 16); - req->path = udfInfo->path; - req->pathSize = strlen(req->path) + 1; - req->udfType = udfInfo->udfType; - req->scriptType = udfInfo->scriptType; + memcpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN); int32_t errCode = udfcRunUvTask(task, UV_TASK_CONNECT); if (errCode != 0) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 41995c8192..477054abe8 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -44,7 +44,8 @@ typedef struct SUdf { int8_t type; uv_lib_t lib; - TUdfScalarProcFunc normalFunc; + TUdfScalarProcFunc scalarProcFunc; + TUdfFreeUdfColumnDataFunc freeUdfColumnData; } SUdf; //TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix @@ -56,19 +57,21 @@ typedef struct SUdfHandle { void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *) (req->data); - SUdfRequest *request = NULL; + SUdfRequest request = {0}; decodeRequest(uvUdf->input.base, uvUdf->input.len, &request); - switch (request->type) { + switch (request.type) { case UDF_TASK_SETUP: { debugPrint("%s", "process setup request"); SUdf *udf = taosMemoryMalloc(sizeof(SUdf)); udf->refCount = 0; - SUdfSetupRequest *setup = request->subReq; + SUdfSetupRequest *setup = &request.setup; strcpy(udf->name, setup->udfName); - int err = uv_dlopen(setup->path, &udf->lib); + //TODO: retrive udf info from mnode + char* path = "udf1.so"; + int err = uv_dlopen(path, &udf->lib); if (err != 0) { - debugPrint("can not load library %s. error: %s", setup->path, uv_strerror(err)); + debugPrint("can not load library %s. error: %s", path, uv_strerror(err)); //TODO set error } @@ -76,99 +79,86 @@ void udfdProcessRequest(uv_work_t *req) { strcpy(normalFuncName, setup->udfName); //TODO error, //TODO find all functions normal, init, destroy, normal, merge, finalize - uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->normalFunc)); + uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->scalarProcFunc)); SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle)); handle->udf = udf; udf->refCount++; //TODO: allocate private structure and call init function and set it to handle - SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse)); - rsp->seqNum = request->seqNum; - rsp->type = request->type; - rsp->code = 0; - SUdfSetupResponse *subRsp = taosMemoryMalloc(sizeof(SUdfSetupResponse)); - subRsp->udfHandle = (int64_t) (handle); - rsp->subRsp = subRsp; + SUdfResponse rsp; + rsp.seqNum = request.seqNum; + rsp.type = request.type; + rsp.code = 0; + rsp.setupRsp.udfHandle = (int64_t) (handle); char *buf; int32_t len; - encodeResponse(&buf, &len, rsp); + encodeResponse(&buf, &len, &rsp); uvUdf->output = uv_buf_init(buf, len); - taosMemoryFree(rsp->subRsp); - taosMemoryFree(rsp); - taosMemoryFree(request->subReq); - taosMemoryFree(request); taosMemoryFree(uvUdf->input.base); break; } case UDF_TASK_CALL: { debugPrint("%s", "process call request"); - SUdfCallRequest *call = request->subReq; + SUdfCallRequest *call = &request.call; SUdfHandle *handle = (SUdfHandle *) (call->udfHandle); SUdf *udf = handle->udf; - char *newState; - int32_t newStateSize; - SUdfDataBlock input = {.data = call->input, .size= call->inputBytes}; - SUdfDataBlock output; - //TODO: call different functions according to the step - udf->normalFunc(call->step, call->state, call->stateBytes, input, &newState, &newStateSize, &output); - SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse)); - rsp->seqNum = request->seqNum; - rsp->type = request->type; - rsp->code = 0; - SUdfCallResponse *subRsp = taosMemoryMalloc(sizeof(SUdfCallResponse)); - subRsp->outputBytes = output.size; - subRsp->output = output.data; - subRsp->newStateBytes = newStateSize; - subRsp->newState = newState; - rsp->subRsp = subRsp; + SUdfDataBlock input = call->block; + SUdfColumnData output; + //TODO: call different functions according to call type, for now just calar + if (call->callType == TSDB_UDF_CALL_SCALA_PROC) { + udf->scalarProcFunc(input, &output); + } + + SUdfResponse response = {0}; + SUdfResponse *rsp = &response; + if (call->callType == TSDB_UDF_CALL_SCALA_PROC) { + rsp->seqNum = request.seqNum; + rsp->type = request.type; + rsp->code = 0; + SUdfCallResponse *subRsp = &rsp->callRsp; + subRsp->resultData = output; + } char *buf; int32_t len; encodeResponse(&buf, &len, rsp); uvUdf->output = uv_buf_init(buf, len); - taosMemoryFree(rsp->subRsp); - taosMemoryFree(rsp); - taosMemoryFree(newState); - taosMemoryFree(output.data); - taosMemoryFree(request->subReq); - taosMemoryFree(request); + //TODO: free + udf->freeUdfColumnData(&output); + taosMemoryFree(uvUdf->input.base); break; } case UDF_TASK_TEARDOWN: { debugPrint("%s", "process teardown request"); - SUdfTeardownRequest *teardown = request->subReq; + SUdfTeardownRequest *teardown = &request.teardown; SUdfHandle *handle = (SUdfHandle *) (teardown->udfHandle); SUdf *udf = handle->udf; udf->refCount--; if (udf->refCount == 0) { uv_dlclose(&udf->lib); + taosMemoryFree(udf); } - taosMemoryFree(udf); - //TODO: call destroy and free udf private + //TODO: call destroy and free udf private taosMemoryFree(handle); - SUdfResponse *rsp = taosMemoryMalloc(sizeof(SUdfResponse)); - rsp->seqNum = request->seqNum; - rsp->type = request->type; + SUdfResponse response; + SUdfResponse *rsp = &response; + rsp->seqNum = request.seqNum; + rsp->type = request.type; rsp->code = 0; - SUdfTeardownResponse *subRsp = taosMemoryMalloc(sizeof(SUdfTeardownResponse)); - rsp->subRsp = subRsp; + SUdfTeardownResponse *subRsp = &response.teardownRsp; char *buf; int32_t len; encodeResponse(&buf, &len, rsp); uvUdf->output = uv_buf_init(buf, len); - taosMemoryFree(rsp->subRsp); - taosMemoryFree(rsp); - taosMemoryFree(request->subReq); - taosMemoryFree(request); taosMemoryFree(uvUdf->input.base); break; } diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index 3fdc522ef9..31819fa8e5 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -3,17 +3,53 @@ #include #include "tudf.h" -void udf1(int8_t step, char *state, int32_t stateSize, SUdfDataBlock input, - char **newState, int32_t *newStateSize, SUdfDataBlock *output) { - fprintf(stdout, "%s, step:%d\n", "udf function called", step); - char *newStateBuf = malloc(stateSize); - memcpy(newStateBuf, state, stateSize); - *newState = newStateBuf; - *newStateSize = stateSize; - char *outputBuf = malloc(input.size); - memcpy(outputBuf, input.data, input.size); - output->data = outputBuf; - output->size = input.size; - return; +int32_t udf1_setup() { + return 0; } + +int32_t udf1_teardown() { + return 0; +} + +int32_t udf1(SUdfDataBlock block, SUdfColumnData *resultData) { + + resultData->numOfRows = block.numOfRows; + SUdfColumnData *srcData = &block.udfCols[0]->colData; + resultData->varLengthColumn = srcData->varLengthColumn; + + if (resultData->varLengthColumn) { + resultData->varOffsetsLen = srcData->varOffsetsLen; + resultData->varOffsets = malloc(resultData->varOffsetsLen); + memcpy(resultData->varOffsets, srcData->varOffsets, srcData->varOffsetsLen); + + resultData->payloadLen = srcData->payloadLen; + resultData->payload = malloc(resultData->payloadLen); + memcpy(resultData->payload, srcData->payload, srcData->payloadLen); + } else { + resultData->nullBitmapLen = srcData->nullBitmapLen; + resultData->nullBitmap = malloc(resultData->nullBitmapLen); + memcpy(resultData->nullBitmap, srcData->nullBitmap, srcData->nullBitmapLen); + + resultData->dataLen = srcData->dataLen; + resultData->data = malloc(resultData->dataLen); + memcpy(resultData->data, srcData->data, srcData->dataLen); + } + + return 0; +} + +int32_t udf1_free(SUdfColumnData *data) { + if (data->varLengthColumn) { + free(data->varOffsets); + data->varOffsets = NULL; + free(data->payload); + data->payload = NULL; + } else { + free(data->nullBitmap); + data->nullBitmap = NULL; + free(data->data); + data->data = NULL; + } + return 0; +} \ No newline at end of file