From e56a650877fc5d495942379d22bc97e0982cef83 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sun, 22 May 2022 16:31:37 +0800 Subject: [PATCH] udf: refactor code --- source/libs/function/src/udfd.c | 440 ++++++++++++++++---------------- 1 file changed, 220 insertions(+), 220 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index abb8cefed6..964385e663 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -139,6 +139,226 @@ static int32_t udfdUvInit(); static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); static int32_t udfdRun(); +void udfdProcessRequest(uv_work_t *req) { + SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); + SUdfRequest request = {0}; + decodeUdfRequest(uvUdf->input.base, &request); + + switch (request.type) { + case UDF_TASK_SETUP: { + udfdProcessSetupRequest(uvUdf, &request); + break; + } + + case UDF_TASK_CALL: { + udfdProcessCallRequest(uvUdf, &request); + break; + } + case UDF_TASK_TEARDOWN: { + udfdProcessTeardownRequest(uvUdf, &request); + break; + } + default: { + break; + } + } +} + +void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { + // TODO: tracable id from client. connect, setup, call, teardown + fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName); + SUdfSetupRequest *setup = &request->setup; + int32_t code = TSDB_CODE_SUCCESS; + SUdf * udf = NULL; + uv_mutex_lock(&global.udfsMutex); + SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName)); + if (udfInHash) { + ++(*udfInHash)->refCount; + udf = *udfInHash; + uv_mutex_unlock(&global.udfsMutex); + } else { + SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); + udfNew->refCount = 1; + udfNew->state = UDF_STATE_INIT; + + uv_mutex_init(&udfNew->lock); + uv_cond_init(&udfNew->condReady); + udf = udfNew; + taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew)); + uv_mutex_unlock(&global.udfsMutex); + } + + uv_mutex_lock(&udf->lock); + if (udf->state == UDF_STATE_INIT) { + udf->state = UDF_STATE_LOADING; + code = udfdLoadUdf(setup->udfName, udf); + if (udf->initFunc) { + udf->initFunc(); + } + udf->state = UDF_STATE_READY; + uv_cond_broadcast(&udf->condReady); + uv_mutex_unlock(&udf->lock); + } else { + while (udf->state != UDF_STATE_READY) { + uv_cond_wait(&udf->condReady, &udf->lock); + } + uv_mutex_unlock(&udf->lock); + } + SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); + handle->udf = udf; + + SUdfResponse rsp; + rsp.seqNum = request->seqNum; + rsp.type = request->type; + rsp.code = code; + rsp.setupRsp.udfHandle = (int64_t)(handle); + rsp.setupRsp.outputType = udf->outputType; + rsp.setupRsp.outputLen = udf->outputLen; + rsp.setupRsp.bufSize = udf->bufSize; + + int32_t len = encodeUdfResponse(NULL, &rsp); + rsp.msgLen = len; + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; + encodeUdfResponse(&buf, &rsp); + + uvUdf->output = uv_buf_init(bufBegin, len); + + taosMemoryFree(uvUdf->input.base); + return; +} + +void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { + SUdfCallRequest *call = &request->call; + fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, call->udfHandle); + SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle); + SUdf * udf = handle->udf; + SUdfResponse response = {0}; + SUdfResponse * rsp = &response; + SUdfCallResponse *subRsp = &rsp->callRsp; + + int32_t code = TSDB_CODE_SUCCESS; + switch (call->callType) { + case TSDB_UDF_CALL_SCALA_PROC: { + SUdfColumn output = {0}; + + SUdfDataBlock input = {0}; + convertDataBlockToUdfDataBlock(&call->block, &input); + code = udf->scalarProcFunc(&input, &output); + freeUdfDataDataBlock(&input); + convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); + freeUdfColumn(&output); + break; + } + case TSDB_UDF_CALL_AGG_INIT: { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; + udf->aggStartFunc(&outBuf); + subRsp->resultBuf = outBuf; + break; + } + case TSDB_UDF_CALL_AGG_PROC: { + SUdfDataBlock input = {0}; + convertDataBlockToUdfDataBlock(&call->block, &input); + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; + code = udf->aggProcFunc(&input, &call->interBuf, &outBuf); + freeUdfInterBuf(&call->interBuf); + freeUdfDataDataBlock(&input); + subRsp->resultBuf = outBuf; + + break; + } + case TSDB_UDF_CALL_AGG_FIN: { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; + code = udf->aggFinishFunc(&call->interBuf, &outBuf); + freeUdfInterBuf(&call->interBuf); + subRsp->resultBuf = outBuf; + break; + } + default: + break; + } + + rsp->seqNum = request->seqNum; + rsp->type = request->type; + rsp->code = code; + subRsp->callType = call->callType; + + int32_t len = encodeUdfResponse(NULL, rsp); + rsp->msgLen = len; + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; + encodeUdfResponse(&buf, rsp); + uvUdf->output = uv_buf_init(bufBegin, len); + + switch (call->callType) { + case TSDB_UDF_CALL_SCALA_PROC: { + tDeleteSSDataBlock(&call->block); + tDeleteSSDataBlock(&subRsp->resultData); + break; + } + case TSDB_UDF_CALL_AGG_INIT: { + freeUdfInterBuf(&subRsp->resultBuf); + break; + } + case TSDB_UDF_CALL_AGG_PROC: { + tDeleteSSDataBlock(&call->block); + freeUdfInterBuf(&subRsp->resultBuf); + break; + } + case TSDB_UDF_CALL_AGG_FIN: { + freeUdfInterBuf(&subRsp->resultBuf); + break; + } + default: + break; + } + + taosMemoryFree(uvUdf->input.base); + return; +} + +void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { + SUdfTeardownRequest *teardown = &request->teardown; + fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); + SUdf * udf = handle->udf; + bool unloadUdf = false; + int32_t code = TSDB_CODE_SUCCESS; + + uv_mutex_lock(&global.udfsMutex); + udf->refCount--; + if (udf->refCount == 0) { + unloadUdf = true; + taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); + } + uv_mutex_unlock(&global.udfsMutex); + if (unloadUdf) { + uv_cond_destroy(&udf->condReady); + uv_mutex_destroy(&udf->lock); + if (udf->destroyFunc) { + (udf->destroyFunc)(); + } + uv_dlclose(&udf->lib); + taosMemoryFree(udf); + } + taosMemoryFree(handle); + + SUdfResponse response; + SUdfResponse *rsp = &response; + rsp->seqNum = request->seqNum; + rsp->type = request->type; + rsp->code = code; + int32_t len = encodeUdfResponse(NULL, rsp); + rsp->msgLen = len; + void *bufBegin = taosMemoryMalloc(len); + void *buf = bufBegin; + encodeUdfResponse(&buf, rsp); + uvUdf->output = uv_buf_init(bufBegin, len); + + taosMemoryFree(uvUdf->input.base); + return; +} + void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle; ASSERT(pMsg->info.ahandle != NULL); @@ -388,226 +608,6 @@ int32_t udfdCloseClientRpc() { return 0; } -void udfdProcessRequest(uv_work_t *req) { - SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); - SUdfRequest request = {0}; - decodeUdfRequest(uvUdf->input.base, &request); - - switch (request.type) { - case UDF_TASK_SETUP: { - udfdProcessSetupRequest(uvUdf, &request); - break; - } - - case UDF_TASK_CALL: { - udfdProcessCallRequest(uvUdf, &request); - break; - } - case UDF_TASK_TEARDOWN: { - udfdProcessTeardownRequest(uvUdf, &request); - break; - } - default: { - break; - } - } -} - -void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { - // TODO: tracable id from client. connect, setup, call, teardown - fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName); - SUdfSetupRequest *setup = &request->setup; - int32_t code = TSDB_CODE_SUCCESS; - SUdf * udf = NULL; - uv_mutex_lock(&global.udfsMutex); - SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName)); - if (udfInHash) { - ++(*udfInHash)->refCount; - udf = *udfInHash; - uv_mutex_unlock(&global.udfsMutex); - } else { - SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); - udfNew->refCount = 1; - udfNew->state = UDF_STATE_INIT; - - uv_mutex_init(&udfNew->lock); - uv_cond_init(&udfNew->condReady); - udf = udfNew; - taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew)); - uv_mutex_unlock(&global.udfsMutex); - } - - uv_mutex_lock(&udf->lock); - if (udf->state == UDF_STATE_INIT) { - udf->state = UDF_STATE_LOADING; - code = udfdLoadUdf(setup->udfName, udf); - if (udf->initFunc) { - udf->initFunc(); - } - udf->state = UDF_STATE_READY; - uv_cond_broadcast(&udf->condReady); - uv_mutex_unlock(&udf->lock); - } else { - while (udf->state != UDF_STATE_READY) { - uv_cond_wait(&udf->condReady, &udf->lock); - } - uv_mutex_unlock(&udf->lock); - } - SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); - handle->udf = udf; - - SUdfResponse rsp; - rsp.seqNum = request->seqNum; - rsp.type = request->type; - rsp.code = code; - rsp.setupRsp.udfHandle = (int64_t)(handle); - rsp.setupRsp.outputType = udf->outputType; - rsp.setupRsp.outputLen = udf->outputLen; - rsp.setupRsp.bufSize = udf->bufSize; - - int32_t len = encodeUdfResponse(NULL, &rsp); - rsp.msgLen = len; - void *bufBegin = taosMemoryMalloc(len); - void *buf = bufBegin; - encodeUdfResponse(&buf, &rsp); - - uvUdf->output = uv_buf_init(bufBegin, len); - - taosMemoryFree(uvUdf->input.base); - return; -} - -void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { - SUdfCallRequest *call = &request->call; - fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, call->udfHandle); - SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle); - SUdf * udf = handle->udf; - SUdfResponse response = {0}; - SUdfResponse * rsp = &response; - SUdfCallResponse *subRsp = &rsp->callRsp; - - int32_t code = TSDB_CODE_SUCCESS; - switch (call->callType) { - case TSDB_UDF_CALL_SCALA_PROC: { - SUdfColumn output = {0}; - - SUdfDataBlock input = {0}; - convertDataBlockToUdfDataBlock(&call->block, &input); - code = udf->scalarProcFunc(&input, &output); - freeUdfDataDataBlock(&input); - convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); - freeUdfColumn(&output); - break; - } - case TSDB_UDF_CALL_AGG_INIT: { - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - udf->aggStartFunc(&outBuf); - subRsp->resultBuf = outBuf; - break; - } - case TSDB_UDF_CALL_AGG_PROC: { - SUdfDataBlock input = {0}; - convertDataBlockToUdfDataBlock(&call->block, &input); - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->aggProcFunc(&input, &call->interBuf, &outBuf); - freeUdfInterBuf(&call->interBuf); - freeUdfDataDataBlock(&input); - subRsp->resultBuf = outBuf; - - break; - } - case TSDB_UDF_CALL_AGG_FIN: { - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->aggFinishFunc(&call->interBuf, &outBuf); - freeUdfInterBuf(&call->interBuf); - subRsp->resultBuf = outBuf; - break; - } - default: - break; - } - - rsp->seqNum = request->seqNum; - rsp->type = request->type; - rsp->code = code; - subRsp->callType = call->callType; - - int32_t len = encodeUdfResponse(NULL, rsp); - rsp->msgLen = len; - void *bufBegin = taosMemoryMalloc(len); - void *buf = bufBegin; - encodeUdfResponse(&buf, rsp); - uvUdf->output = uv_buf_init(bufBegin, len); - - switch (call->callType) { - case TSDB_UDF_CALL_SCALA_PROC: { - tDeleteSSDataBlock(&call->block); - tDeleteSSDataBlock(&subRsp->resultData); - break; - } - case TSDB_UDF_CALL_AGG_INIT: { - freeUdfInterBuf(&subRsp->resultBuf); - break; - } - case TSDB_UDF_CALL_AGG_PROC: { - tDeleteSSDataBlock(&call->block); - freeUdfInterBuf(&subRsp->resultBuf); - break; - } - case TSDB_UDF_CALL_AGG_FIN: { - freeUdfInterBuf(&subRsp->resultBuf); - break; - } - default: - break; - } - - taosMemoryFree(uvUdf->input.base); - return; -} - -void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { - SUdfTeardownRequest *teardown = &request->teardown; - fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle); - SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); - SUdf * udf = handle->udf; - bool unloadUdf = false; - int32_t code = TSDB_CODE_SUCCESS; - - uv_mutex_lock(&global.udfsMutex); - udf->refCount--; - if (udf->refCount == 0) { - unloadUdf = true; - taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); - } - uv_mutex_unlock(&global.udfsMutex); - if (unloadUdf) { - uv_cond_destroy(&udf->condReady); - uv_mutex_destroy(&udf->lock); - if (udf->destroyFunc) { - (udf->destroyFunc)(); - } - uv_dlclose(&udf->lib); - taosMemoryFree(udf); - } - taosMemoryFree(handle); - - SUdfResponse response; - SUdfResponse *rsp = &response; - rsp->seqNum = request->seqNum; - rsp->type = request->type; - rsp->code = code; - int32_t len = encodeUdfResponse(NULL, rsp); - rsp->msgLen = len; - void *bufBegin = taosMemoryMalloc(len); - void *buf = bufBegin; - encodeUdfResponse(&buf, rsp); - uvUdf->output = uv_buf_init(bufBegin, len); - - taosMemoryFree(uvUdf->input.base); - return; -} - void udfdOnWrite(uv_write_t *req, int status) { SUvUdfWork *work = (SUvUdfWork *)req->data; if (status < 0) {