diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 5f20d2e50a..d5dc967fed 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -24,7 +24,6 @@ #include "builtinsimpl.h" #include "functionMgt.h" -//TODO: add unit test typedef struct SUdfdData { bool startCalled; bool needCleanUp; @@ -45,7 +44,15 @@ typedef struct SUdfdData { SUdfdData udfdGlobal = {0}; +int32_t udfStartUdfd(int32_t startDnodeId); +int32_t udfStopUdfd(); + static int32_t udfSpawnUdfd(SUdfdData *pData); +void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal); +static int32_t udfSpawnUdfd(SUdfdData* pData); +static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg); +static void udfUdfdStopAsyncCb(uv_async_t *async); +static void udfWatchUdfd(void *args); void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); @@ -413,6 +420,34 @@ enum { UDFC_STATE_STOPPING, // stopping after udfcClose }; +int32_t getUdfdPipeName(char* pipeName, int32_t size); +int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup); +void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request); +int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state); +void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state); +int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call); +void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call); +int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown); +void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown); +int32_t encodeUdfRequest(void** buf, const SUdfRequest* request); +void* decodeUdfRequest(const void* buf, SUdfRequest* request); +int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp); +void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp); +int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp); +void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp); +int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp); +void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse); +int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp); +void* decodeUdfResponse(const void* buf, SUdfResponse* rsp); +void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta); +void freeUdfColumn(SUdfColumn* col); +void freeUdfDataDataBlock(SUdfDataBlock *block); +void freeUdfInterBuf(SUdfInterBuf *buf); +int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock); +int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); +int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output); +int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output); + int32_t getUdfdPipeName(char* pipeName, int32_t size) { char dnodeId[8] = {0}; size_t dnodeIdSize = sizeof(dnodeId); @@ -650,7 +685,7 @@ int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) { len += encodeUdfTeardownResponse(buf, &rsp->teardownRsp); break; default: - //TODO: log error + fnError("encode udf response, invalid udf response type %d", rsp->type); break; } return len; @@ -676,7 +711,7 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) { buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp); break; default: - //TODO: log error + fnError("decode udf response, invalid udf response type %d", rsp->type); break; } return (void*)buf; @@ -817,6 +852,319 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { return 0; } +////////////////////////////////////////////////////////////////////////////////////////////////////////////// +//memory layout |---SUdfAggRes----|-----final result-----|---inter result----| +typedef struct SUdfAggRes { + int8_t finalResNum; + int8_t interResNum; + char* finalResBuf; + char* interResBuf; +} SUdfAggRes; +void onUdfcPipeClose(uv_handle_t *handle); +int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask); +void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf); +bool isUdfcUvMsgComplete(SClientConnBuf *connBuf); +void udfcUvHandleRsp(SClientUvConn *conn); +void udfcUvHandleError(SClientUvConn *conn); +void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); +void onUdfcPipetWrite(uv_write_t *write, int status); +void onUdfcPipeConnect(uv_connect_t *connect, int status); +int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask); +int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask); +int32_t udfcStartUvTask(SClientUvTaskNode *uvTask); +void udfcAsyncTaskCb(uv_async_t *async); +void cleanUpUvTasks(SUdfcProxy *udfc); +void udfStopAsyncCb(uv_async_t *async); +void constructUdfService(void *argsThread); +int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType); +int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle); +int compareUdfcFuncSub(const void* elem1, const void* elem2); +int32_t doTeardownUdf(UdfcFuncHandle handle); + +int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, + SSDataBlock* output, SUdfInterBuf *newState); +int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); +int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); +int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); +int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); +int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output); +int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output); + +int32_t udfcOpen(); +int32_t udfcClose(); + +int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle); +void releaseUdfFuncHandle(char* udfName); +int32_t cleanUpUdfs(); + +bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); +int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); +int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); + +int compareUdfcFuncSub(const void* elem1, const void* elem2) { + SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; + SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; + return strcmp(stub1->udfName, stub2->udfName); +} + +int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { + int32_t code = 0; + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + SUdfcFuncStub key = {0}; + strcpy(key.udfName, udfName); + int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); + if (stubIndex != -1) { + SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex); + UdfcFuncHandle handle = foundStub->handle; + if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { + *pHandle = foundStub->handle; + ++foundStub->refCount; + foundStub->lastRefTime = taosGetTimestampUs(); + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return 0; + } else { + fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", + udfName, foundStub->refCount, foundStub->lastRefTime); + taosArrayRemove(gUdfdProxy.udfStubs, stubIndex); + } + } + *pHandle = NULL; + code = doSetupUdf(udfName, pHandle); + if (code == TSDB_CODE_SUCCESS) { + SUdfcFuncStub stub = {0}; + strcpy(stub.udfName, udfName); + stub.handle = *pHandle; + ++stub.refCount; + stub.lastRefTime = taosGetTimestampUs(); + taosArrayPush(gUdfdProxy.udfStubs, &stub); + taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); + } else { + *pHandle = NULL; + } + + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return code; +} + +void releaseUdfFuncHandle(char* udfName) { + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + SUdfcFuncStub key = {0}; + strcpy(key.udfName, udfName); + SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); + if (!foundStub) { + return; + } + if (foundStub->refCount > 0) { + --foundStub->refCount; + } + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); +} + +int32_t cleanUpUdfs() { + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + int32_t i = 0; + SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); + if (stub->refCount == 0) { + fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); + doTeardownUdf(stub->handle); + } else { + fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p", + stub->udfName, stub->refCount, stub->lastRefTime, stub->handle); + UdfcFuncHandle handle = stub->handle; + if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { + taosArrayPush(udfStubs, stub); + } else { + fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", + stub->udfName, stub->refCount, stub->lastRefTime); + } + } + ++i; + } + taosArrayDestroy(gUdfdProxy.udfStubs); + gUdfdProxy.udfStubs = udfStubs; + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); + return 0; +} + +int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { + UdfcFuncHandle handle = NULL; + int32_t code = acquireUdfFuncHandle(udfName, &handle); + if (code != 0) { + return code; + } + SUdfcUvSession *session = handle; + code = doCallUdfScalarFunc(handle, input, numOfCols, output); + if (output->columnData == NULL) { + fnError("udfc scalar function calculate error. no column data"); + code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; + } else { + if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) { + fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", session->outputType, + session->outputLen, output->columnData->info.type, output->columnData->info.bytes); + code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; + } + } + releaseUdfFuncHandle(udfName); + return code; +} + +bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + if (fmIsScalarFunc(pFunc->funcId)) { + return false; + } + pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize; + return true; +} + +bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) { + if (functionSetup(pCtx, pResultCellInfo) != true) { + return false; + } + UdfcFuncHandle handle; + int32_t udfCode = 0; + if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { + fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode); + return false; + } + SUdfcUvSession *session = (SUdfcUvSession *)handle; + SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo); + int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize; + memset(udfRes, 0, envSize); + + udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); + udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; + + SUdfInterBuf buf = {0}; + if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { + fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); + releaseUdfFuncHandle(pCtx->udfName); + return false; + } + udfRes->interResNum = buf.numOfResult; + if (buf.bufLen <= session->bufSize) { + memcpy(udfRes->interResBuf, buf.buf, buf.bufLen); + } else { + fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); + releaseUdfFuncHandle(pCtx->udfName); + return false; + } + releaseUdfFuncHandle(pCtx->udfName); + freeUdfInterBuf(&buf); + return true; +} + +int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { + int32_t udfCode = 0; + UdfcFuncHandle handle = 0; + if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { + fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode); + return udfCode; + } + + SUdfcUvSession *session = handle; + SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); + udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; + + SInputColumnInfoData* pInput = &pCtx->input; + int32_t numOfCols = pInput->numOfInputCols; + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + + SSDataBlock tempBlock = {0}; + tempBlock.info.numOfCols = numOfCols; + tempBlock.info.rows = pInput->totalRows; + tempBlock.info.uid = pInput->uid; + bool hasVarCol = false; + tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData *col = pInput->pData[i]; + if (IS_VAR_DATA_TYPE(col->info.type)) { + hasVarCol = true; + } + taosArrayPush(tempBlock.pDataBlock, col); + } + tempBlock.info.hasVarCol = hasVarCol; + + SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows); + + SUdfInterBuf state = {.buf = udfRes->interResBuf, + .bufLen = session->bufSize, + .numOfResult = udfRes->interResNum}; + SUdfInterBuf newState = {0}; + + udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState); + if (udfCode != 0) { + fnError("udfAggProcess error. code: %d", udfCode); + newState.numOfResult = 0; + } else { + udfRes->interResNum = newState.numOfResult; + if (newState.bufLen <= session->bufSize) { + memcpy(udfRes->interResBuf, newState.buf, newState.bufLen); + } else { + fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize); + udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE; + } + } + if (newState.numOfResult == 1 || state.numOfResult == 1) { + GET_RES_INFO(pCtx)->numOfRes = 1; + } + + blockDataDestroy(inputBlock); + taosArrayDestroy(tempBlock.pDataBlock); + + releaseUdfFuncHandle(pCtx->udfName); + freeUdfInterBuf(&newState); + return udfCode; +} + +int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { + int32_t udfCode = 0; + UdfcFuncHandle handle = 0; + if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { + fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode); + return udfCode; + } + + SUdfcUvSession *session = handle; + SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); + udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; + + + SUdfInterBuf resultBuf = {0}; + SUdfInterBuf state = {.buf = udfRes->interResBuf, + .bufLen = session->bufSize, + .numOfResult = udfRes->interResNum}; + int32_t udfCallCode= 0; + udfCallCode= doCallUdfAggFinalize(session, &state, &resultBuf); + if (udfCallCode != 0) { + fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode); + GET_RES_INFO(pCtx)->numOfRes = 0; + } else { + if (resultBuf.bufLen <= session->outputLen) { + memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen); + udfRes->finalResNum = resultBuf.numOfResult; + GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; + } else { + fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen); + GET_RES_INFO(pCtx)->numOfRes = 0; + udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; + } + } + + freeUdfInterBuf(&resultBuf); + + int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); + releaseUdfFuncHandle(pCtx->udfName); + return udfCallCode == 0 ? numOfResults : udfCallCode; +} + void onUdfcPipeClose(uv_handle_t *handle) { SClientUvConn *conn = handle->data; if (!QUEUE_EMPTY(&conn->taskQueue)) { @@ -843,18 +1191,15 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode * switch (task->type) { case UDF_TASK_SETUP: { - //TODO: copy or not task->_setup.rsp = rsp.setupRsp; break; } case UDF_TASK_CALL: { task->_call.rsp = rsp.callRsp; - //TODO: copy or not break; } case UDF_TASK_TEARDOWN: { task->_teardown.rsp = rsp.teardownRsp; - //TODO: copy or not? break; } default: { @@ -1050,7 +1395,7 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN request.teardown = task->_teardown.req; request.type = UDF_TASK_TEARDOWN; } else { - //TODO log and return error + fnError("udfc create uv task, invalid task type : %d", task->type); } int32_t bufLen = encodeUdfRequest(NULL, &request); request.msgLen = bufLen; @@ -1314,93 +1659,6 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { return err; } -int compareUdfcFuncSub(const void* elem1, const void* elem2) { - SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; - SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; - return strcmp(stub1->udfName, stub2->udfName); -} - -int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { - int32_t code = 0; - uv_mutex_lock(&gUdfdProxy.udfStubsMutex); - SUdfcFuncStub key = {0}; - strcpy(key.udfName, udfName); - int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); - if (stubIndex != -1) { - SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex); - UdfcFuncHandle handle = foundStub->handle; - if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { - *pHandle = foundStub->handle; - ++foundStub->refCount; - foundStub->lastRefTime = taosGetTimestampUs(); - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); - return 0; - } else { - fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", - udfName, foundStub->refCount, foundStub->lastRefTime); - taosArrayRemove(gUdfdProxy.udfStubs, stubIndex); - } - } - *pHandle = NULL; - code = doSetupUdf(udfName, pHandle); - if (code == TSDB_CODE_SUCCESS) { - SUdfcFuncStub stub = {0}; - strcpy(stub.udfName, udfName); - stub.handle = *pHandle; - ++stub.refCount; - stub.lastRefTime = taosGetTimestampUs(); - taosArrayPush(gUdfdProxy.udfStubs, &stub); - taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); - } else { - *pHandle = NULL; - } - - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); - return code; -} - -void releaseUdfFuncHandle(char* udfName) { - uv_mutex_lock(&gUdfdProxy.udfStubsMutex); - SUdfcFuncStub key = {0}; - strcpy(key.udfName, udfName); - SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); - if (!foundStub) { - return; - } - if (foundStub->refCount > 0) { - --foundStub->refCount; - } - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); -} - -int32_t cleanUpUdfs() { - uv_mutex_lock(&gUdfdProxy.udfStubsMutex); - int32_t i = 0; - SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); - while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { - SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); - if (stub->refCount == 0) { - fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); - doTeardownUdf(stub->handle); - } else { - fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p", - stub->udfName, stub->refCount, stub->lastRefTime, stub->handle); - UdfcFuncHandle handle = stub->handle; - if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { - taosArrayPush(udfStubs, stub); - } else { - fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", - stub->udfName, stub->refCount, stub->lastRefTime); - } - } - ++i; - } - taosArrayDestroy(gUdfdProxy.udfStubs); - gUdfdProxy.udfStubs = udfStubs; - uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); - return 0; -} - int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, SSDataBlock* output, SUdfInterBuf *newState) { fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle); @@ -1524,29 +1782,6 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t return err; } - -int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { - UdfcFuncHandle handle = NULL; - int32_t code = acquireUdfFuncHandle(udfName, &handle); - if (code != 0) { - return code; - } - SUdfcUvSession *session = handle; - code = doCallUdfScalarFunc(handle, input, numOfCols, output); - if (output->columnData == NULL) { - fnError("udfc scalar function calculate error. no column data"); - code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; - } else { - if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) { - fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", session->outputType, - session->outputLen, output->columnData->info.type, output->columnData->info.bytes); - code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; - } - } - releaseUdfFuncHandle(udfName); - return code; -} - int32_t doTeardownUdf(UdfcFuncHandle handle) { SUdfcUvSession *session = (SUdfcUvSession *) handle; @@ -1576,165 +1811,3 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { return err; } - -//memory layout |---SUdfAggRes----|-----final result-----|---inter result----| -typedef struct SUdfAggRes { - int8_t finalResNum; - int8_t interResNum; - char* finalResBuf; - char* interResBuf; -} SUdfAggRes; - -bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - if (fmIsScalarFunc(pFunc->funcId)) { - return false; - } - pEnv->calcMemSize = sizeof(SUdfAggRes) + pFunc->node.resType.bytes + pFunc->udfBufSize; - return true; -} - -bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) { - if (functionSetup(pCtx, pResultCellInfo) != true) { - return false; - } - UdfcFuncHandle handle; - int32_t udfCode = 0; - if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { - fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode); - return false; - } - SUdfcUvSession *session = (SUdfcUvSession *)handle; - SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo); - int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize; - memset(udfRes, 0, envSize); - - udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; - - SUdfInterBuf buf = {0}; - if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { - fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); - releaseUdfFuncHandle(pCtx->udfName); - return false; - } - udfRes->interResNum = buf.numOfResult; - if (buf.bufLen <= session->bufSize) { - memcpy(udfRes->interResBuf, buf.buf, buf.bufLen); - } else { - fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); - releaseUdfFuncHandle(pCtx->udfName); - return false; - } - releaseUdfFuncHandle(pCtx->udfName); - freeUdfInterBuf(&buf); - return true; -} - -int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { - int32_t udfCode = 0; - UdfcFuncHandle handle = 0; - if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { - fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode); - return udfCode; - } - - SUdfcUvSession *session = handle; - SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; - - SInputColumnInfoData* pInput = &pCtx->input; - int32_t numOfCols = pInput->numOfInputCols; - int32_t start = pInput->startRowIndex; - int32_t numOfRows = pInput->numOfRows; - - - SSDataBlock tempBlock = {0}; - tempBlock.info.numOfCols = numOfCols; - tempBlock.info.rows = pInput->totalRows; - tempBlock.info.uid = pInput->uid; - bool hasVarCol = false; - tempBlock.pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData *col = pInput->pData[i]; - if (IS_VAR_DATA_TYPE(col->info.type)) { - hasVarCol = true; - } - taosArrayPush(tempBlock.pDataBlock, col); - } - tempBlock.info.hasVarCol = hasVarCol; - - SSDataBlock *inputBlock = blockDataExtractBlock(&tempBlock, start, numOfRows); - - SUdfInterBuf state = {.buf = udfRes->interResBuf, - .bufLen = session->bufSize, - .numOfResult = udfRes->interResNum}; - SUdfInterBuf newState = {0}; - - udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState); - if (udfCode != 0) { - fnError("udfAggProcess error. code: %d", udfCode); - newState.numOfResult = 0; - } else { - udfRes->interResNum = newState.numOfResult; - if (newState.bufLen <= session->bufSize) { - memcpy(udfRes->interResBuf, newState.buf, newState.bufLen); - } else { - fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize); - udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE; - } - } - if (newState.numOfResult == 1 || state.numOfResult == 1) { - GET_RES_INFO(pCtx)->numOfRes = 1; - } - - blockDataDestroy(inputBlock); - taosArrayDestroy(tempBlock.pDataBlock); - - releaseUdfFuncHandle(pCtx->udfName); - freeUdfInterBuf(&newState); - return udfCode; -} - -int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { - int32_t udfCode = 0; - UdfcFuncHandle handle = 0; - if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { - fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode); - return udfCode; - } - - SUdfcUvSession *session = handle; - SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; - - - SUdfInterBuf resultBuf = {0}; - SUdfInterBuf state = {.buf = udfRes->interResBuf, - .bufLen = session->bufSize, - .numOfResult = udfRes->interResNum}; - int32_t udfCallCode= 0; - udfCallCode= doCallUdfAggFinalize(session, &state, &resultBuf); - if (udfCallCode != 0) { - fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode); - GET_RES_INFO(pCtx)->numOfRes = 0; - } else { - if (resultBuf.bufLen <= session->outputLen) { - memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen); - udfRes->finalResNum = resultBuf.numOfResult; - GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; - } else { - fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen); - GET_RES_INFO(pCtx)->numOfRes = 0; - udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; - } - } - - freeUdfInterBuf(&resultBuf); - - int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); - releaseUdfFuncHandle(pCtx->udfName); - return udfCallCode == 0 ? numOfResults : udfCallCode; -} diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index e1ebfa8254..fa80858253 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -103,177 +103,65 @@ typedef struct SUdfdRpcSendRecvInfo { uv_sem_t resultSem; } SUdfdRpcSendRecvInfo; -void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { - SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle; - ASSERT(pMsg->info.ahandle != NULL); +static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf); +static int32_t udfdConnectToMnode(); +static int32_t udfdLoadUdf(char *udfName, SUdf *udf); +static bool udfdRpcRfp(int32_t code); +static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); +static int32_t udfdOpenClientRpc(); +static int32_t udfdCloseClientRpc(); - if (pEpSet) { - if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) { - updateEpSet_s(&global.mgmtEp, pEpSet); - } - } +static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request); +static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request); +static void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request); +static void udfdProcessRequest(uv_work_t *req); +static void udfdOnWrite(uv_write_t *req, int status); +static void udfdSendResponse(uv_work_t *work, int status); +static void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf); +static bool isUdfdUvMsgComplete(SUdfdUvConn *pipe); +static void udfdHandleRequest(SUdfdUvConn *conn); +static void udfdPipeCloseCb(uv_handle_t *pipe); +static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); } +static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); +static void udfdOnNewConnection(uv_stream_t *server, int status); - if (pMsg->code != TSDB_CODE_SUCCESS) { - fnError("udfd rpc error. code: %s", tstrerror(pMsg->code)); - msgInfo->code = pMsg->code; - goto _return; - } +static void udfdIntrSignalHandler(uv_signal_t *handle, int signum); +static int32_t removeListeningPipe(); - if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { - SConnectRsp connectRsp = {0}; - tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); - if (connectRsp.epSet.numOfEps == 0) { - msgInfo->code = TSDB_CODE_MND_APP_ERROR; - goto _return; +static void udfdPrintVersion(); +static int32_t udfdParseArgs(int32_t argc, char *argv[]); +static int32_t udfdInitLog(); + +static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); +static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf); +static int32_t udfdUvInit(); +static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); +static int32_t udfdRun(); + +void udfdProcessRequest(uv_work_t *req) { + SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); + SUdfRequest request = {0}; + decodeUdfRequest(uvUdf->input.base, &request); + + switch (request.type) { + case UDF_TASK_SETUP: { + udfdProcessSetupRequest(uvUdf, &request); + break; } - if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) { - updateEpSet_s(&global.mgmtEp, &connectRsp.epSet); + case UDF_TASK_CALL: { + udfdProcessCallRequest(uvUdf, &request); + break; + } + case UDF_TASK_TEARDOWN: { + udfdProcessTeardownRequest(uvUdf, &request); + break; + } + default: { + break; } - msgInfo->code = 0; - } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { - SRetrieveFuncRsp retrieveRsp = {0}; - tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); - - SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - SUdf * udf = msgInfo->param; - udf->funcType = pFuncInfo->funcType; - udf->scriptType = pFuncInfo->scriptType; - udf->outputType = pFuncInfo->outputType; - udf->outputLen = pFuncInfo->outputLen; - udf->bufSize = pFuncInfo->bufSize; - - char path[PATH_MAX] = {0}; - snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name); - TdFilePtr file = - taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); - // TODO check for failure of flush to disk - taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); - taosCloseFile(&file); - strncpy(udf->path, path, strlen(path)); - tFreeSFuncInfo(pFuncInfo); - taosArrayDestroy(retrieveRsp.pFuncInfos); - msgInfo->code = 0; } - -_return: - rpcFreeCont(pMsg->pCont); - uv_sem_post(&msgInfo->resultSem); - return; -} - -int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { - SRetrieveFuncReq retrieveReq = {0}; - retrieveReq.numOfFuncs = 1; - retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, udfName); - - int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); - void * pReq = rpcMallocCont(contLen); - tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); - taosArrayDestroy(retrieveReq.pFuncNames); - - SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); - msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; - msgInfo->param = udf; - uv_sem_init(&msgInfo->resultSem, 0); - - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pReq; - rpcMsg.contLen = contLen; - rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; - rpcMsg.info.ahandle = msgInfo; - rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); - - uv_sem_wait(&msgInfo->resultSem); - uv_sem_destroy(&msgInfo->resultSem); - int32_t code = msgInfo->code; - taosMemoryFree(msgInfo); - return code; -} - -int32_t udfdConnectToMnode() { - SConnectReq connReq = {0}; - connReq.connType = CONN_TYPE__UDFD; - tstrncpy(connReq.app, "udfd", sizeof(connReq.app)); - tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user)); - char pass[TSDB_PASSWORD_LEN + 1] = {0}; - taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); - tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); - connReq.pid = htonl(taosGetPId()); - connReq.startTime = htobe64(taosGetTimestampMs()); - - int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); - void * pReq = rpcMallocCont(contLen); - tSerializeSConnectReq(pReq, contLen, &connReq); - - SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); - msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT; - uv_sem_init(&msgInfo->resultSem, 0); - - SRpcMsg rpcMsg = {0}; - rpcMsg.msgType = TDMT_MND_CONNECT; - rpcMsg.pCont = pReq; - rpcMsg.contLen = contLen; - rpcMsg.info.ahandle = msgInfo; - rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); - - uv_sem_wait(&msgInfo->resultSem); - int32_t code = msgInfo->code; - uv_sem_destroy(&msgInfo->resultSem); - taosMemoryFree(msgInfo); - return code; -} - -int32_t udfdLoadUdf(char *udfName, SUdf *udf) { - strcpy(udf->name, udfName); - int32_t err = 0; - - err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf); - if (err != 0) { - fnError("can not retrieve udf from mnode. udf name %s", udfName); - return TSDB_CODE_UDF_LOAD_UDF_FAILURE; - } - - err = uv_dlopen(udf->path, &udf->lib); - if (err != 0) { - fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); - return TSDB_CODE_UDF_LOAD_UDF_FAILURE; - } - - char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; - char *initSuffix = "_init"; - strcpy(initFuncName, udfName); - strncat(initFuncName, initSuffix, strlen(initSuffix)); - uv_dlsym(&udf->lib, initFuncName, (void **)(&udf->initFunc)); - - char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; - char *destroySuffix = "_destroy"; - strcpy(destroyFuncName, udfName); - strncat(destroyFuncName, destroySuffix, strlen(destroySuffix)); - uv_dlsym(&udf->lib, destroyFuncName, (void **)(&udf->destroyFunc)); - - if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) { - char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strcpy(processFuncName, udfName); - uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc)); - } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { - char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strcpy(processFuncName, udfName); - uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc)); - char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; - char *startSuffix = "_start"; - strncpy(startFuncName, processFuncName, strlen(processFuncName)); - strncat(startFuncName, startSuffix, strlen(startSuffix)); - uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc)); - char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; - char *finishSuffix = "_finish"; - strncpy(finishFuncName, processFuncName, strlen(processFuncName)); - strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); - uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); - // TODO: merge - } - return 0; } void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { @@ -471,173 +359,181 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { return; } -void udfdProcessRequest(uv_work_t *req) { - SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); - SUdfRequest request = {0}; - decodeUdfRequest(uvUdf->input.base, &request); +void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle; + ASSERT(pMsg->info.ahandle != NULL); - switch (request.type) { - case UDF_TASK_SETUP: { - udfdProcessSetupRequest(uvUdf, &request); - break; - } - - case UDF_TASK_CALL: { - udfdProcessCallRequest(uvUdf, &request); - break; - } - case UDF_TASK_TEARDOWN: { - udfdProcessTeardownRequest(uvUdf, &request); - break; - } - default: { - break; + if (pEpSet) { + if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) { + updateEpSet_s(&global.mgmtEp, pEpSet); } } -} -void udfdOnWrite(uv_write_t *req, int status) { - SUvUdfWork *work = (SUvUdfWork *)req->data; - if (status < 0) { - fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status)); + if (pMsg->code != TSDB_CODE_SUCCESS) { + fnError("udfd rpc error. code: %s", tstrerror(pMsg->code)); + msgInfo->code = pMsg->code; + goto _return; } - taosMemoryFree(work->output.base); - taosMemoryFree(work); - taosMemoryFree(req); -} -void udfdSendResponse(uv_work_t *work, int status) { - SUvUdfWork *udfWork = (SUvUdfWork *)(work->data); - - uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); - write_req->data = udfWork; - uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite); - - taosMemoryFree(work); -} - -void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { - SUdfdUvConn *ctx = handle->data; - int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t); - if (ctx->inputCap == 0) { - ctx->inputBuf = taosMemoryMalloc(msgHeadSize); - if (ctx->inputBuf) { - ctx->inputLen = 0; - ctx->inputCap = msgHeadSize; - ctx->inputTotal = -1; - - buf->base = ctx->inputBuf; - buf->len = ctx->inputCap; - } else { - // TODO: log error - buf->base = NULL; - buf->len = 0; + if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { + SConnectRsp connectRsp = {0}; + tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); + if (connectRsp.epSet.numOfEps == 0) { + msgInfo->code = TSDB_CODE_MND_APP_ERROR; + goto _return; } - } else { - ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap; - void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap); - if (inputBuf) { - ctx->inputBuf = inputBuf; - buf->base = ctx->inputBuf + ctx->inputLen; - buf->len = ctx->inputCap - ctx->inputLen; - } else { - // TODO: log error - buf->base = NULL; - buf->len = 0; + + if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) { + updateEpSet_s(&global.mgmtEp, &connectRsp.epSet); } - } - fnDebug("allocate buf. input buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal); -} + msgInfo->code = 0; + } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { + SRetrieveFuncRsp retrieveRsp = {0}; + tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); -bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { - if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) { - pipe->inputTotal = *(int32_t *)(pipe->inputBuf); - } - if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) { - fnDebug("receive request complete. length %d", pipe->inputLen); - return true; - } - return false; -} + SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); + SUdf * udf = msgInfo->param; + udf->funcType = pFuncInfo->funcType; + udf->scriptType = pFuncInfo->scriptType; + udf->outputType = pFuncInfo->outputType; + udf->outputLen = pFuncInfo->outputLen; + udf->bufSize = pFuncInfo->bufSize; -void udfdHandleRequest(SUdfdUvConn *conn) { - uv_work_t * work = taosMemoryMalloc(sizeof(uv_work_t)); - SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork)); - udfWork->client = conn->client; - udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen); - conn->inputBuf = NULL; - conn->inputLen = 0; - conn->inputCap = 0; - conn->inputTotal = -1; - work->data = udfWork; - uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse); -} - -void udfdPipeCloseCb(uv_handle_t *pipe) { - SUdfdUvConn *conn = pipe->data; - taosMemoryFree(conn->client); - taosMemoryFree(conn->inputBuf); - taosMemoryFree(conn); -} - -void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn->client, udfdPipeCloseCb); } - -void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { - fnDebug("udf read %zd bytes from client", nread); - if (nread == 0) return; - - SUdfdUvConn *conn = client->data; - - if (nread > 0) { - conn->inputLen += nread; - if (isUdfdUvMsgComplete(conn)) { - udfdHandleRequest(conn); - } else { - // log error or continue; + char path[PATH_MAX] = {0}; + snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name); + TdFilePtr file = + taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); + int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); + if (count != pFuncInfo->codeSize) { + fnError("udfd write udf shared library failed"); + msgInfo->code = TSDB_CODE_FILE_CORRUPTED; } - return; + taosCloseFile(&file); + strncpy(udf->path, path, strlen(path)); + tFreeSFuncInfo(pFuncInfo); + taosArrayDestroy(retrieveRsp.pFuncInfos); + msgInfo->code = 0; } - if (nread < 0) { - fnDebug("Receive error %s", uv_err_name(nread)); - if (nread == UV_EOF) { - // TODO check more when close - } else { - } - udfdUvHandleError(conn); - } +_return: + rpcFreeCont(pMsg->pCont); + uv_sem_post(&msgInfo->resultSem); + return; } -void udfdOnNewConnection(uv_stream_t *server, int status) { - if (status < 0) { - fnError("udfd new connection error. code: %s", uv_strerror(status)); - return; - } +int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { + SRetrieveFuncReq retrieveReq = {0}; + retrieveReq.numOfFuncs = 1; + retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); + taosArrayPush(retrieveReq.pFuncNames, udfName); - uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t)); - uv_pipe_init(global.loop, client, 0); - if (uv_accept(server, (uv_stream_t *)client) == 0) { - SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn)); - ctx->client = (uv_stream_t *)client; - ctx->inputBuf = 0; - ctx->inputLen = 0; - ctx->inputCap = 0; - client->data = ctx; - ctx->client = (uv_stream_t *)client; - uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead); - } else { - uv_close((uv_handle_t *)client, NULL); - } + int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); + void * pReq = rpcMallocCont(contLen); + tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); + taosArrayDestroy(retrieveReq.pFuncNames); + + SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); + msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; + msgInfo->param = udf; + uv_sem_init(&msgInfo->resultSem, 0); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = contLen; + rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; + rpcMsg.info.ahandle = msgInfo; + rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + + uv_sem_wait(&msgInfo->resultSem); + uv_sem_destroy(&msgInfo->resultSem); + int32_t code = msgInfo->code; + taosMemoryFree(msgInfo); + return code; } -void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { - fnInfo("udfd signal received: %d\n", signum); - uv_fs_t req; - uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); - uv_signal_stop(handle); - uv_stop(global.loop); +int32_t udfdConnectToMnode() { + SConnectReq connReq = {0}; + connReq.connType = CONN_TYPE__UDFD; + tstrncpy(connReq.app, "udfd", sizeof(connReq.app)); + tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user)); + char pass[TSDB_PASSWORD_LEN + 1] = {0}; + taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass); + tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd)); + connReq.pid = htonl(taosGetPId()); + connReq.startTime = htobe64(taosGetTimestampMs()); + + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq); + void * pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connReq); + + SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); + msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT; + uv_sem_init(&msgInfo->resultSem, 0); + + SRpcMsg rpcMsg = {0}; + rpcMsg.msgType = TDMT_MND_CONNECT; + rpcMsg.pCont = pReq; + rpcMsg.contLen = contLen; + rpcMsg.info.ahandle = msgInfo; + rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + + uv_sem_wait(&msgInfo->resultSem); + int32_t code = msgInfo->code; + uv_sem_destroy(&msgInfo->resultSem); + taosMemoryFree(msgInfo); + return code; } +int32_t udfdLoadUdf(char *udfName, SUdf *udf) { + strcpy(udf->name, udfName); + int32_t err = 0; + + err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf); + if (err != 0) { + fnError("can not retrieve udf from mnode. udf name %s", udfName); + return TSDB_CODE_UDF_LOAD_UDF_FAILURE; + } + + err = uv_dlopen(udf->path, &udf->lib); + if (err != 0) { + fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); + return TSDB_CODE_UDF_LOAD_UDF_FAILURE; + } + + char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; + char *initSuffix = "_init"; + strcpy(initFuncName, udfName); + strncat(initFuncName, initSuffix, strlen(initSuffix)); + uv_dlsym(&udf->lib, initFuncName, (void **)(&udf->initFunc)); + + char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; + char *destroySuffix = "_destroy"; + strcpy(destroyFuncName, udfName); + strncat(destroyFuncName, destroySuffix, strlen(destroySuffix)); + uv_dlsym(&udf->lib, destroyFuncName, (void **)(&udf->destroyFunc)); + + if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) { + char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; + strcpy(processFuncName, udfName); + uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc)); + } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { + char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; + strcpy(processFuncName, udfName); + uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc)); + char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *startSuffix = "_start"; + strncpy(startFuncName, processFuncName, strlen(processFuncName)); + strncat(startFuncName, startSuffix, strlen(startSuffix)); + uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc)); + char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; + char *finishSuffix = "_finish"; + strncpy(finishFuncName, processFuncName, strlen(processFuncName)); + strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); + uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); + // TODO: merge + } + return 0; +} static bool udfdRpcRfp(int32_t code) { if (code == TSDB_CODE_RPC_REDIRECT) { return true; @@ -712,15 +608,144 @@ int32_t udfdCloseClientRpc() { return 0; } -static void udfdPrintVersion() { -#ifdef TD_ENTERPRISE - char *releaseName = "enterprise"; -#else - char *releaseName = "community"; -#endif - printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version); - printf("gitinfo: %s\n", gitinfo); - printf("buildInfo: %s\n", buildinfo); +void udfdOnWrite(uv_write_t *req, int status) { + SUvUdfWork *work = (SUvUdfWork *)req->data; + if (status < 0) { + fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status)); + } + taosMemoryFree(work->output.base); + taosMemoryFree(work); + taosMemoryFree(req); +} + +void udfdSendResponse(uv_work_t *work, int status) { + SUvUdfWork *udfWork = (SUvUdfWork *)(work->data); + + uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); + write_req->data = udfWork; + uv_write(write_req, udfWork->client, &udfWork->output, 1, udfdOnWrite); + + taosMemoryFree(work); +} + +void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { + SUdfdUvConn *ctx = handle->data; + int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t); + if (ctx->inputCap == 0) { + ctx->inputBuf = taosMemoryMalloc(msgHeadSize); + if (ctx->inputBuf) { + ctx->inputLen = 0; + ctx->inputCap = msgHeadSize; + ctx->inputTotal = -1; + + buf->base = ctx->inputBuf; + buf->len = ctx->inputCap; + } else { + fnError("udfd can not allocate enough memory") + buf->base = NULL; + buf->len = 0; + } + } else { + ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap; + void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap); + if (inputBuf) { + ctx->inputBuf = inputBuf; + buf->base = ctx->inputBuf + ctx->inputLen; + buf->len = ctx->inputCap - ctx->inputLen; + } else { + fnError("udfd can not allocate enough memory") + buf->base = NULL; + buf->len = 0; + } + } + fnDebug("allocate buf. input buf cap - len - total : %d - %d - %d", ctx->inputCap, ctx->inputLen, ctx->inputTotal); +} + +bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { + if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) { + pipe->inputTotal = *(int32_t *)(pipe->inputBuf); + } + if (pipe->inputLen == pipe->inputCap && pipe->inputTotal == pipe->inputCap) { + fnDebug("receive request complete. length %d", pipe->inputLen); + return true; + } + return false; +} + +void udfdHandleRequest(SUdfdUvConn *conn) { + uv_work_t * work = taosMemoryMalloc(sizeof(uv_work_t)); + SUvUdfWork *udfWork = taosMemoryMalloc(sizeof(SUvUdfWork)); + udfWork->client = conn->client; + udfWork->input = uv_buf_init(conn->inputBuf, conn->inputLen); + conn->inputBuf = NULL; + conn->inputLen = 0; + conn->inputCap = 0; + conn->inputTotal = -1; + work->data = udfWork; + uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse); +} + +void udfdPipeCloseCb(uv_handle_t *pipe) { + SUdfdUvConn *conn = pipe->data; + taosMemoryFree(conn->client); + taosMemoryFree(conn->inputBuf); + taosMemoryFree(conn); +} + +void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { + fnDebug("udf read %zd bytes from client", nread); + if (nread == 0) return; + + SUdfdUvConn *conn = client->data; + + if (nread > 0) { + conn->inputLen += nread; + if (isUdfdUvMsgComplete(conn)) { + udfdHandleRequest(conn); + } else { + // log error or continue; + } + return; + } + + if (nread < 0) { + fnError("Receive error %s", uv_err_name(nread)); + if (nread == UV_EOF) { + // TODO check more when close + } else { + } + udfdUvHandleError(conn); + } +} + +void udfdOnNewConnection(uv_stream_t *server, int status) { + if (status < 0) { + fnError("udfd new connection error. code: %s", uv_strerror(status)); + return; + } + + uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t)); + uv_pipe_init(global.loop, client, 0); + if (uv_accept(server, (uv_stream_t *)client) == 0) { + SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn)); + ctx->client = (uv_stream_t *)client; + ctx->inputBuf = 0; + ctx->inputLen = 0; + ctx->inputCap = 0; + client->data = ctx; + ctx->client = (uv_stream_t *)client; + uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead); + } else { + uv_close((uv_handle_t *)client, NULL); + } +} + +void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { + fnInfo("udfd signal received: %d\n", signum); + uv_fs_t req; + uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + uv_signal_stop(handle); + uv_stop(global.loop); } static int32_t udfdParseArgs(int32_t argc, char *argv[]) { @@ -745,6 +770,17 @@ static int32_t udfdParseArgs(int32_t argc, char *argv[]) { return 0; } +static void udfdPrintVersion() { +#ifdef TD_ENTERPRISE + char *releaseName = "enterprise"; +#else + char *releaseName = "community"; +#endif + printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version); + printf("gitinfo: %s\n", gitinfo); + printf("buildInfo: %s\n", buildinfo); +} + static int32_t udfdInitLog() { char logName[12] = {0}; snprintf(logName, sizeof(logName), "%slog", "udfd"); @@ -868,8 +904,8 @@ int main(int argc, char *argv[]) { int32_t retryMnodeTimes = 0; int32_t code = 0; - while (retryMnodeTimes++ < TSDB_MAX_REPLICA) { - uv_sleep(500 * (1 << retryMnodeTimes)); + while (retryMnodeTimes++ <= TSDB_MAX_REPLICA) { + uv_sleep(100 * (1 << retryMnodeTimes)); code = udfdConnectToMnode(); if (code == 0) { break; @@ -890,6 +926,7 @@ int main(int argc, char *argv[]) { udfdRun(); removeListeningPipe(); - udfdCloseClientRpc(); + + return 0; }