diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index 775e6133f7..74aab09da2 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -136,7 +136,7 @@ typedef int32_t (*TUdfTeardownFunc)(); //typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data); //typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data); -typedef int32_t (*TUdfFreeUdfColumnDataFunc)(SUdfColumn* columnData); +typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* columnData); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol); typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index e3444ffa21..c5542424f0 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -318,7 +318,7 @@ int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) { void* decodeUdfRequest(const void* buf, SUdfRequest* request) { request->msgLen = *(int32_t*)(buf); - POINTER_SHIFT(buf, sizeof(request->msgLen)); + buf = POINTER_SHIFT(buf, sizeof(request->msgLen)); buf = taosDecodeFixedI64(buf, &request->seqNum); buf = taosDecodeFixedI8(buf, &request->type); @@ -429,7 +429,7 @@ int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) { void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) { rsp->msgLen = *(int32_t*)(buf); - POINTER_SHIFT(buf, sizeof(rsp->msgLen)); + buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen)); buf = taosDecodeFixedI64(buf, &rsp->seqNum); buf = taosDecodeFixedI8(buf, &rsp->type); buf = taosDecodeFixedI32(buf, &rsp->code); @@ -786,9 +786,10 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN } int32_t bufLen = encodeUdfRequest(NULL, &request); request.msgLen = bufLen; - void *buf = taosMemoryMalloc(bufLen); + void *bufBegin = taosMemoryMalloc(bufLen); + void *buf = bufBegin; encodeUdfRequest(&buf, &request); - uvTask->reqBuf = uv_buf_init(buf, bufLen); + uvTask->reqBuf = uv_buf_init(bufBegin, bufLen); uvTask->seqNum = request.seqNum; } else if (uvTaskType == UV_TASK_DISCONNECT) { uvTask->pipe = task->session->udfSvcPipe; @@ -931,7 +932,7 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { gUdfcState = UDFC_STATE_RESTARTING; //TODO: asynchronous without blocking. how to do it cleanUpUvTasks(); - startUdfd(); + //startUdfd(); } } @@ -966,7 +967,7 @@ void constructUdfService(void *argsThread) { uv_loop_init(&gUdfdLoop); //TODO spawn error - startUdfd(); + //startUdfd(); uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb); uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb); @@ -1009,7 +1010,8 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) { udfcGetUvTaskResponseResult(task, uvTask); if (uvTaskType == UV_TASK_CONNECT) { task->session->udfSvcPipe = uvTask->pipe; - } taosMemoryFree(uvTask); + } + taosMemoryFree(uvTask); uvTask = NULL; return task->errCode; } @@ -1050,6 +1052,8 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter task->type = UDF_TASK_CALL; SUdfCallRequest *req = &task->_call.req; + req->udfHandle = task->session->severHandle; + switch (callType) { case TSDB_UDF_CALL_AGG_INIT: { req->initFirst = 1; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index d5c917121a..50f8bdbd93 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -49,7 +49,7 @@ typedef struct SUdf { uv_lib_t lib; TUdfScalarProcFunc scalarProcFunc; - TUdfFreeUdfColumnDataFunc freeUdfColumnData; + TUdfFreeUdfColumnFunc freeUdfColumn; } SUdf; //TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix @@ -72,18 +72,22 @@ void udfdProcessRequest(uv_work_t *req) { SUdfSetupRequest *setup = &request.setup; strcpy(udf->name, setup->udfName); //TODO: retrive udf info from mnode - char* path = "udf1.so"; + char* path = "libudf1.so"; int err = uv_dlopen(path, &udf->lib); if (err != 0) { debugPrint("can not load library %s. error: %s", path, uv_strerror(err)); //TODO set error } - char normalFuncName[32] = {0}; + char normalFuncName[TSDB_FUNC_NAME_LEN] = {0}; strcpy(normalFuncName, setup->udfName); - //TODO error, + //TODO error, multi-thread, same udf, lock it //TODO find all functions normal, init, destroy, normal, merge, finalize uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->scalarProcFunc)); + char freeFuncName[TSDB_FUNC_NAME_LEN + 5]; + strncpy(freeFuncName, normalFuncName, strlen(normalFuncName)); + strcat(freeFuncName, "_free"); + uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle)); handle->udf = udf; @@ -96,10 +100,11 @@ void udfdProcessRequest(uv_work_t *req) { rsp.setupRsp.udfHandle = (int64_t) (handle); int32_t len = encodeUdfResponse(NULL, &rsp); rsp.msgLen = len; - void *buf = taosMemoryMalloc(len); + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; encodeUdfResponse(&buf, &rsp); - uvUdf->output = uv_buf_init(buf, len); + uvUdf->output = uv_buf_init(bufBegin, len); taosMemoryFree(uvUdf->input.base); break; @@ -131,12 +136,13 @@ void udfdProcessRequest(uv_work_t *req) { int32_t len = encodeUdfResponse(NULL, rsp); rsp->msgLen = len; - void *buf = taosMemoryMalloc(len); + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; encodeUdfResponse(&buf, rsp); - uvUdf->output = uv_buf_init(buf, len); + uvUdf->output = uv_buf_init(bufBegin, len); //TODO: free - udf->freeUdfColumnData(&output); + udf->freeUdfColumn(&output); taosMemoryFree(uvUdf->input.base); break; @@ -160,12 +166,12 @@ void udfdProcessRequest(uv_work_t *req) { rsp->seqNum = request.seqNum; rsp->type = request.type; rsp->code = 0; - SUdfTeardownResponse *subRsp = &response.teardownRsp; int32_t len = encodeUdfResponse(NULL, rsp); - void *buf = taosMemoryMalloc(len); rsp->msgLen = len; + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; encodeUdfResponse(&buf, rsp); - uvUdf->output = uv_buf_init(buf, len); + uvUdf->output = uv_buf_init(bufBegin, len); taosMemoryFree(uvUdf->input.base); break; diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index 7c03ed727e..a03e006f7f 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -29,11 +29,15 @@ int main(int argc, char *argv[]) { pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); pBlock->info.numOfCols = 1; pBlock->info.rows = 4; + char data[16] = {0}; + char bitmap[1] = {0}; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData colInfo = {0}; colInfo.info.type = TSDB_DATA_TYPE_INT; colInfo.info.bytes = sizeof(int32_t); colInfo.info.colId = 1; + colInfo.pData = data; + colInfo.nullbitmap = bitmap; for (int32_t j = 0; j < pBlock->info.rows; ++j) { colDataAppendInt32(&colInfo, j, &j); }