diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index 376f589acd..122c121ed8 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -307,11 +307,11 @@ int32_t dmStartUdfd(SDnode *pDnode) { dInfo("dnode-mgmt start udfd already called"); return 0; } + pData->startCalled = true; uv_barrier_init(&pData->barrier, 2); pData->stopping = 0; uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); uv_barrier_wait(&pData->barrier); - pData->startCalled = true; pData->needCleanUp = true; return pData->spawnErr; } diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index b5c839c811..5f4f96c4cc 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -21,6 +21,7 @@ #include #include "tmsg.h" #include "tcommon.h" +#include "function.h" #ifdef __cplusplus extern "C" { @@ -118,8 +119,7 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); // input: block // output: resultData -int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData); - +int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); /** * tearn down udf * @param handle diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index e31a860e85..317339af04 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -14,6 +14,7 @@ */ #include "uv.h" #include "os.h" +#include "fnLog.h" #include "tudf.h" #include "tudfInt.h" #include "tarray.h" @@ -557,6 +558,34 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { return 0; } +int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) { + output->info.rows = input->numOfRows; + output->info.numOfCols = numOfCols; + bool hasVarCol = false; + for (int32_t i = 0; i < numOfCols; ++i) { + if (IS_VAR_DATA_TYPE((input+i)->columnData->info.type)) { + hasVarCol = true; + break; + } + } + output->info.hasVarCol = hasVarCol; + + //TODO: free the array output->pDataBlock + output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + taosArrayPush(output->pDataBlock, input->columnData); + return 0; +} + +int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { + if (input->info.numOfCols != 1) { + fnError("scalar function only support one column"); + return -1; + } + output->numOfRows = input->info.rows; + //TODO: memory + output->columnData = taosArrayGet(input->pDataBlock, 0); + return 0; +} void onUdfcPipeClose(uv_handle_t *handle) { SClientUvConn *conn = handle->data; @@ -1108,11 +1137,13 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn return err; } -// input: block -// output: resultData -int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData) { +int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) { int8_t callType = TSDB_UDF_CALL_SCALA_PROC; - int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL); + SSDataBlock inputBlock = {0}; + convertScalarParamToDataBlock(input, numOfCols, &inputBlock); + SSDataBlock resultBlock = {0}; + int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL); + convertDataBlockToScalarParm(&resultBlock, output); return err; } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index d6e7a43666..5f7532da87 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -26,15 +26,15 @@ #include "trpc.h" typedef struct SUdfdContext { - uv_loop_t *loop; - uv_pipe_t ctrlPipe; + uv_loop_t *loop; + uv_pipe_t ctrlPipe; uv_signal_t intrSignal; - char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; - uv_pipe_t listeningPipe; - void *clientRpc; + char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; + uv_pipe_t listeningPipe; + void *clientRpc; uv_mutex_t udfsMutex; - SHashObj* udfsHash; + SHashObj *udfsHash; bool printVersion; } SUdfdContext; @@ -55,22 +55,17 @@ typedef struct SUvUdfWork { uv_buf_t output; } SUvUdfWork; -typedef enum { - UDF_STATE_INIT = 0, - UDF_STATE_LOADING, - UDF_STATE_READY, - UDF_STATE_UNLOADING -} EUdfState; +typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState; typedef struct SUdf { - int32_t refCount; - EUdfState state; + int32_t refCount; + EUdfState state; uv_mutex_t lock; - uv_cond_t condReady; + uv_cond_t condReady; char name[16]; int8_t type; - char path[PATH_MAX]; + char path[PATH_MAX]; uv_lib_t lib; TUdfScalarProcFunc scalarProcFunc; @@ -83,24 +78,28 @@ typedef struct SUdfcFuncHandle { SUdf *udf; } SUdfcFuncHandle; -int32_t udfdLoadUdf(char* udfName, SUdf* udf) { - strcpy(udf->name, udfName); +int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf); - int err = uv_dlopen(udf->path, &udf->lib); - if (err != 0) { - fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); - // TODO set error - } - //TODO: find all the functions - char normalFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strcpy(normalFuncName, udfName); - uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc)); - char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; - char *freeSuffix = "_free"; - strncpy(freeFuncName, normalFuncName, strlen(normalFuncName)); - strncat(freeFuncName, freeSuffix, strlen(freeSuffix)); - uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); - return 0; +int32_t udfdLoadUdf(char *udfName, SEpSet *pEpSet, SUdf *udf) { + strcpy(udf->name, udfName); + + udfdFillUdfInfoFromMNode(global.clientRpc, pEpSet, udf->name, udf); + + int err = uv_dlopen(udf->path, &udf->lib); + if (err != 0) { + fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); + // TODO set error + } + // TODO: find all the functions + char normalFuncName[TSDB_FUNC_NAME_LEN] = {0}; + strcpy(normalFuncName, udfName); + uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc)); + char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *freeSuffix = "_free"; + strncpy(freeFuncName, normalFuncName, strlen(normalFuncName)); + strncat(freeFuncName, freeSuffix, strlen(freeSuffix)); + uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); + return 0; } void udfdProcessRequest(uv_work_t *req) { @@ -110,13 +109,13 @@ void udfdProcessRequest(uv_work_t *req) { switch (request.type) { case UDF_TASK_SETUP: { - //TODO: tracable id from client. connect, setup, call, teardown - fnInfo("%"PRId64" setup request. udf name: %s", request.seqNum, request.setup.udfName); + // TODO: tracable id from client. connect, setup, call, teardown + fnInfo("%" PRId64 " setup request. udf name: %s", request.seqNum, request.setup.udfName); SUdfSetupRequest *setup = &request.setup; - SUdf* udf = NULL; + SUdf *udf = NULL; uv_mutex_lock(&global.udfsMutex); - SUdf** udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN); + SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN); if (*udfInHash) { ++(*udfInHash)->refCount; udf = *udfInHash; @@ -136,7 +135,7 @@ void udfdProcessRequest(uv_work_t *req) { uv_mutex_lock(&udf->lock); if (udf->state == UDF_STATE_INIT) { udf->state = UDF_STATE_LOADING; - udfdLoadUdf(setup->udfName, udf); + udfdLoadUdf(setup->udfName, &setup->epSet, udf); udf->state = UDF_STATE_READY; uv_cond_broadcast(&udf->condReady); uv_mutex_unlock(&udf->lock); @@ -168,8 +167,9 @@ void udfdProcessRequest(uv_work_t *req) { case UDF_TASK_CALL: { SUdfCallRequest *call = &request.call; - fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle); - SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); + fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request.seqNum, call->callType, + call->udfHandle); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); SUdf *udf = handle->udf; SUdfDataBlock input = {0}; @@ -206,10 +206,10 @@ void udfdProcessRequest(uv_work_t *req) { } case UDF_TASK_TEARDOWN: { SUdfTeardownRequest *teardown = &request.teardown; - fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle) - SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); - SUdf *udf = handle->udf; - bool unloadUdf = false; + fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request.seqNum, teardown->udfHandle) SUdfcFuncHandle *handle = + (SUdfcFuncHandle *)(teardown->udfHandle); + SUdf *udf = handle->udf; + bool unloadUdf = false; uv_mutex_lock(&global.udfsMutex); udf->refCount--; if (udf->refCount == 0) { @@ -250,7 +250,7 @@ void udfdProcessRequest(uv_work_t *req) { void udfdOnWrite(uv_write_t *req, int status) { SUvUdfWork *work = (SUvUdfWork *)req->data; if (status < 0) { - //TODO:log error and process it. + // TODO:log error and process it. } fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status)); taosMemoryFree(work->output.base); @@ -393,7 +393,7 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } -int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char* udfName, SUdf* udf) { +int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); @@ -505,7 +505,7 @@ void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *bu void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { if (nread < 0) { fnError("udfd ctrl pipe read error. %s", uv_err_name(nread)); - uv_close((uv_handle_t*)q, NULL); + uv_close((uv_handle_t *)q, NULL); uv_stop(global.loop); return; } @@ -515,13 +515,13 @@ void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { static int32_t removeListeningPipe() { uv_fs_t req; - int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); uv_fs_req_cleanup(&req); return err; } static int32_t udfdUvInit() { - uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t)); + uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t)); if (loop) { uv_loop_init(loop); } @@ -529,10 +529,10 @@ static int32_t udfdUvInit() { uv_pipe_init(global.loop, &global.ctrlPipe, 1); uv_pipe_open(&global.ctrlPipe, 0); - uv_read_start((uv_stream_t*)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); + uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); - char dnodeId[8] = {0}; - size_t dnodeIdSize; + char dnodeId[8] = {0}; + size_t dnodeIdSize; int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize); if (err != 0) { dnodeId[0] = '1'; @@ -567,7 +567,7 @@ static int32_t udfdRun() { global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); uv_mutex_init(&global.udfsMutex); - //TOOD: client rpc to fetch udf function info from mnode + // TOOD: client rpc to fetch udf function info from mnode if (udfdOpenClientRpc() != 0) { fnError("open rpc connection to mnode failure"); return -1; @@ -589,7 +589,7 @@ static int32_t udfdRun() { return code; } -int main(int argc, char* argv[]) { +int main(int argc, char *argv[]) { if (!taosCheckSystemIsSmallEnd()) { printf("failed to start since on non-small-end machines\n"); return -1; diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index fb9c3c678a..0727a4a1d2 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -44,12 +44,15 @@ int main(int argc, char *argv[]) { } taosArrayPush(pBlock->pDataBlock, &colInfo); } - - SSDataBlock output = {0}; - callUdfScalaProcess(handle, pBlock, &output); - SColumnInfoData *col = taosArrayGet(output.pDataBlock, 0); - for (int32_t i = 0; i < output.info.rows; ++i) { + SScalarParam input = {0}; + input.numOfRows = pBlock->info.rows; + input.columnData = taosArrayGet(pBlock->pDataBlock, 0); + SScalarParam output = {0}; + callUdfScalarFunc(handle, &input, 1 , &output); + + SColumnInfoData *col = output.columnData; + for (int32_t i = 0; i < output.numOfRows; ++i) { fprintf(stderr, "%d\t%d\n" , i, *(int32_t*)(col->pData + i *sizeof(int32_t))); } teardownUdf(handle);