return value: udf

This commit is contained in:
xsren 2024-07-27 18:33:08 +08:00
parent a3b30192ae
commit 911e6380b9
14 changed files with 895 additions and 360 deletions

View File

@ -33,6 +33,15 @@ extern "C" {
#else #else
#define FORCE_INLINE #define FORCE_INLINE
#endif #endif
#define TAOS_UDF_CHECK_RETURN(CMD) \
do { \
int32_t code = (CMD); \
if (code != TSDB_CODE_SUCCESS) { \
return (CMD); \
} \
} while (0)
typedef struct SUdfColumnMeta { typedef struct SUdfColumnMeta {
int16_t type; int16_t type;
int32_t bytes; int32_t bytes;
@ -192,25 +201,28 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t ne
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void udfColDataSetNull(SUdfColumn *pColumn, int32_t row) { static FORCE_INLINE int32_t udfColDataSetNull(SUdfColumn *pColumn, int32_t row) {
udfColEnsureCapacity(pColumn, row + 1); int32_t code = udfColEnsureCapacity(pColumn, row + 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) { if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) {
udfColDataSetNull_var(pColumn, row); udfColDataSetNull_var(pColumn, row);
} else { } else {
udfColDataSetNull_f(pColumn, row); udfColDataSetNull_f(pColumn, row);
} }
pColumn->hasNull = true; pColumn->hasNull = true;
pColumn->colData.numOfRows = pColumn->colData.numOfRows = ((int32_t)(row + 1) > pColumn->colData.numOfRows) ? (int32_t)(row + 1) : pColumn->colData.numOfRows;
((int32_t)(row + 1) > pColumn->colData.numOfRows) ? (int32_t)(row + 1) : pColumn->colData.numOfRows; return 0;
} }
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentRow, const char *pData, bool isNull) { static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentRow, const char *pData, bool isNull) {
SUdfColumnMeta *meta = &pColumn->colMeta; SUdfColumnMeta *meta = &pColumn->colMeta;
SUdfColumnData *data = &pColumn->colData; SUdfColumnData *data = &pColumn->colData;
udfColEnsureCapacity(pColumn, currentRow + 1); TAOS_UDF_CHECK_RETURN(udfColEnsureCapacity(pColumn, currentRow + 1));
bool isVarCol = IS_VAR_DATA_TYPE(meta->type); bool isVarCol = IS_VAR_DATA_TYPE(meta->type);
if (isNull) { if (isNull) {
udfColDataSetNull(pColumn, currentRow); TAOS_UDF_CHECK_RETURN(udfColDataSetNull(pColumn, currentRow));
} else { } else {
if (!isVarCol) { if (!isVarCol) {
udfColDataSetNotNull_f(pColumn, currentRow); udfColDataSetNotNull_f(pColumn, currentRow);

View File

@ -43,6 +43,25 @@ extern "C" {
#endif #endif
#define UDF_DNODE_ID_ENV_NAME "DNODE_ID" #define UDF_DNODE_ID_ENV_NAME "DNODE_ID"
#define TAOS_UV_LIB_ERROR_RET(ret) \
do { \
if (0 != ret) { \
terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE; \
return TSDB_CODE_UDF_UV_EXEC_FAILURE; \
} \
} while(0)
#define TAOS_UV_CHECK_ERRNO(CODE) \
do { \
if (0 != CODE) { \
terrln = __LINE__; \
terrno = (CODE); \
goto _exit; \
} \
} while (0)
// low level APIs // low level APIs
/** /**
* setup udf * setup udf
@ -109,13 +128,13 @@ int32_t udfStartUdfd(int32_t startDnodeId);
* stop udfd * stop udfd
* @return * @return
*/ */
int32_t udfStopUdfd(); void udfStopUdfd();
/** /**
* get udfd pid * get udfd pid
* *
*/ */
int32_t udfGetUdfdPid(int32_t* pUdfdPid); // int32_t udfGetUdfdPid(int32_t* pUdfdPid);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -79,6 +79,8 @@ extern "C" {
typedef struct TdDir *TdDirPtr; typedef struct TdDir *TdDirPtr;
typedef struct TdDirEntry *TdDirEntryPtr; typedef struct TdDirEntry *TdDirEntryPtr;
#define TAOS_DIRNAME(name) ((void)taosDirName(name))
void taosRemoveDir(const char *dirname); void taosRemoveDir(const char *dirname);
bool taosDirExist(const char *dirname); bool taosDirExist(const char *dirname);
int32_t taosMkDir(const char *dirname); int32_t taosMkDir(const char *dirname);

View File

@ -39,7 +39,7 @@ int64_t taosGetOsUptime();
int32_t taosGetEmail(char *email, int32_t maxLen); int32_t taosGetEmail(char *email, int32_t maxLen);
int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t maxLen); int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t maxLen);
int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores); int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores);
int32_t taosGetCpuCores(float *numOfCores, bool physical); void taosGetCpuCores(float *numOfCores, bool physical);
void taosGetCpuUsage(double *cpu_system, double *cpu_engine); void taosGetCpuUsage(double *cpu_system, double *cpu_engine);
int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512); int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512);
int32_t taosGetTotalMemory(int64_t *totalKB); int32_t taosGetTotalMemory(int64_t *totalKB);

View File

@ -877,6 +877,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x2908) #define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x2908)
#define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x2909) #define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x2909)
#define TSDB_CODE_UDF_FUNC_EXEC_FAILURE TAOS_DEF_ERROR_CODE(0, 0x290A) #define TSDB_CODE_UDF_FUNC_EXEC_FAILURE TAOS_DEF_ERROR_CODE(0, 0x290A)
#define TSDB_CODE_UDF_UV_EXEC_FAILURE TAOS_DEF_ERROR_CODE(0, 0x290B)
// sml // sml
#define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000)

View File

@ -16,12 +16,10 @@
#ifndef _TD_UTIL_UTIL_H_ #ifndef _TD_UTIL_UTIL_H_
#define _TD_UTIL_UTIL_H_ #define _TD_UTIL_UTIL_H_
#include "os.h"
#include "tcrc32c.h" #include "tcrc32c.h"
#include "tdef.h" #include "tdef.h"
#include "thash.h" #include "thash.h"
#include "tmd5.h" #include "tmd5.h"
#include "tutil.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {

View File

@ -109,7 +109,7 @@ void freeUdfDataDataBlock(SUdfDataBlock *block);
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock); int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock);
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
int32_t getUdfdPipeName(char *pipeName, int32_t size); void getUdfdPipeName(char *pipeName, int32_t size);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -24,6 +24,7 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
#include "tudf.h" #include "tudf.h"
#include <stdint.h>
#include "tudfInt.h" #include "tudfInt.h"
#ifdef _TD_DARWIN_64 #ifdef _TD_DARWIN_64
@ -51,11 +52,10 @@ typedef struct SUdfdData {
SUdfdData udfdGlobal = {0}; SUdfdData udfdGlobal = {0};
int32_t udfStartUdfd(int32_t startDnodeId); int32_t udfStartUdfd(int32_t startDnodeId);
int32_t udfStopUdfd(); void udfStopUdfd();
static int32_t udfSpawnUdfd(SUdfdData *pData); static int32_t udfSpawnUdfd(SUdfdData *pData);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal); void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal);
static int32_t udfSpawnUdfd(SUdfdData *pData);
static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg); static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg);
static void udfUdfdStopAsyncCb(uv_async_t *async); static void udfUdfdStopAsyncCb(uv_async_t *async);
static void udfWatchUdfd(void *args); static void udfWatchUdfd(void *args);
@ -67,7 +67,10 @@ void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop"); fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
} else { } else {
fnInfo("udfd process restart"); fnInfo("udfd process restart");
udfSpawnUdfd(pData); int32_t code = udfSpawnUdfd(pData);
if(code != 0) {
fnError("udfd process restart failed with code:%d", code);
}
} }
} }
@ -80,26 +83,26 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
path[0] = '.'; path[0] = '.';
#ifdef WINDOWS #ifdef WINDOWS
GetModuleFileName(NULL, path, PATH_MAX); GetModuleFileName(NULL, path, PATH_MAX);
taosDirName(path); TAOS_DIRNAME(path);
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
uint32_t pathSize = sizeof(path); uint32_t pathSize = sizeof(path);
_NSGetExecutablePath(path, &pathSize); _NSGetExecutablePath(path, &pathSize);
taosDirName(path); TAOS_DIRNAME(path);
#endif #endif
} else { } else {
strncpy(path, tsProcPath, PATH_MAX); TAOS_STRNCPY(path, tsProcPath, PATH_MAX);
taosDirName(path); TAOS_DIRNAME(path);
} }
#ifdef WINDOWS #ifdef WINDOWS
if (strlen(path) == 0) { if (strlen(path) == 0) {
strcat(path, "C:\\TDengine"); TAOS_STRCAT(path, "C:\\TDengine");
} }
strcat(path, "\\udfd.exe"); TAOS_STRCAT(path, "\\udfd.exe");
#else #else
if (strlen(path) == 0) { if (strlen(path) == 0) {
strcat(path, "/usr/bin"); TAOS_STRCAT(path, "/usr/bin");
} }
strcat(path, "/udfd"); TAOS_STRCAT(path, "/udfd");
#endif #endif
char *argsUdfd[] = {path, "-c", configDir, NULL}; char *argsUdfd[] = {path, "-c", configDir, NULL};
options.args = argsUdfd; options.args = argsUdfd;
@ -107,7 +110,7 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
options.exit_cb = udfUdfdExit; options.exit_cb = udfUdfdExit;
uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1));
uv_stdio_container_t child_stdio[3]; uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
@ -156,7 +159,7 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
taosFqdnEnvItem = taosMemoryMalloc(strlen("TAOS_FQDN=") + strlen(taosFqdn) + 1); taosFqdnEnvItem = taosMemoryMalloc(strlen("TAOS_FQDN=") + strlen(taosFqdn) + 1);
if (taosFqdnEnvItem != NULL) { if (taosFqdnEnvItem != NULL) {
strcpy(taosFqdnEnvItem, "TAOS_FQDN="); strcpy(taosFqdnEnvItem, "TAOS_FQDN=");
strcat(taosFqdnEnvItem, taosFqdn); TAOS_STRCAT(taosFqdnEnvItem, taosFqdn);
fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn); fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn);
} else { } else {
fnError("[UDFD]Failed to allocate memory for TAOS_FQDN"); fnError("[UDFD]Failed to allocate memory for TAOS_FQDN");
@ -212,22 +215,37 @@ static void udfUdfdStopAsyncCb(uv_async_t *async) {
static void udfWatchUdfd(void *args) { static void udfWatchUdfd(void *args) {
SUdfdData *pData = args; SUdfdData *pData = args;
uv_loop_init(&pData->loop); TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop));
uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb); TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb));
pData->stopAsync.data = pData; pData->stopAsync.data = pData;
int32_t err = udfSpawnUdfd(pData); TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData));
atomic_store_32(&pData->spawnErr, err); atomic_store_32(&pData->spawnErr, 0);
uv_barrier_wait(&pData->barrier); (void)uv_barrier_wait(&pData->barrier);
uv_run(&pData->loop, UV_RUN_DEFAULT); int num = uv_run(&pData->loop, UV_RUN_DEFAULT);
uv_loop_close(&pData->loop); fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL); uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
uv_run(&pData->loop, UV_RUN_DEFAULT); num = uv_run(&pData->loop, UV_RUN_DEFAULT);
uv_loop_close(&pData->loop); fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
if(uv_loop_close(&pData->loop) != 0) {
fnError("udfd loop close failed, lino:%d", __LINE__);
}
_exit:
if (terrno != 0) {
(void)uv_barrier_wait(&pData->barrier);
atomic_store_32(&pData->spawnErr, terrno);
if(uv_loop_close(&pData->loop) != 0) {
fnError("udfd loop close failed, lino:%d", __LINE__);
}
fnError("udfd thread exit with code:%d lino:%d", terrno, terrln);
terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
return; return;
} }
int32_t udfStartUdfd(int32_t startDnodeId) { int32_t udfStartUdfd(int32_t startDnodeId) {
int32_t code = 0, lino = 0;
if (!tsStartUdfd) { if (!tsStartUdfd) {
fnInfo("start udfd is disabled.") return 0; fnInfo("start udfd is disabled.") return 0;
} }
@ -239,43 +257,58 @@ int32_t udfStartUdfd(int32_t startDnodeId) {
pData->startCalled = true; pData->startCalled = true;
char dnodeId[8] = {0}; char dnodeId[8] = {0};
snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId); snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
uv_os_setenv("DNODE_ID", dnodeId); TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit);
pData->dnodeId = startDnodeId; pData->dnodeId = startDnodeId;
uv_barrier_init(&pData->barrier, 2); TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit);
uv_thread_create(&pData->thread, udfWatchUdfd, pData); TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, udfWatchUdfd, pData), &lino, _exit);
uv_barrier_wait(&pData->barrier); (void)uv_barrier_wait(&pData->barrier);
int32_t err = atomic_load_32(&pData->spawnErr); int32_t err = atomic_load_32(&pData->spawnErr);
if (err != 0) { if (err != 0) {
uv_barrier_destroy(&pData->barrier); uv_barrier_destroy(&pData->barrier);
uv_async_send(&pData->stopAsync); if(uv_async_send(&pData->stopAsync) != 0) {
uv_thread_join(&pData->thread); fnError("start udfd: failed to send stop async");
}
if(uv_thread_join(&pData->thread)!= 0) {
fnError("start udfd: failed to join udfd thread");
}
pData->needCleanUp = false; pData->needCleanUp = false;
fnInfo("udfd is cleaned up after spawn err"); fnInfo("udfd is cleaned up after spawn err");
TAOS_CHECK_GOTO(err, &lino, _exit);
} else { } else {
pData->needCleanUp = true; pData->needCleanUp = true;
} }
return err; _exit:
if (code != 0) {
fnError("udfd start failed with code:%d, lino:%d", code, lino);
}
return code;
} }
int32_t udfStopUdfd() { void udfStopUdfd() {
SUdfdData *pData = &udfdGlobal; SUdfdData *pData = &udfdGlobal;
fnInfo("udfd start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr); fnInfo("udfd start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr);
if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) { if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
return 0; return;
} }
atomic_store_32(&pData->stopCalled, 1); atomic_store_32(&pData->stopCalled, 1);
pData->needCleanUp = false; pData->needCleanUp = false;
uv_barrier_destroy(&pData->barrier); uv_barrier_destroy(&pData->barrier);
uv_async_send(&pData->stopAsync); if(uv_async_send(&pData->stopAsync) != 0) {
uv_thread_join(&pData->thread); fnError("stop udfd: failed to send stop async");
}
if(uv_thread_join(&pData->thread) != 0) {
fnError("stop udfd: failed to join udfd thread");
}
#ifdef WINDOWS #ifdef WINDOWS
if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle); if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle);
#endif #endif
fnInfo("udfd is cleaned up"); fnInfo("udfd is cleaned up");
return 0; return;
} }
/*
int32_t udfGetUdfdPid(int32_t* pUdfdPid) { int32_t udfGetUdfdPid(int32_t* pUdfdPid) {
SUdfdData *pData = &udfdGlobal; SUdfdData *pData = &udfdGlobal;
if (pData->spawnErr) { if (pData->spawnErr) {
@ -287,6 +320,7 @@ int32_t udfGetUdfdPid(int32_t* pUdfdPid) {
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
*/
//============================================================================================== //==============================================================================================
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl> /* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
@ -439,8 +473,6 @@ typedef struct SClientUdfTask {
SUdfcUvSession *session; SUdfcUvSession *session;
int32_t errCode;
union { union {
struct { struct {
SUdfSetupRequest req; SUdfSetupRequest req;
@ -479,7 +511,7 @@ enum {
UDFC_STATE_STOPPING, // stopping after udfcClose UDFC_STATE_STOPPING, // stopping after udfcClose
}; };
int32_t getUdfdPipeName(char *pipeName, int32_t size); void getUdfdPipeName(char *pipeName, int32_t size);
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup); int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup);
void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request); void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request);
int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state); int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state);
@ -507,7 +539,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block);
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output); int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output);
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output); int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output);
int32_t getUdfdPipeName(char *pipeName, int32_t size) { void getUdfdPipeName(char *pipeName, int32_t size) {
char dnodeId[8] = {0}; char dnodeId[8] = {0};
size_t dnodeIdSize = sizeof(dnodeId); size_t dnodeIdSize = sizeof(dnodeId);
int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize); int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
@ -522,7 +554,6 @@ int32_t getUdfdPipeName(char *pipeName, int32_t size) {
snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
#endif #endif
fnInfo("get dnodeId:%s from env, pipe path:%s", dnodeId, pipeName); fnInfo("get dnodeId:%s from env, pipe path:%s", dnodeId, pipeName);
return 0;
} }
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
@ -712,16 +743,14 @@ void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownR
int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) { int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) {
int32_t len = 0; int32_t len = 0;
if (buf == NULL) { len += sizeof(rsp->msgLen);
len += sizeof(rsp->msgLen); if (buf != NULL) {
} else {
*(int32_t *)(*buf) = rsp->msgLen; *(int32_t *)(*buf) = rsp->msgLen;
*buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen)); *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
} }
if (buf == NULL) { len += sizeof(rsp->seqNum);
len += sizeof(rsp->seqNum); if (buf != NULL) {
} else {
*(int64_t *)(*buf) = rsp->seqNum; *(int64_t *)(*buf) = rsp->seqNum;
*buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum)); *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
} }
@ -810,6 +839,9 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *)); udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *));
for (int32_t i = 0; i < udfBlock->numOfCols; ++i) { for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn)); udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
if(udfBlock->udfCols[i] == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i); SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i);
SUdfColumn *udfCol = udfBlock->udfCols[i]; SUdfColumn *udfCol = udfBlock->udfCols[i];
udfCol->colMeta.type = col->info.type; udfCol->colMeta.type = col->info.type;
@ -821,9 +853,15 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) { if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) {
udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen); udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
if(udfCol->colData.varLenCol.varOffsets == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen); memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows); udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen); udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
if(udfCol->colData.varLenCol.payload == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (col->reassigned) { if (col->reassigned) {
for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) { for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) {
char* pColData = col->pData + col->varmeta.offset[row]; char* pColData = col->pData + col->varmeta.offset[row];
@ -843,6 +881,9 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows); udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen; int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen); udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
if(udfCol->colData.fixLenCol.nullBitmap == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
char *bitmap = udfCol->colData.fixLenCol.nullBitmap; char *bitmap = udfCol->colData.fixLenCol.nullBitmap;
memcpy(bitmap, col->nullbitmap, bitmapLen); memcpy(bitmap, col->nullbitmap, bitmapLen);
udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows); udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
@ -852,15 +893,18 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
memcpy(data, col->pData, dataLen); memcpy(data, col->pData, dataLen);
} }
} }
return 0; return TSDB_CODE_SUCCESS;
} }
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
int32_t code = 0, lino = 0;
SUdfColumnMeta* meta = &udfCol->colMeta; SUdfColumnMeta* meta = &udfCol->colMeta;
SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1); SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1);
blockDataAppendColInfo(block, &colInfoData); code = blockDataAppendColInfo(block, &colInfoData);
blockDataEnsureCapacity(block, udfCol->colData.numOfRows); TAOS_CHECK_GOTO(code, &lino, _exit);
code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows);
TAOS_CHECK_GOTO(code, &lino, _exit);
SColumnInfoData *col = bdGetColumnInfoData(block, 0); SColumnInfoData *col = bdGetColumnInfoData(block, 0);
for (int i = 0; i < udfCol->colData.numOfRows; ++i) { for (int i = 0; i < udfCol->colData.numOfRows; ++i) {
@ -868,43 +912,20 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
colDataSetNULL(col, i); colDataSetNULL(col, i);
} else { } else {
char* data = udfColDataGetData(udfCol, i); char* data = udfColDataGetData(udfCol, i);
colDataSetVal(col, i, data, false); code = colDataSetVal(col, i, data, false);
TAOS_CHECK_GOTO(code, &lino, _exit);
} }
} }
block->info.rows = udfCol->colData.numOfRows; block->info.rows = udfCol->colData.numOfRows;
return 0; _exit:
} if (code != 0) {
fnError("failed to convert udf column to data block, code:%d, line:%d", code, lino);
int32_t convertUdfColumnToDataBlock2(SUdfColumn *udfCol, SSDataBlock *block) {
block->info.rows = udfCol->colData.numOfRows;
block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
taosArrayPush(block->pDataBlock, &(SColumnInfoData){0});
SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0);
SUdfColumnMeta *meta = &udfCol->colMeta;
col->info.precision = meta->precision;
col->info.bytes = meta->bytes;
col->info.scale = meta->scale;
col->info.type = meta->type;
col->hasNull = udfCol->hasNull;
SUdfColumnData *data = &udfCol->colData;
if (!IS_VAR_DATA_TYPE(meta->type)) {
col->nullbitmap = taosMemoryMalloc(data->fixLenCol.nullBitmapLen);
memcpy(col->nullbitmap, data->fixLenCol.nullBitmap, data->fixLenCol.nullBitmapLen);
col->pData = taosMemoryMalloc(data->fixLenCol.dataLen);
memcpy(col->pData, data->fixLenCol.data, data->fixLenCol.dataLen);
} else {
col->varmeta.offset = taosMemoryMalloc(data->varLenCol.varOffsetsLen);
memcpy(col->varmeta.offset, data->varLenCol.varOffsets, data->varLenCol.varOffsetsLen);
col->pData = taosMemoryMalloc(data->varLenCol.payloadLen);
memcpy(col->pData, data->varLenCol.payload, data->varLenCol.payloadLen);
} }
return 0; return TSDB_CODE_SUCCESS;
} }
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) { int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
int32_t code = 0, lino = 0;
int32_t numOfRows = 0; int32_t numOfRows = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
numOfRows = (input[i].numOfRows > numOfRows) ? input[i].numOfRows : numOfRows; numOfRows = (input[i].numOfRows > numOfRows) ? input[i].numOfRows : numOfRows;
@ -916,16 +937,16 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
SColumnInfoData d = {0}; SColumnInfoData d = {0};
d.info = pInfo->info; d.info = pInfo->info;
blockDataAppendColInfo(output, &d); TAOS_CHECK_GOTO(blockDataAppendColInfo(output, &d), &lino, _exit);
} }
blockDataEnsureCapacity(output, numOfRows); TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, numOfRows), &lino, _exit);
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDest = taosArrayGet(output->pDataBlock, i); SColumnInfoData* pDest = taosArrayGet(output->pDataBlock, i);
SColumnInfoData* pColInfoData = input[i].columnData; SColumnInfoData* pColInfoData = input[i].columnData;
colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info); TAOS_CHECK_GOTO(colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info), &lino, _exit);
if (input[i].numOfRows < numOfRows) { if (input[i].numOfRows < numOfRows) {
int32_t startRow = input[i].numOfRows; int32_t startRow = input[i].numOfRows;
@ -936,26 +957,31 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
} else { } else {
char* src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1); char* src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1);
for (int j = 0; j < expandRows; ++j) { for (int j = 0; j < expandRows; ++j) {
colDataSetVal(pDest, startRow+j, src, false); TAOS_CHECK_GOTO(colDataSetVal(pDest, startRow+j, src, false), &lino, _exit);
} }
//colDataSetNItems(pColInfoData, startRow, data, expandRows);
} }
} }
} }
output->info.rows = numOfRows; output->info.rows = numOfRows;
_exit:
return 0; if (code != 0) {
fnError("failed to convert scalar param to data block, code:%d, line:%d", code, lino);
}
return code;
} }
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
if (taosArrayGetSize(input->pDataBlock) != 1) { if (taosArrayGetSize(input->pDataBlock) != 1) {
fnError("scalar function only support one column"); fnError("scalar function only support one column");
return -1; return 0;
} }
output->numOfRows = input->info.rows; output->numOfRows = input->info.rows;
output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData)); output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData));
if(output->columnData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData)); memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData));
output->colAlloced = true; output->colAlloced = true;
@ -1024,7 +1050,7 @@ int compareUdfcFuncSub(const void *elem1, const void *elem2) {
} }
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
int32_t code = 0; int32_t code = 0, line = 0;
uv_mutex_lock(&gUdfcProxy.udfStubsMutex); uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
SUdfcFuncStub key = {0}; SUdfcFuncStub key = {0};
strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
@ -1048,7 +1074,10 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
} else { } else {
fnInfo("udf handle expired for %s, will setup udf. move it to expired list", udfName); fnInfo("udf handle expired for %s, will setup udf. move it to expired list", udfName);
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub); if(taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub) == NULL) {
fnError("acquireUdfFuncHandle: failed to push udf stub to array");
goto _exit;
}
taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub); taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub);
} }
} }
@ -1060,12 +1089,16 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
stub.handle = *pHandle; stub.handle = *pHandle;
++stub.refCount; ++stub.refCount;
stub.createTime = taosGetTimestampUs(); stub.createTime = taosGetTimestampUs();
taosArrayPush(gUdfcProxy.udfStubs, &stub); if(taosArrayPush(gUdfcProxy.udfStubs, &stub) == NULL) {
fnError("acquireUdfFuncHandle: failed to push udf stub to array");
goto _exit;
}
taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub); taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
} else { } else {
*pHandle = NULL; *pHandle = NULL;
} }
_exit:
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return code; return code;
} }
@ -1092,17 +1125,23 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
void cleanupExpiredUdfs() { void cleanupExpiredUdfs() {
int32_t i = 0; int32_t i = 0;
SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
if(expiredUdfStubs == NULL) {
fnError("cleanupExpiredUdfs: failed to init array");
return;
}
while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
if (stub->refCount == 0) { if (stub->refCount == 0) {
fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle); (void)doTeardownUdf(stub->handle);
} else { } else {
fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
stub->refCount, stub->createTime, stub->handle); stub->refCount, stub->createTime, stub->handle);
UdfcFuncHandle handle = stub->handle; UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
taosArrayPush(expiredUdfStubs, stub); if(taosArrayPush(expiredUdfStubs, stub) == NULL) {
fnError("cleanupExpiredUdfs: failed to push udf stub to array");
}
} else { } else {
fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache",
stub->udfName, stub->refCount, stub->createTime); stub->udfName, stub->refCount, stub->createTime);
@ -1121,16 +1160,18 @@ void cleanupNotExpiredUdfs() {
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i); SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i);
if (stub->refCount == 0) { if (stub->refCount == 0) {
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle); (void)doTeardownUdf(stub->handle);
} else { } else {
fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
stub->refCount, stub->createTime, stub->handle); stub->refCount, stub->createTime, stub->handle);
UdfcFuncHandle handle = stub->handle; UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
taosArrayPush(udfStubs, stub); if (taosArrayPush(udfStubs, stub) == NULL) {
fnError("cleanupNotExpiredUdfs: failed to push udf stub to array");
}
} else { } else {
fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName,
stub->udfName, stub->refCount, stub->createTime); stub->refCount, stub->createTime);
} }
} }
++i; ++i;
@ -1259,7 +1300,11 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
pTempBlock->info.rows = pInput->totalRows; pTempBlock->info.rows = pInput->totalRows;
pTempBlock->info.id.uid = pInput->uid; pTempBlock->info.id.uid = pInput->uid;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
blockDataAppendColInfo(pTempBlock, pInput->pData[i]); if ((udfCode = blockDataAppendColInfo(pTempBlock, pInput->pData[i])) != 0) {
fnError("udfAggProcess error. step blockDataAppendColInfo. udf code: %d", udfCode);
blockDataDestroy(pTempBlock);
return udfCode;
}
} }
SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows); SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows);
@ -1358,12 +1403,16 @@ void onUdfcPipeClose(uv_handle_t *handle) {
} }
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) { int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) {
int32_t code = 0;
fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask); fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask);
if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->type == UV_TASK_REQ_RSP) {
if (uvTask->rspBuf.base != NULL) { if (uvTask->rspBuf.base != NULL) {
SUdfResponse rsp = {0}; SUdfResponse rsp = {0};
void *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp); void *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp);
task->errCode = rsp.code; code = rsp.code;
if(code != 0) {
fnError("udfc get udf task result failure. code: %d", code);
}
switch (task->type) { switch (task->type) {
case UDF_TASK_SETUP: { case UDF_TASK_SETUP: {
@ -1386,14 +1435,23 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *
// TODO: the call buffer is setup and freed by udf invocation // TODO: the call buffer is setup and freed by udf invocation
taosMemoryFree(uvTask->rspBuf.base); taosMemoryFree(uvTask->rspBuf.base);
} else { } else {
task->errCode = uvTask->errCode; code = uvTask->errCode;
if(code != 0) {
fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
}
} }
} else if (uvTask->type == UV_TASK_CONNECT) { } else if (uvTask->type == UV_TASK_CONNECT) {
task->errCode = uvTask->errCode; code = uvTask->errCode;
if(code != 0) {
fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
}
} else if (uvTask->type == UV_TASK_DISCONNECT) { } else if (uvTask->type == UV_TASK_DISCONNECT) {
task->errCode = uvTask->errCode; code = uvTask->errCode;
if(code != 0) {
fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__);
}
} }
return 0; return code;
} }
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
@ -1542,7 +1600,11 @@ void onUdfcPipeConnect(uv_connect_t *connect, int status) {
} }
uvTask->errCode = status; uvTask->errCode = status;
uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead); int32_t code = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
if(code != 0) {
fnError("udfc client connection %p read start failed. code: %d(%s)", uvTask->pipe, code, uv_strerror(code));
uvTask->errCode = code;
}
taosMemoryFree(connect); taosMemoryFree(connect);
QUEUE_REMOVE(&uvTask->procTaskQueue); QUEUE_REMOVE(&uvTask->procTaskQueue);
uv_sem_post(&uvTask->taskSem); uv_sem_post(&uvTask->taskSem);
@ -1572,16 +1634,37 @@ int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvT
fnError("udfc create uv task, invalid task type : %d", task->type); fnError("udfc create uv task, invalid task type : %d", task->type);
} }
int32_t bufLen = encodeUdfRequest(NULL, &request); int32_t bufLen = encodeUdfRequest(NULL, &request);
if (bufLen <= 0) {
fnError("udfc create uv task, encode request failed. size: %d", bufLen);
return TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
request.msgLen = bufLen; request.msgLen = bufLen;
void *bufBegin = taosMemoryMalloc(bufLen); void *bufBegin = taosMemoryMalloc(bufLen);
if(bufBegin == NULL) {
fnError("udfc create uv task, malloc buffer failed. size: %d", bufLen);
return TSDB_CODE_OUT_OF_MEMORY;
}
void *buf = bufBegin; void *buf = bufBegin;
encodeUdfRequest(&buf, &request); if(encodeUdfRequest(&buf, &request) <= 0)
{
fnError("udfc create uv task, encode request failed. size: %d", bufLen);
taosMemoryFree(bufBegin);
return TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
uvTask->reqBuf = uv_buf_init(bufBegin, bufLen); uvTask->reqBuf = uv_buf_init(bufBegin, bufLen);
uvTask->seqNum = request.seqNum; uvTask->seqNum = request.seqNum;
} else if (uvTaskType == UV_TASK_DISCONNECT) { } else if (uvTaskType == UV_TASK_DISCONNECT) {
uvTask->pipe = task->session->udfUvPipe; uvTask->pipe = task->session->udfUvPipe;
} }
uv_sem_init(&uvTask->taskSem, 0); if (uv_sem_init(&uvTask->taskSem, 0) != 0)
{
if (uvTaskType == UV_TASK_REQ_RSP) {
taosMemoryFree(uvTask->reqBuf.base);
}
fnError("udfc create uv task, init semaphore failed.");
return TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
return 0; return 0;
} }
@ -1592,7 +1675,11 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
uv_mutex_lock(&udfc->taskQueueMutex); uv_mutex_lock(&udfc->taskQueueMutex);
QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue); QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue);
uv_mutex_unlock(&udfc->taskQueueMutex); uv_mutex_unlock(&udfc->taskQueueMutex);
uv_async_send(&udfc->loopTaskAync); int32_t code = uv_async_send(&udfc->loopTaskAync);
if (code != 0) {
fnError("udfc queue uv task to event loop failed. code: %s", uv_strerror(code));
return TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
uv_sem_wait(&uvTask->taskSem); uv_sem_wait(&uvTask->taskSem);
fnInfo("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask); fnInfo("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
@ -1608,10 +1695,23 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
switch (uvTask->type) { switch (uvTask->type) {
case UV_TASK_CONNECT: { case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0); if(pipe == NULL) {
fnError("udfc event loop start connect task malloc pipe failed.");
return TSDB_CODE_OUT_OF_MEMORY;
}
if (uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0) != 0) {
fnError("udfc event loop start connect task uv_pipe_init failed.");
taosMemoryFree(pipe);
return TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
uvTask->pipe = pipe; uvTask->pipe = pipe;
SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn)); SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn));
if(conn == NULL) {
fnError("udfc event loop start connect task malloc conn failed.");
taosMemoryFree(pipe);
return TSDB_CODE_OUT_OF_MEMORY;
}
conn->pipe = pipe; conn->pipe = pipe;
conn->readBuf.len = 0; conn->readBuf.len = 0;
conn->readBuf.cap = 0; conn->readBuf.cap = 0;
@ -1622,6 +1722,12 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
pipe->data = conn; pipe->data = conn;
uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
if(connReq == NULL) {
fnError("udfc event loop start connect task malloc connReq failed.");
taosMemoryFree(pipe);
taosMemoryFree(conn);
return TSDB_CODE_OUT_OF_MEMORY;
}
connReq->data = uvTask; connReq->data = uvTask;
uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect); uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
code = 0; code = 0;
@ -1633,6 +1739,10 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
code = TSDB_CODE_UDF_PIPE_NOT_EXIST; code = TSDB_CODE_UDF_PIPE_NOT_EXIST;
} else { } else {
uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
if(write == NULL) {
fnError("udfc event loop start req_rsp task malloc write failed.");
return TSDB_CODE_OUT_OF_MEMORY;
}
write->data = pipe->data; write->data = pipe->data;
QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue; QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue;
QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue); QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
@ -1726,27 +1836,41 @@ void udfStopAsyncCb(uv_async_t *async) {
} }
void constructUdfService(void *argsThread) { void constructUdfService(void *argsThread) {
int32_t code = 0, lino = 0;
SUdfcProxy *udfc = (SUdfcProxy *)argsThread; SUdfcProxy *udfc = (SUdfcProxy *)argsThread;
uv_loop_init(&udfc->uvLoop); code = uv_loop_init(&udfc->uvLoop);
TAOS_CHECK_GOTO(code, &lino, _exit);
uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb); code = uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb);
TAOS_CHECK_GOTO(code, &lino, _exit);
udfc->loopTaskAync.data = udfc; udfc->loopTaskAync.data = udfc;
uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb); code = uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb);
TAOS_CHECK_GOTO(code, &lino, _exit);
udfc->loopStopAsync.data = udfc; udfc->loopStopAsync.data = udfc;
uv_mutex_init(&udfc->taskQueueMutex); code = uv_mutex_init(&udfc->taskQueueMutex);
TAOS_CHECK_GOTO(code, &lino, _exit);
QUEUE_INIT(&udfc->taskQueue); QUEUE_INIT(&udfc->taskQueue);
QUEUE_INIT(&udfc->uvProcTaskQueue); QUEUE_INIT(&udfc->uvProcTaskQueue);
uv_barrier_wait(&udfc->initBarrier); (void)uv_barrier_wait(&udfc->initBarrier);
// TODO return value of uv_run // TODO return value of uv_run
uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); int num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
uv_loop_close(&udfc->uvLoop); fnInfo("udfc uv loop exit. active handle num: %d", num);
(void)uv_loop_close(&udfc->uvLoop);
uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL); uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL);
uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
uv_loop_close(&udfc->uvLoop); fnInfo("udfc uv loop exit. active handle num: %d", num);
(void)uv_loop_close(&udfc->uvLoop);
_exit:
if (code != 0) {
fnError("udfc construct error. code: %d, line: %d", code, lino);
}
fnInfo("udfc construct finished");
} }
int32_t udfcOpen() { int32_t udfcOpen() {
int32_t code = 0, lino = 0;
int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1); int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
if (old == 1) { if (old == 1) {
return 0; return 0;
@ -1754,16 +1878,36 @@ int32_t udfcOpen() {
SUdfcProxy *proxy = &gUdfcProxy; SUdfcProxy *proxy = &gUdfcProxy;
getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName)); getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
proxy->udfcState = UDFC_STATE_STARTNG; proxy->udfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&proxy->initBarrier, 2); code = uv_barrier_init(&proxy->initBarrier, 2);
uv_thread_create(&proxy->loopThread, constructUdfService, proxy); TAOS_CHECK_GOTO(code, &lino, _exit);
code = uv_thread_create(&proxy->loopThread, constructUdfService, proxy);
TAOS_CHECK_GOTO(code, &lino, _exit);
atomic_store_8(&proxy->udfcState, UDFC_STATE_READY); atomic_store_8(&proxy->udfcState, UDFC_STATE_READY);
proxy->udfcState = UDFC_STATE_READY; proxy->udfcState = UDFC_STATE_READY;
uv_barrier_wait(&proxy->initBarrier); (void)uv_barrier_wait(&proxy->initBarrier);
uv_mutex_init(&proxy->udfStubsMutex); TAOS_CHECK_GOTO(code, &lino, _exit);
code = uv_mutex_init(&proxy->udfStubsMutex);
TAOS_CHECK_GOTO(code, &lino, _exit);
proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
if(proxy->udfStubs == NULL) {
fnError("udfc init failed. udfStubs: %p", proxy->udfStubs);
return -1;
}
proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
uv_mutex_init(&proxy->udfcUvMutex); if(proxy->expiredUdfStubs == NULL) {
fnInfo("udfc initialized") return 0; taosArrayDestroy(proxy->udfStubs);
fnError("udfc init failed. expiredUdfStubs: %p", proxy->expiredUdfStubs);
return -1;
}
code = uv_mutex_init(&proxy->udfcUvMutex);
TAOS_CHECK_GOTO(code, &lino, _exit);
_exit:
if (code != 0) {
fnError("udfc open error. code: %d, line: %d", code, lino);
return TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
fnInfo("udfc initialized");
return 0;
} }
int32_t udfcClose() { int32_t udfcClose() {
@ -1774,8 +1918,12 @@ int32_t udfcClose() {
SUdfcProxy *udfc = &gUdfcProxy; SUdfcProxy *udfc = &gUdfcProxy;
udfc->udfcState = UDFC_STATE_STOPPING; udfc->udfcState = UDFC_STATE_STOPPING;
uv_async_send(&udfc->loopStopAsync); if(uv_async_send(&udfc->loopStopAsync) != 0) {
uv_thread_join(&udfc->loopThread); fnError("udfc close error to send stop async");
}
if(uv_thread_join(&udfc->loopThread) != 0 ) {
fnError("udfc close errir to join loop thread");
}
uv_mutex_destroy(&udfc->taskQueueMutex); uv_mutex_destroy(&udfc->taskQueueMutex);
uv_barrier_destroy(&udfc->initBarrier); uv_barrier_destroy(&udfc->initBarrier);
taosArrayDestroy(udfc->expiredUdfStubs); taosArrayDestroy(udfc->expiredUdfStubs);
@ -1788,45 +1936,61 @@ int32_t udfcClose() {
} }
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
int32_t code = 0, lino = 0;
SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
if(uvTask == NULL) {
fnError("udfc client task: %p failed to allocate memory for uvTask", task);
return TSDB_CODE_OUT_OF_MEMORY;
}
fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe); fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe);
udfcInitializeUvTask(task, uvTaskType, uvTask); code = udfcInitializeUvTask(task, uvTaskType, uvTask);
udfcQueueUvTask(uvTask); TAOS_CHECK_GOTO(code, &lino, _exit);
udfcGetUdfTaskResultFromUvTask(task, uvTask); code = udfcQueueUvTask(uvTask);
TAOS_CHECK_GOTO(code, &lino, _exit);
code = udfcGetUdfTaskResultFromUvTask(task, uvTask);
TAOS_CHECK_GOTO(code, &lino, _exit);
if (uvTaskType == UV_TASK_CONNECT) { if (uvTaskType == UV_TASK_CONNECT) {
task->session->udfUvPipe = uvTask->pipe; task->session->udfUvPipe = uvTask->pipe;
SClientUvConn *conn = uvTask->pipe->data; SClientUvConn *conn = uvTask->pipe->data;
conn->session = task->session; conn->session = task->session;
} }
_exit:
if (code != 0) {
fnError("udfc run udf uv task failure. task: %p, uvTask: %p, err: %d, line: %d", task, uvTask, code, lino);
}
taosMemoryFree(uvTask->reqBuf.base); taosMemoryFree(uvTask->reqBuf.base);
uvTask->reqBuf.base = NULL; uvTask->reqBuf.base = NULL;
taosMemoryFree(uvTask); taosMemoryFree(uvTask);
fnDebug("udfc freed uvTask: %p", task);
uvTask = NULL; uvTask = NULL;
return task->errCode; return code;
} }
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
int32_t code = TSDB_CODE_SUCCESS, lino = 0;
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
task->errCode = 0; if(task == NULL) {
fnError("doSetupUdf, failed to allocate memory for task");
return TSDB_CODE_OUT_OF_MEMORY;
}
task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession)); task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
if(task->session == NULL) {
fnError("doSetupUdf, failed to allocate memory for session");
taosMemoryFree(task);
return TSDB_CODE_OUT_OF_MEMORY;
}
task->session->udfc = &gUdfcProxy; task->session->udfc = &gUdfcProxy;
task->type = UDF_TASK_SETUP; task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req; SUdfSetupRequest *req = &task->_setup.req;
strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN); strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN);
int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT); code = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) { TAOS_CHECK_GOTO(code, &lino, _exit);
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfcProxy)->udfdPipeName);
taosMemoryFree(task->session);
taosMemoryFree(task);
return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
}
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
TAOS_CHECK_GOTO(code, &lino, _exit);
SUdfSetupResponse *rsp = &task->_setup.rsp; SUdfSetupResponse *rsp = &task->_setup.rsp;
task->session->severHandle = rsp->udfHandle; task->session->severHandle = rsp->udfHandle;
@ -1834,15 +1998,18 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
task->session->bytes = rsp->bytes; task->session->bytes = rsp->bytes;
task->session->bufSize = rsp->bufSize; task->session->bufSize = rsp->bufSize;
strncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN); strncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN);
if (task->errCode != 0) { fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode) *funcHandle = task->session;
} else {
fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
*funcHandle = task->session;
}
int32_t err = task->errCode;
taosMemoryFree(task); taosMemoryFree(task);
return err; return 0;
_exit:
if (code != 0) {
fnError("failed to setup udf. udfname: %s, err: %d line:%d", udfName, code, lino);
}
taosMemoryFree(task->session);
taosMemoryFree(task);
return code;
} }
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
@ -1854,7 +2021,10 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
return TSDB_CODE_UDF_PIPE_NOT_EXIST; return TSDB_CODE_UDF_PIPE_NOT_EXIST;
} }
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
task->errCode = 0; if(task == NULL) {
fnError("udfc call udf. failed to allocate memory for task");
return TSDB_CODE_OUT_OF_MEMORY;
}
task->session = (SUdfcUvSession *)handle; task->session = (SUdfcUvSession *)handle;
task->type = UDF_TASK_CALL; task->type = UDF_TASK_CALL;
@ -1887,10 +2057,9 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
} }
} }
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); int32_t code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
if (code != 0) {
if (task->errCode != 0) { fnError("call udf failure. udfcRunUdfUvTask err: %d", code);
fnError("call udf failure. err: %d", task->errCode);
} else { } else {
SUdfCallResponse *rsp = &task->_call.rsp; SUdfCallResponse *rsp = &task->_call.rsp;
switch (callType) { switch (callType) {
@ -1916,9 +2085,8 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
} }
} }
}; };
int err = task->errCode;
taosMemoryFree(task); taosMemoryFree(task);
return err; return code;
} }
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
@ -1957,11 +2125,15 @@ int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdf
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
int8_t callType = TSDB_UDF_CALL_SCALA_PROC; int8_t callType = TSDB_UDF_CALL_SCALA_PROC;
SSDataBlock inputBlock = {0}; SSDataBlock inputBlock = {0};
convertScalarParamToDataBlock(input, numOfCols, &inputBlock); int32_t code = convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
if(code != 0) {
fnError("doCallUdfScalarFunc, convertScalarParamToDataBlock failed. code: %d", code);
return code;
}
SSDataBlock resultBlock = {0}; SSDataBlock resultBlock = {0};
int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL); int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
if (err == 0) { if (err == 0) {
convertDataBlockToScalarParm(&resultBlock, output); err = convertDataBlockToScalarParm(&resultBlock, output);
taosArrayDestroy(resultBlock.pDataBlock); taosArrayDestroy(resultBlock.pDataBlock);
} }
@ -1970,6 +2142,7 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
} }
int32_t doTeardownUdf(UdfcFuncHandle handle) { int32_t doTeardownUdf(UdfcFuncHandle handle) {
int32_t code = TSDB_CODE_SUCCESS, lino = 0;;
SUdfcUvSession *session = (SUdfcUvSession *)handle; SUdfcUvSession *session = (SUdfcUvSession *)handle;
if (session->udfUvPipe == NULL) { if (session->udfUvPipe == NULL) {
@ -1979,18 +2152,22 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
} }
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
task->errCode = 0; if(task == NULL) {
fnError("doTeardownUdf, failed to allocate memory for task");
taosMemoryFree(session);
return TSDB_CODE_OUT_OF_MEMORY;
}
task->session = session; task->session = session;
task->type = UDF_TASK_TEARDOWN; task->type = UDF_TASK_TEARDOWN;
SUdfTeardownRequest *req = &task->_teardown.req; SUdfTeardownRequest *req = &task->_teardown.req;
req->udfHandle = task->session->severHandle; req->udfHandle = task->session->severHandle;
udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP);
TAOS_CHECK_GOTO(code, &lino, _exit);
int32_t err = task->errCode; code = udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
TAOS_CHECK_GOTO(code, &lino, _exit);
udfcRunUdfUvTask(task, UV_TASK_DISCONNECT);
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle); fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
// TODO: synchronization refactor between libuv event loop and request thread // TODO: synchronization refactor between libuv event loop and request thread
@ -2000,8 +2177,13 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
conn->session = NULL; conn->session = NULL;
} }
uv_mutex_unlock(&gUdfcProxy.udfcUvMutex); uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
_exit:
if (code != 0) {
fnError("failed to teardown udf. udf name: %s, err: %d, line: %d", session->udfName, code, lino);
}
taosMemoryFree(session); taosMemoryFree(session);
taosMemoryFree(task); taosMemoryFree(task);
return err; return code;
} }

View File

@ -54,38 +54,39 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; }
int32_t udfdCPluginClose() { return 0; } int32_t udfdCPluginClose() { return 0; }
const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
char initFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char initFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *initSuffix = "_init"; char *initSuffix = "_init";
snprintf(initFuncName, sizeof(initFuncName), "%s%s", udfName, initSuffix); snprintf(initFuncName, sizeof(initFuncName), "%s%s", udfName, initSuffix);
uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc)); TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc)));
char destroyFuncName[TSDB_FUNC_NAME_LEN + 9] = {0}; char destroyFuncName[TSDB_FUNC_NAME_LEN + 9] = {0};
char *destroySuffix = "_destroy"; char *destroySuffix = "_destroy";
snprintf(destroyFuncName, sizeof(destroyFuncName), "%s%s", udfName, destroySuffix); snprintf(destroyFuncName, sizeof(destroyFuncName), "%s%s", udfName, destroySuffix);
uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc)); TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc)));
return udfName; return 0;
} }
void udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)); TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)));
char startFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; char startFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
char *startSuffix = "_start"; char *startSuffix = "_start";
snprintf(startFuncName, sizeof(startFuncName), "%s%s", processFuncName, startSuffix); snprintf(startFuncName, sizeof(startFuncName), "%s%s", processFuncName, startSuffix);
uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)); TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)));
char finishFuncName[TSDB_FUNC_NAME_LEN + 8] = {0}; char finishFuncName[TSDB_FUNC_NAME_LEN + 8] = {0};
char *finishSuffix = "_finish"; char *finishSuffix = "_finish";
snprintf(finishFuncName, sizeof(finishFuncName), "%s%s", processFuncName, finishSuffix); snprintf(finishFuncName, sizeof(finishFuncName), "%s%s", processFuncName, finishSuffix);
uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)); TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)));
char mergeFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; char mergeFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
char *mergeSuffix = "_merge"; char *mergeSuffix = "_merge";
snprintf(mergeFuncName, sizeof(mergeFuncName), "%s%s", processFuncName, mergeSuffix); snprintf(mergeFuncName, sizeof(mergeFuncName), "%s%s", processFuncName, mergeSuffix);
uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)); (void)(uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)));
return 0;
} }
int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
@ -99,27 +100,43 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
} }
const char *udfName = udf->name; const char *udfName = udf->name;
udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName); err = udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName);
if (err != 0) {
fnError("can not load init/destroy functions. error: %d", err);
err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
goto _exit;
}
if (udf->funcType == UDF_FUNC_TYPE_SCALAR) { if (udf->funcType == UDF_FUNC_TYPE_SCALAR) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)); if (uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)) != 0) {
fnError("can not load library function %s. error: %s", processFuncName, uv_strerror(err));
err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
goto _exit;
}
} else if (udf->funcType == UDF_FUNC_TYPE_AGG) { } else if (udf->funcType == UDF_FUNC_TYPE_AGG) {
udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName); err = udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName);
if (err != 0) {
fnError("can not load aggregation functions. error: %d", err);
err = TSDB_CODE_UDF_LOAD_UDF_FAILURE;
goto _exit;
}
} }
int32_t code = 0;
if (udfCtx->initFunc) { if (udfCtx->initFunc) {
code = (udfCtx->initFunc)(); err = (udfCtx->initFunc)();
if (code != 0) { if (err != 0) {
uv_dlclose(&udfCtx->lib); fnError("udf init function failed. error: %d", err);
taosMemoryFree(udfCtx); goto _exit;
return code;
} }
} }
*pUdfCtx = udfCtx; *pUdfCtx = udfCtx;
return 0; return 0;
_exit:
uv_dlclose(&udfCtx->lib);
taosMemoryFree(udfCtx);
return err;
} }
int32_t udfdCPluginUdfDestroy(void *udfCtx) { int32_t udfdCPluginUdfDestroy(void *udfCtx) {
@ -303,7 +320,7 @@ static int32_t udfdConnectToMnode();
static bool udfdRpcRfp(int32_t code, tmsg_t msgType); static bool udfdRpcRfp(int32_t code, tmsg_t msgType);
static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t udfdOpenClientRpc(); static int32_t udfdOpenClientRpc();
static int32_t udfdCloseClientRpc(); static void udfdCloseClientRpc();
static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request); static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request); static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request);
@ -320,7 +337,7 @@ static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf
static void udfdOnNewConnection(uv_stream_t *server, int status); static void udfdOnNewConnection(uv_stream_t *server, int status);
static void udfdIntrSignalHandler(uv_signal_t *handle, int signum); static void udfdIntrSignalHandler(uv_signal_t *handle, int signum);
static int32_t removeListeningPipe(); static void removeListeningPipe();
static void udfdPrintVersion(); static void udfdPrintVersion();
static int32_t udfdParseArgs(int32_t argc, char *argv[]); static int32_t udfdParseArgs(int32_t argc, char *argv[]);
@ -330,13 +347,13 @@ static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv
static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf); static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
static int32_t udfdUvInit(); static int32_t udfdUvInit();
static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); static void udfdCloseWalkCb(uv_handle_t *handle, void *arg);
static int32_t udfdRun(); static void udfdRun();
static void udfdConnectMnodeThreadFunc(void *args); static void udfdConnectMnodeThreadFunc(void *args);
SUdf *udfdNewUdf(const char *udfName); int32_t udfdNewUdf(SUdf **pUdf, const char *udfName);
void udfdGetFuncBodyPath(const SUdf *udf, char *path); void udfdGetFuncBodyPath(const SUdf *udf, char *path);
void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
plugin->openFunc = udfdCPluginOpen; plugin->openFunc = udfdCPluginOpen;
plugin->closeFunc = udfdCPluginClose; plugin->closeFunc = udfdCPluginClose;
@ -349,8 +366,9 @@ void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish; plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish;
SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}}; SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}};
plugin->openFunc(items, 1); int32_t err = plugin->openFunc(items, 1);
return; if (err != 0) return err;
return 0;
} }
int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) { int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) {
@ -412,7 +430,9 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
if (plugin->closeFunc) { if (plugin->closeFunc) {
plugin->closeFunc(); if (plugin->closeFunc() != 0) {
fnError("udf script c plugin close func failed.line:%d", __LINE__);
}
} }
plugin->openFunc = NULL; plugin->openFunc = NULL;
plugin->closeFunc = NULL; plugin->closeFunc = NULL;
@ -428,7 +448,9 @@ void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) {
if (plugin->closeFunc) { if (plugin->closeFunc) {
plugin->closeFunc(); if(plugin->closeFunc() != 0) {
fnError("udf script python plugin close func failed.line:%d", __LINE__);
}
} }
uv_dlclose(&plugin->lib); uv_dlclose(&plugin->lib);
if (plugin->libLoaded) { if (plugin->libLoaded) {
@ -447,14 +469,23 @@ void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) {
int32_t udfdInitScriptPlugin(int8_t scriptType) { int32_t udfdInitScriptPlugin(int8_t scriptType) {
SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin));
if (plugin == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t err = 0;
switch (scriptType) { switch (scriptType) {
case TSDB_FUNC_SCRIPT_BIN_LIB: case TSDB_FUNC_SCRIPT_BIN_LIB:
udfdInitializeCPlugin(plugin); err = udfdInitializeCPlugin(plugin);
if (err != 0) {
fnError("udf script c plugin init failed. error: %d", err);
taosMemoryFree(plugin);
return err;
}
break; break;
case TSDB_FUNC_SCRIPT_PYTHON: { case TSDB_FUNC_SCRIPT_PYTHON: {
int32_t err = udfdInitializePythonPlugin(plugin); err = udfdInitializePythonPlugin(plugin);
if (err != 0) { if (err != 0) {
fnError("udf script python plugin init failed. error: %d", err);
taosMemoryFree(plugin); taosMemoryFree(plugin);
return err; return err;
} }
@ -489,7 +520,7 @@ void udfdDeinitScriptPlugins() {
void udfdProcessRequest(uv_work_t *req) { void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0}; SUdfRequest request = {0};
decodeUdfRequest(uvUdf->input.base, &request); if(decodeUdfRequest(uvUdf->input.base, &request) == NULL) return;
switch (request.type) { switch (request.type) {
case UDF_TASK_SETUP: { case UDF_TASK_SETUP: {
@ -544,6 +575,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) {
if (scriptPlugin == NULL) { if (scriptPlugin == NULL) {
err = udfdInitScriptPlugin(udf->scriptType); err = udfdInitScriptPlugin(udf->scriptType);
if (err != 0) { if (err != 0) {
fnError("udf name %s init script plugin failed. error %d", udfName, err);
uv_mutex_unlock(&global.scriptPluginsMutex); uv_mutex_unlock(&global.scriptPluginsMutex);
return err; return err;
} }
@ -563,15 +595,15 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) {
return 0; return 0;
} }
SUdf *udfdNewUdf(const char *udfName) { int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1; udfNew->refCount = 1;
udfNew->lastFetchTime = taosGetTimestampMs(); udfNew->lastFetchTime = taosGetTimestampMs();
strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);
udfNew->state = UDF_STATE_INIT; udfNew->state = UDF_STATE_INIT;
uv_mutex_init(&udfNew->lock); if (uv_mutex_init(&udfNew->lock) != 0) return TSDB_CODE_UDF_UV_EXEC_FAILURE;
uv_cond_init(&udfNew->condReady); if (uv_cond_init(&udfNew->condReady) != 0) return TSDB_CODE_UDF_UV_EXEC_FAILURE;
udfNew->resident = false; udfNew->resident = false;
udfNew->expired = false; udfNew->expired = false;
@ -582,10 +614,28 @@ SUdf *udfdNewUdf(const char *udfName) {
break; break;
} }
} }
return udfNew; *pUdf = udfNew;
return 0;
} }
SUdf *udfdGetOrCreateUdf(const char *udfName) { void udfdFreeUdf(void *pData) {
SUdf *pSudf = (SUdf *)pData;
if (pSudf == NULL) {
return;
}
if (pSudf->scriptPlugin != NULL) {
if(pSudf->scriptPlugin->udfDestroyFunc(pSudf->scriptUdfCtx) != 0) {
fnError("udfdFreeUdf: udfd destroy udf %s failed", pSudf->name);
}
}
uv_mutex_destroy(&pSudf->lock);
uv_cond_destroy(&pSudf->condReady);
taosMemoryFree(pSudf);
}
int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) {
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName));
int64_t currTime = taosGetTimestampMs(); int64_t currTime = taosGetTimestampMs();
@ -594,26 +644,34 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) {
expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s
if (!expired) { if (!expired) {
++(*pUdfHash)->refCount; ++(*pUdfHash)->refCount;
SUdf *udf = *pUdfHash; *ppUdf = *pUdfHash;
uv_mutex_unlock(&global.udfsMutex); uv_mutex_unlock(&global.udfsMutex);
fnInfo("udfd reuse existing udf. udf %s udf version %d, udf created time %" PRIx64, udf->name, udf->version, fnInfo("udfd reuse existing udf. udf %s udf version %d, udf created time %" PRIx64, (*ppUdf)->name, (*ppUdf)->version,
udf->createdTime); (*ppUdf)->createdTime);
return udf; return 0;
} else { } else {
(*pUdfHash)->expired = true; (*pUdfHash)->expired = true;
fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64, fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64,
(*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime); (*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime);
taosHashRemove(global.udfsHash, udfName, strlen(udfName)); if(taosHashRemove(global.udfsHash, udfName, strlen(udfName)) != 0) {
fnError("udfdGetOrCreateUdf: udfd remove udf %s failed", udfName);
}
} }
} }
SUdf *udf = udfdNewUdf(udfName); int32_t code = udfdNewUdf(ppUdf, udfName);
if(code != 0) {
uv_mutex_unlock(&global.udfsMutex);
return code;
}
SUdf **pUdf = &udf; if ((code = taosHashPut(global.udfsHash, udfName, strlen(udfName), ppUdf, POINTER_BYTES)) != 0) {
taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); uv_mutex_unlock(&global.udfsMutex);
return code;
}
uv_mutex_unlock(&global.udfsMutex); uv_mutex_unlock(&global.udfsMutex);
return udf; return 0;
} }
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
@ -622,10 +680,13 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfSetupRequest *setup = &request->setup; SUdfSetupRequest *setup = &request->setup;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SUdf *udf = NULL; SUdf *udf = NULL;
udf = udfdGetOrCreateUdf(setup->udfName);
code = udfdGetOrCreateUdf(&udf, setup->udfName);
if(code != 0) {
fnError("udfdGetOrCreateUdf failed. udf name %s", setup->udfName);
goto _send;
}
uv_mutex_lock(&udf->lock); uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) { if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING; udf->state = UDF_STATE_LOADING;
@ -646,6 +707,8 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
handle->udf = udf; handle->udf = udf;
_send:
;
SUdfResponse rsp; SUdfResponse rsp;
rsp.seqNum = request->seqNum; rsp.seqNum = request->seqNum;
rsp.type = request->type; rsp.type = request->type;
@ -656,11 +719,23 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
rsp.setupRsp.bufSize = udf->bufSize; rsp.setupRsp.bufSize = udf->bufSize;
int32_t len = encodeUdfResponse(NULL, &rsp); int32_t len = encodeUdfResponse(NULL, &rsp);
if(len < 0) {
fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
return;
}
rsp.msgLen = len; rsp.msgLen = len;
void *bufBegin = taosMemoryMalloc(len); void *bufBegin = taosMemoryMalloc(len);
if(bufBegin == NULL) {
fnError("udfdProcessSetupRequest: malloc failed. len %d", len);
return;
}
void *buf = bufBegin; void *buf = bufBegin;
encodeUdfResponse(&buf, &rsp); if(encodeUdfResponse(&buf, &rsp) < 0) {
fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len);
taosMemoryFree(bufBegin);
return;
}
uvUdf->output = uv_buf_init(bufBegin, len); uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base); taosMemoryFree(uvUdf->input.base);
@ -685,30 +760,35 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
output.colMeta.type = udf->outputType; output.colMeta.type = udf->outputType;
output.colMeta.precision = 0; output.colMeta.precision = 0;
output.colMeta.scale = 0; output.colMeta.scale = 0;
udfColEnsureCapacity(&output, call->block.info.rows); if (udfColEnsureCapacity(&output, call->block.info.rows) == TSDB_CODE_SUCCESS) {
SUdfDataBlock input = {0};
SUdfDataBlock input = {0}; code = convertDataBlockToUdfDataBlock(&call->block, &input);
convertDataBlockToUdfDataBlock(&call->block, &input); if (code == TSDB_CODE_SUCCESS) code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);
code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); freeUdfDataDataBlock(&input);
freeUdfDataDataBlock(&input); if (code == TSDB_CODE_SUCCESS) code = convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
if(code == 0) convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); }
freeUdfColumn(&output); freeUdfColumn(&output);
break; break;
} }
case TSDB_UDF_CALL_AGG_INIT: { case TSDB_UDF_CALL_AGG_INIT: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx); if (outBuf.buf != NULL) {
code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx);
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
subRsp->resultBuf = outBuf; subRsp->resultBuf = outBuf;
break; break;
} }
case TSDB_UDF_CALL_AGG_PROC: { case TSDB_UDF_CALL_AGG_PROC: {
SUdfDataBlock input = {0}; SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input); if (convertDataBlockToUdfDataBlock(&call->block, &input) == TSDB_CODE_SUCCESS) {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx); code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx);
freeUdfInterBuf(&call->interBuf); freeUdfInterBuf(&call->interBuf);
subRsp->resultBuf = outBuf;
}
freeUdfDataDataBlock(&input); freeUdfDataDataBlock(&input);
subRsp->resultBuf = outBuf;
break; break;
} }
@ -738,10 +818,19 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
subRsp->callType = call->callType; subRsp->callType = call->callType;
int32_t len = encodeUdfResponse(NULL, rsp); int32_t len = encodeUdfResponse(NULL, rsp);
if(len < 0) {
fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
return;
}
rsp->msgLen = len; rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len); void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin; void *buf = bufBegin;
encodeUdfResponse(&buf, rsp); if(encodeUdfResponse(&buf, rsp) < 0) {
fnError("udfdProcessCallRequest: encode udf response failed. len %d", len);
taosMemoryFree(bufBegin);
return;
}
uvUdf->output = uv_buf_init(bufBegin, len); uvUdf->output = uv_buf_init(bufBegin, len);
switch (call->callType) { switch (call->callType) {
@ -787,7 +876,11 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
udf->refCount--; udf->refCount--;
if (udf->refCount == 0 && (!udf->resident || udf->expired)) { if (udf->refCount == 0 && (!udf->resident || udf->expired)) {
unloadUdf = true; unloadUdf = true;
taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); code = taosHashRemove(global.udfsHash, udf->name, strlen(udf->name));
if (code != 0) {
fnError("udf name %s remove from hash failed", udf->name);
goto _send;
}
} }
uv_mutex_unlock(&global.udfsMutex); uv_mutex_unlock(&global.udfsMutex);
if (unloadUdf) { if (unloadUdf) {
@ -798,18 +891,27 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
fnDebug("udfd destroy function returns %d", code); fnDebug("udfd destroy function returns %d", code);
taosMemoryFree(udf); taosMemoryFree(udf);
} }
taosMemoryFree(handle);
_send:
taosMemoryFree(handle);
SUdfResponse response = {0}; SUdfResponse response = {0};
SUdfResponse *rsp = &response; SUdfResponse *rsp = &response;
rsp->seqNum = request->seqNum; rsp->seqNum = request->seqNum;
rsp->type = request->type; rsp->type = request->type;
rsp->code = code; rsp->code = code;
int32_t len = encodeUdfResponse(NULL, rsp); int32_t len = encodeUdfResponse(NULL, rsp);
if (len < 0) {
fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
return;
}
rsp->msgLen = len; rsp->msgLen = len;
void *bufBegin = taosMemoryMalloc(len); void *bufBegin = taosMemoryMalloc(len);
void *buf = bufBegin; void *buf = bufBegin;
encodeUdfResponse(&buf, rsp); if (encodeUdfResponse(&buf, rsp) < 0) {
fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len);
taosMemoryFree(bufBegin);
return;
}
uvUdf->output = uv_buf_init(bufBegin, len); uvUdf->output = uv_buf_init(bufBegin, len);
taosMemoryFree(uvUdf->input.base); taosMemoryFree(uvUdf->input.base);
@ -865,7 +967,10 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
fnError("udfd write udf shared library failed"); fnError("udfd write udf shared library failed");
return TSDB_CODE_FILE_CORRUPTED; return TSDB_CODE_FILE_CORRUPTED;
} }
taosCloseFile(&file); if(taosCloseFile(&file) != 0) {
fnError("udfdSaveFuncBodyToFile, udfd close file failed");
return TSDB_CODE_FILE_CORRUPTED;
}
strncpy(udf->path, path, PATH_MAX); strncpy(udf->path, path, PATH_MAX);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -888,7 +993,10 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
SConnectRsp connectRsp = {0}; SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); if(tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp) < 0){
fnError("udfd deserialize connect response failed");
goto _return;
}
int32_t now = taosGetTimestampSec(); int32_t now = taosGetTimestampSec();
int32_t delta = abs(now - connectRsp.svrTimestamp); int32_t delta = abs(now - connectRsp.svrTimestamp);
@ -908,7 +1016,10 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
msgInfo->code = 0; msgInfo->code = 0;
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
SRetrieveFuncRsp retrieveRsp = {0}; SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); if(tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp) < 0){
fnError("udfd deserialize retrieve func response failed");
goto _return;
}
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf *udf = msgInfo->param; SUdf *udf = msgInfo->param;
@ -940,28 +1051,43 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0}; SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1; retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, udfName); if(taosArrayPush(retrieveReq.pFuncNames, udfName) == NULL) {
taosArrayDestroy(retrieveReq.pFuncNames);
return terrno;
}
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
if(contLen < 0) {
taosArrayDestroy(retrieveReq.pFuncNames);
return terrno;
}
void *pReq = rpcMallocCont(contLen); void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); if(tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq) < 0) {
taosArrayDestroy(retrieveReq.pFuncNames);
rpcFreeCont(pReq);
return terrno;
}
taosArrayDestroy(retrieveReq.pFuncNames); taosArrayDestroy(retrieveReq.pFuncNames);
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
msgInfo->param = udf; msgInfo->param = udf;
uv_sem_init(&msgInfo->resultSem, 0); if(uv_sem_init(&msgInfo->resultSem, 0) != 0) {
taosMemoryFree(msgInfo);
return TSDB_CODE_UDF_UV_EXEC_FAILURE;
}
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq; rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen; rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
rpcMsg.info.ahandle = msgInfo; rpcMsg.info.ahandle = msgInfo;
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); int32_t code = rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
if (code == 0) {
uv_sem_wait(&msgInfo->resultSem); uv_sem_wait(&msgInfo->resultSem);
uv_sem_destroy(&msgInfo->resultSem); uv_sem_destroy(&msgInfo->resultSem);
int32_t code = msgInfo->code; code = msgInfo->code;
}
taosMemoryFree(msgInfo); taosMemoryFree(msgInfo);
return code; return code;
} }
@ -1009,8 +1135,12 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe
return -1; return -1;
} }
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]); int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
mgmtEpSet->numOfEps++; if (code != TSDB_CODE_SUCCESS) {
fnError("invalid ep %s", secondEp);
} else {
mgmtEpSet->numOfEps++;
}
} }
if (mgmtEpSet->numOfEps == 0) { if (mgmtEpSet->numOfEps == 0) {
@ -1039,7 +1169,7 @@ int32_t udfdOpenClientRpc() {
connLimitNum = TMIN(connLimitNum, 500); connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); TAOS_CHECK_RETURN(taosVersionStrToInt(version, &(rpcInit.compatibilityVer)));
global.clientRpc = rpcOpen(&rpcInit); global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) { if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client"); fnError("failed to init dnode rpc client");
@ -1048,11 +1178,10 @@ int32_t udfdOpenClientRpc() {
return 0; return 0;
} }
int32_t udfdCloseClientRpc() { void udfdCloseClientRpc() {
fnInfo("udfd begin closing rpc"); fnInfo("udfd begin closing rpc");
rpcClose(global.clientRpc); rpcClose(global.clientRpc);
fnInfo("udfd finish closing rpc"); fnInfo("udfd finish closing rpc");
return 0;
} }
void udfdOnWrite(uv_write_t *req, int status) { void udfdOnWrite(uv_write_t *req, int status) {
@ -1082,7 +1211,11 @@ void udfdSendResponse(uv_work_t *work, int status) {
if (udfWork->conn != NULL) { if (udfWork->conn != NULL) {
uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t));
write_req->data = udfWork; write_req->data = udfWork;
uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite); int32_t code = uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite);
if (code != 0) {
fnError("udfd send response error %s", uv_strerror(code));
taosMemoryFree(write_req);
}
} }
taosMemoryFree(work); taosMemoryFree(work);
} }
@ -1146,7 +1279,12 @@ void udfdHandleRequest(SUdfdUvConn *conn) {
conn->inputCap = 0; conn->inputCap = 0;
conn->inputTotal = -1; conn->inputTotal = -1;
work->data = udfWork; work->data = udfWork;
uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse); if(uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse) != 0)
{
fnError("udfd queue work failed");
taosMemoryFree(work);
taosMemoryFree(udfWork);
}
} }
void udfdPipeCloseCb(uv_handle_t *pipe) { void udfdPipeCloseCb(uv_handle_t *pipe) {
@ -1193,9 +1331,15 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
fnError("udfd new connection error. code: %s", uv_strerror(status)); fnError("udfd new connection error. code: %s", uv_strerror(status));
return; return;
} }
int32_t code = 0;
uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t)); uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t));
uv_pipe_init(global.loop, client, 0); code = uv_pipe_init(global.loop, client, 0);
if (code) {
fnError("udfd pipe init error %s", uv_strerror(code));
taosMemoryFree(client);
return;
}
if (uv_accept(server, (uv_stream_t *)client) == 0) { if (uv_accept(server, (uv_stream_t *)client) == 0) {
SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn)); SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn));
ctx->pWorkList = NULL; ctx->pWorkList = NULL;
@ -1205,7 +1349,13 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
ctx->inputCap = 0; ctx->inputCap = 0;
client->data = ctx; client->data = ctx;
ctx->client = (uv_stream_t *)client; ctx->client = (uv_stream_t *)client;
uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead); code = uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead);
if (code) {
fnError("udfd read start error %s", uv_strerror(code));
udfdUvHandleError(ctx);
taosMemoryFree(ctx);
taosMemoryFree(client);
}
} else { } else {
uv_close((uv_handle_t *)client, NULL); uv_close((uv_handle_t *)client, NULL);
} }
@ -1214,8 +1364,14 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
fnInfo("udfd signal received: %d\n", signum); fnInfo("udfd signal received: %d\n", signum);
uv_fs_t req; uv_fs_t req;
uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); int32_t code = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
uv_signal_stop(handle); if(code) {
fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(code), __LINE__);
}
code = uv_signal_stop(handle);
if(code) {
fnError("stop signal handler failed, reason:%s", uv_strerror(code));
}
uv_stop(global.loop); uv_stop(global.loop);
} }
@ -1224,12 +1380,12 @@ static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
if (strcmp(argv[i], "-c") == 0) { if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) { if (i < argc - 1) {
if (strlen(argv[++i]) >= PATH_MAX) { if (strlen(argv[++i]) >= PATH_MAX) {
printf("config file path overflow"); (void)printf("config file path overflow");
return -1; return -1;
} }
tstrncpy(configDir, argv[i], PATH_MAX); tstrncpy(configDir, argv[i], PATH_MAX);
} else { } else {
printf("'-c' requires a parameter, default is %s\n", configDir); (void)printf("'-c' requires a parameter, default is %s\n", configDir);
return -1; return -1;
} }
} else if (strcmp(argv[i], "-V") == 0) { } else if (strcmp(argv[i], "-V") == 0) {
@ -1242,9 +1398,9 @@ static int32_t udfdParseArgs(int32_t argc, char *argv[]) {
} }
static void udfdPrintVersion() { static void udfdPrintVersion() {
printf("udfd version: %s compatible_version: %s\n", version, compatible_version); (void)printf("udfd version: %s compatible_version: %s\n", version, compatible_version);
printf("git: %s\n", gitinfo); (void)printf("git: %s\n", gitinfo);
printf("build: %s\n", buildinfo); (void)printf("build: %s\n", buildinfo);
} }
static int32_t udfdInitLog() { static int32_t udfdInitLog() {
@ -1270,35 +1426,31 @@ void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
taosMemoryFree(buf->base); taosMemoryFree(buf->base);
} }
static int32_t removeListeningPipe() { static void removeListeningPipe() {
uv_fs_t req; uv_fs_t req;
int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
uv_fs_req_cleanup(&req); uv_fs_req_cleanup(&req);
return err; if(err) {
fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(err), __LINE__);
}
} }
static int32_t udfdUvInit() { static int32_t udfdUvInit() {
uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t)); TAOS_CHECK_RETURN(uv_loop_init(global.loop));
if (loop) {
uv_loop_init(loop);
} else {
return -1;
}
global.loop = loop;
if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit
uv_pipe_init(global.loop, &global.ctrlPipe, 1); TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1));
uv_pipe_open(&global.ctrlPipe, 0); TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0));
uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb));
} }
getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName)); getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName));
removeListeningPipe(); removeListeningPipe();
uv_pipe_init(global.loop, &global.listeningPipe, 0); TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.listeningPipe, 0));
uv_signal_init(global.loop, &global.intrSignal); TAOS_CHECK_RETURN(uv_signal_init(global.loop, &global.intrSignal));
uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT); TAOS_CHECK_RETURN(uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT));
int r; int r;
fnInfo("bind to pipe %s", global.listenPipeName); fnInfo("bind to pipe %s", global.listenPipeName);
@ -1321,25 +1473,59 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
} }
} }
static int32_t udfdRun() { static int32_t udfdGlobalDataInit() {
uv_mutex_init(&global.scriptPluginsMutex); uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
if (loop == NULL) {
fnError("udfd init uv loop failed, mem overflow");
return -1;
}
global.loop = loop;
if (uv_mutex_init(&global.scriptPluginsMutex) != 0) {
fnError("udfd init script plugins mutex failed");
return -1;
}
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
uv_mutex_init(&global.udfsMutex); if (global.udfsHash == NULL) {
return terrno;
}
// taosHashSetFreeFp(global.udfsHash, udfdFreeUdf);
fnInfo("start udfd event loop"); if (uv_mutex_init(&global.udfsMutex) != 0) {
uv_run(global.loop, UV_RUN_DEFAULT); fnError("udfd init udfs mutex failed");
fnInfo("udfd event loop stopped."); return -2;
}
uv_loop_close(global.loop);
uv_walk(global.loop, udfdCloseWalkCb, NULL);
uv_run(global.loop, UV_RUN_DEFAULT);
uv_loop_close(global.loop);
return 0; return 0;
} }
static void udfdGlobalDataDeinit() {
taosHashCleanup(global.udfsHash);
uv_mutex_destroy(&global.udfsMutex);
uv_mutex_destroy(&global.scriptPluginsMutex);
taosMemoryFree(global.loop);
fnInfo("udfd global data deinit");
}
static void udfdRun() {
fnInfo("start udfd event loop");
int32_t code = uv_run(global.loop, UV_RUN_DEFAULT);
if(code != 0) {
fnError("udfd event loop still has active handles or requests.");
}
fnInfo("udfd event loop stopped.");
(void)uv_loop_close(global.loop);
uv_walk(global.loop, udfdCloseWalkCb, NULL);
code = uv_run(global.loop, UV_RUN_DEFAULT);
if(code != 0) {
fnError("udfd event loop still has active handles or requests.");
}
(void)uv_loop_close(global.loop);
}
int32_t udfdInitResidentFuncs() { int32_t udfdInitResidentFuncs() {
if (strlen(tsUdfdResFuncs) == 0) { if (strlen(tsUdfdResFuncs) == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1352,13 +1538,17 @@ int32_t udfdInitResidentFuncs() {
char func[TSDB_FUNC_NAME_LEN + 1] = {0}; char func[TSDB_FUNC_NAME_LEN + 1] = {0};
strncpy(func, token, TSDB_FUNC_NAME_LEN); strncpy(func, token, TSDB_FUNC_NAME_LEN);
fnInfo("udfd add resident function %s", func); fnInfo("udfd add resident function %s", func);
taosArrayPush(global.residentFuncs, func); if(taosArrayPush(global.residentFuncs, func) == NULL)
{
taosArrayDestroy(global.residentFuncs);
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t udfdDeinitResidentFuncs() { void udfdDeinitResidentFuncs() {
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char *funcName = taosArrayGet(global.residentFuncs, i); char *funcName = taosArrayGet(global.residentFuncs, i);
SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
@ -1366,18 +1556,15 @@ int32_t udfdDeinitResidentFuncs() {
SUdf *udf = *udfInHash; SUdf *udf = *udfInHash;
int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
fnDebug("udfd destroy function returns %d", code); fnDebug("udfd destroy function returns %d", code);
taosHashRemove(global.udfsHash, funcName, strlen(funcName)); if(taosHashRemove(global.udfsHash, funcName, strlen(funcName)) != 0)
{
fnError("udfd remove resident function %s failed", funcName);
}
taosMemoryFree(udf); taosMemoryFree(udf);
} }
} }
taosArrayDestroy(global.residentFuncs); taosArrayDestroy(global.residentFuncs);
return TSDB_CODE_SUCCESS; fnInfo("udfd resident functions are deinit");
}
int32_t udfdCleanup() {
uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash);
return 0;
} }
int32_t udfdCreateUdfSourceDir() { int32_t udfdCreateUdfSourceDir() {
@ -1392,20 +1579,27 @@ int32_t udfdCreateUdfSourceDir() {
return code; return code;
} }
int32_t udfdDestroyUdfSourceDir() { void udfdDestroyUdfSourceDir() {
fnInfo("destory udf source directory %s", global.udfDataDir); fnInfo("destory udf source directory %s", global.udfDataDir);
taosRemoveDir(global.udfDataDir); taosRemoveDir(global.udfDataDir);
return 0;
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
int code = 0;
bool logInitialized = false;
bool cfgInitialized = false;
bool openClientRpcFinished = false;
bool residentFuncsInited = false;
bool udfSourceDirInited = false;
bool globalDataInited = false;
if (!taosCheckSystemIsLittleEnd()) { if (!taosCheckSystemIsLittleEnd()) {
printf("failed to start since on non-little-end machines\n"); (void)printf("failed to start since on non-little-end machines\n");
return -1; return -1;
} }
if (udfdParseArgs(argc, argv) != 0) { if (udfdParseArgs(argc, argv) != 0) {
printf("failed to start since parse args error\n"); (void)printf("failed to start since parse args error\n");
return -1; return -1;
} }
@ -1416,47 +1610,89 @@ int main(int argc, char *argv[]) {
if (udfdInitLog() != 0) { if (udfdInitLog() != 0) {
// ignore create log failed, because this error no matter // ignore create log failed, because this error no matter
printf("failed to start since init log error\n"); (void)printf("failed to init udfd log.");
} else {
logInitialized = true; // log is initialized
} }
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error"); fnError("failed to start since read config error");
taosCloseLog(); code = -2;
return -2; goto _exit;
} }
cfgInitialized = true; // cfg is initialized
fnInfo("udfd start with config file %s", configDir);
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp); if (initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp) != 0) {
fnError("init ep set from cfg failed");
code = -3;
goto _exit;
}
fnInfo("udfd start with mnode ep %s", global.mgmtEp.epSet.eps[0].fqdn);
if (udfdOpenClientRpc() != 0) { if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failed"); fnError("open rpc connection to mnode failed");
taosCloseLog(); code = -4;
return -3; goto _exit;
} }
fnInfo("udfd rpc client is opened");
openClientRpcFinished = true; // rpc is opened
if (udfdCreateUdfSourceDir() != 0) { if (udfdCreateUdfSourceDir() != 0) {
fnError("create udf source directory failed"); fnError("create udf source directory failed");
taosCloseLog(); code = -5;
return -4; goto _exit;
} }
udfSourceDirInited = true; // udf source dir is created
fnInfo("udfd udf source directory is created");
if (udfdGlobalDataInit() != 0) {
fnError("init global data failed");
code = -6;
goto _exit;
}
globalDataInited = true; // global data is inited
fnInfo("udfd global data is inited");
if (udfdUvInit() != 0) { if (udfdUvInit() != 0) {
fnError("uv init failure"); fnError("uv init failure");
taosCloseLog(); code = -7;
return -5; goto _exit;
} }
fnInfo("udfd uv is inited");
udfdInitResidentFuncs(); if (udfdInitResidentFuncs() != 0) {
fnError("init resident functions failed");
code = -8;
goto _exit;
}
residentFuncsInited = true; // resident functions are inited
fnInfo("udfd resident functions are inited");
udfdRun(); udfdRun();
fnInfo("udfd exit normally");
removeListeningPipe(); removeListeningPipe();
udfdDestroyUdfSourceDir();
udfdCloseClientRpc();
udfdDeinitResidentFuncs();
udfdDeinitScriptPlugins(); udfdDeinitScriptPlugins();
taosCloseLog(); _exit:
udfdCleanup(); if (globalDataInited) {
return 0; udfdGlobalDataDeinit();
}
if (residentFuncsInited) {
udfdDeinitResidentFuncs();
}
if (udfSourceDirInited) {
udfdDestroyUdfSourceDir();
}
if (openClientRpcFinished) {
udfdCloseClientRpc();
}
if (cfgInitialized) {
taosCleanupCfg();
}
if (logInitialized) {
taosCloseLog();
}
return code;
} }

View File

@ -9,17 +9,20 @@
#include "tglobal.h" #include "tglobal.h"
#include "tudf.h" #include "tudf.h"
#define TAOSFPRINTF(stream, format, ...) ((void)fprintf(stream, format, ##__VA_ARGS__))
#define TAOSPRINTF(format, ...) ((void)printf(format, ##__VA_ARGS__))
static int32_t parseArgs(int32_t argc, char *argv[]) { static int32_t parseArgs(int32_t argc, char *argv[]) {
for (int32_t i = 1; i < argc; ++i) { for (int32_t i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) { if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) { if (i < argc - 1) {
if (strlen(argv[++i]) >= PATH_MAX) { if (strlen(argv[++i]) >= PATH_MAX) {
printf("config file path overflow"); TAOSPRINTF("config file path overflow");
return -1; return -1;
} }
tstrncpy(configDir, argv[i], PATH_MAX); tstrncpy(configDir, argv[i], PATH_MAX);
} else { } else {
printf("'-c' requires a parameter, default is %s\n", configDir); TAOSPRINTF("'-c' requires a parameter, default is %s\n", configDir);
return -1; return -1;
} }
} }
@ -35,6 +38,7 @@ static int32_t initLog() {
} }
int scalarFuncTest() { int scalarFuncTest() {
int32_t ret = 0;
UdfcFuncHandle handle; UdfcFuncHandle handle;
if (doSetupUdf("udf1", &handle) != 0) { if (doSetupUdf("udf1", &handle) != 0) {
@ -47,10 +51,18 @@ int scalarFuncTest() {
SSDataBlock *pBlock = &block; SSDataBlock *pBlock = &block;
for (int32_t i = 0; i < 1; ++i) { for (int32_t i = 0; i < 1; ++i) {
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1);
blockDataAppendColInfo(pBlock, &colInfo); ret = blockDataAppendColInfo(pBlock, &colInfo);
if (ret != 0) {
fnError("failed to append column info");
return -1;
}
} }
blockDataEnsureCapacity(pBlock, 1024); ret = blockDataEnsureCapacity(pBlock, 1024);
if (ret != 0) {
fnError("failed to ensure capacity");
return -1;
}
pBlock->info.rows = 1024; pBlock->info.rows = 1024;
SColumnInfoData *pCol = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData *pCol = taosArrayGet(pBlock->pDataBlock, 0);
@ -63,38 +75,56 @@ int scalarFuncTest() {
input.columnData = taosArrayGet(pBlock->pDataBlock, 0); input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
SScalarParam output = {0}; SScalarParam output = {0};
doCallUdfScalarFunc(handle, &input, 1, &output); ret = doCallUdfScalarFunc(handle, &input, 1, &output);
if (ret != 0) {
fnError("failed to call udf scalar function");
return -1;
}
taosArrayDestroy(pBlock->pDataBlock); taosArrayDestroy(pBlock->pDataBlock);
SColumnInfoData *col = output.columnData; SColumnInfoData *col = output.columnData;
for (int32_t i = 0; i < output.numOfRows; ++i) { for (int32_t i = 0; i < output.numOfRows; ++i) {
if (i % 100 == 0) fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); if (i % 100 == 0) TAOSFPRINTF(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
} }
colDataDestroy(output.columnData); colDataDestroy(output.columnData);
taosMemoryFree(output.columnData); taosMemoryFree(output.columnData);
} }
int64_t end = taosGetTimestampUs(); int64_t end = taosGetTimestampUs();
fprintf(stderr, "time: %f\n", (end - beg) / 1000.0); TAOSFPRINTF(stderr, "time: %f\n", (end - beg) / 1000.0);
doTeardownUdf(handle); ret = doTeardownUdf(handle);
if (ret != 0) {
fnError("failed to teardown udf");
return -1;
}
return 0; return 0;
} }
int aggregateFuncTest() { int aggregateFuncTest() {
int32_t ret = 0;
UdfcFuncHandle handle; UdfcFuncHandle handle;
if (doSetupUdf("udf2", &handle) != 0) { ret = doSetupUdf("udf2", &handle);
fnError("setup udf failure"); if (ret != 0) {
fnError("setup udf failure, code:%d", ret);
return -1; return -1;
} }
SSDataBlock *pBlock = createDataBlock(); SSDataBlock *pBlock = createDataBlock();
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1);
blockDataAppendColInfo(pBlock, &colInfo); ret = blockDataAppendColInfo(pBlock, &colInfo);
if(ret != 0) {
fnError( "failed to append column info. code:%d", ret);
return -1;
}
} }
blockDataEnsureCapacity(pBlock, 1024); ret = blockDataEnsureCapacity(pBlock, 1024);
if (ret != 0) {
fnError( "failed to ensure capacity. code:%d", ret);
return -1;
}
pBlock->info.rows = 1024; pBlock->info.rows = 1024;
SColumnInfoData *pColInfo = bdGetColumnInfoData(pBlock, 0); SColumnInfoData *pColInfo = bdGetColumnInfoData(pBlock, 0);
@ -105,37 +135,77 @@ int aggregateFuncTest() {
SUdfInterBuf buf = {0}; SUdfInterBuf buf = {0};
SUdfInterBuf newBuf = {0}; SUdfInterBuf newBuf = {0};
SUdfInterBuf resultBuf = {0}; SUdfInterBuf resultBuf = {0};
doCallUdfAggInit(handle, &buf); ret = doCallUdfAggInit(handle, &buf);
doCallUdfAggProcess(handle, pBlock, &buf, &newBuf); if (ret != 0) {
fnError("failed to init udf. code:%d", ret);
return -1;
}
ret = doCallUdfAggProcess(handle, pBlock, &buf, &newBuf);
if (ret != 0) {
fnError("failed to process udf. code:%d", ret);
return -1;
}
taosArrayDestroy(pBlock->pDataBlock); taosArrayDestroy(pBlock->pDataBlock);
doCallUdfAggFinalize(handle, &newBuf, &resultBuf); ret = doCallUdfAggFinalize(handle, &newBuf, &resultBuf);
if (ret != 0) {
TAOSFPRINTF(stderr,"failed to finalize udf. code:%d", ret);
return -1;
}
if (resultBuf.buf != NULL) { if (resultBuf.buf != NULL) {
fprintf(stderr, "agg result: %f\n", *(double *)resultBuf.buf); TAOSFPRINTF(stderr, "agg result: %f\n", *(double *)resultBuf.buf);
} else { } else {
fprintf(stderr, "result buffer is null"); fnError("result buffer is null");
} }
freeUdfInterBuf(&buf); freeUdfInterBuf(&buf);
freeUdfInterBuf(&newBuf); freeUdfInterBuf(&newBuf);
freeUdfInterBuf(&resultBuf); freeUdfInterBuf(&resultBuf);
doTeardownUdf(handle); ret = doTeardownUdf(handle);
if (ret != 0) {
fnError("failed to teardown udf. code:%d", ret);
return -1;
}
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
return 0; return 0;
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
parseArgs(argc, argv); int32_t ret = 0;
initLog(); ret = parseArgs(argc, argv);
if (ret != 0) {
fnError("failed to parse args");
return -1;
}
ret = initLog();
if (ret != 0) {
fnError("failed to init log");
return -1;
}
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error"); fnError("failed to start since read config error");
return -1; return -1;
} }
udfcOpen(); if (udfcOpen() != 0) {
fnError("failed to open udfc");
return -1;
}
uv_sleep(1000); uv_sleep(1000);
scalarFuncTest(); ret = scalarFuncTest();
aggregateFuncTest(); if (ret != 0) {
udfcClose(); fnError("failed to run scalar function test");
return -1;
}
ret = aggregateFuncTest();
if (ret != 0) {
fnError("failed to run aggregate function test");
return -1;
}
ret = udfcClose();
if (ret != 0) {
fnError("failed to close udfc");
return -1;
}
} }

View File

@ -14,18 +14,25 @@ DLL_EXPORT int32_t udf1_init() { return 0; }
DLL_EXPORT int32_t udf1_destroy() { return 0; } DLL_EXPORT int32_t udf1_destroy() { return 0; }
DLL_EXPORT int32_t udf1(SUdfDataBlock *block, SUdfColumn *resultCol) { DLL_EXPORT int32_t udf1(SUdfDataBlock *block, SUdfColumn *resultCol) {
int32_t code = 0;
SUdfColumnData *resultData = &resultCol->colData; SUdfColumnData *resultData = &resultCol->colData;
for (int32_t i = 0; i < block->numOfRows; ++i) { for (int32_t i = 0; i < block->numOfRows; ++i) {
int j = 0; int j = 0;
for (; j < block->numOfCols; ++j) { for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) { if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i); code = udfColDataSetNull(resultCol, i);
if (code != 0) {
return code;
}
break; break;
} }
} }
if (j == block->numOfCols) { if (j == block->numOfCols) {
int32_t luckyNum = 1; int32_t luckyNum = 1;
udfColDataSet(resultCol, i, (char *)&luckyNum, false); code = udfColDataSet(resultCol, i, (char *)&luckyNum, false);
if (code != 0) {
return code;
}
} }
} }
// to simulate actual processing delay by udf // to simulate actual processing delay by udf

View File

@ -15,18 +15,25 @@ DLL_EXPORT int32_t udf1_dup_init() { return 0; }
DLL_EXPORT int32_t udf1_dup_destroy() { return 0; } DLL_EXPORT int32_t udf1_dup_destroy() { return 0; }
DLL_EXPORT int32_t udf1_dup(SUdfDataBlock *block, SUdfColumn *resultCol) { DLL_EXPORT int32_t udf1_dup(SUdfDataBlock *block, SUdfColumn *resultCol) {
int32_t code = 0;
SUdfColumnData *resultData = &resultCol->colData; SUdfColumnData *resultData = &resultCol->colData;
for (int32_t i = 0; i < block->numOfRows; ++i) { for (int32_t i = 0; i < block->numOfRows; ++i) {
int j = 0; int j = 0;
for (; j < block->numOfCols; ++j) { for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) { if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i); code = udfColDataSetNull(resultCol, i);
if (code != 0) {
return code;
}
break; break;
} }
} }
if (j == block->numOfCols) { if (j == block->numOfCols) {
int32_t luckyNum = 2; int32_t luckyNum = 2;
udfColDataSet(resultCol, i, (char *)&luckyNum, false); code = udfColDataSet(resultCol, i, (char *)&luckyNum, false);
if (code != 0) {
return code;
}
} }
} }
// to simulate actual processing delay by udf // to simulate actual processing delay by udf

View File

@ -552,22 +552,22 @@ _end:
#endif #endif
} }
int32_t taosGetCpuCores(float *numOfCores, bool physical) { void taosGetCpuCores(float *numOfCores, bool physical) {
#ifdef WINDOWS #ifdef WINDOWS
SYSTEM_INFO info; SYSTEM_INFO info;
GetSystemInfo(&info); GetSystemInfo(&info);
*numOfCores = info.dwNumberOfProcessors; *numOfCores = info.dwNumberOfProcessors;
return 0; return;
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
*numOfCores = sysconf(_SC_NPROCESSORS_ONLN); *numOfCores = sysconf(_SC_NPROCESSORS_ONLN);
return 0; return;
#else #else
if (physical) { if (physical) {
*numOfCores = sysconf(_SC_NPROCESSORS_ONLN); *numOfCores = sysconf(_SC_NPROCESSORS_ONLN);
} else { } else {
taosCntrGetCpuCores(numOfCores); taosCntrGetCpuCores(numOfCores);
} }
return 0; return;
#endif #endif
} }

View File

@ -723,6 +723,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_BUFSIZE, "udf invalid bufsize
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED, "udf program language not supported") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED, "udf program language not supported")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_FUNC_EXEC_FAILURE, "udf function execution failure") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_FUNC_EXEC_FAILURE, "udf function execution failure")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_UV_EXEC_FAILURE, "udf uvlib function execution failure")
//schemaless //schemaless
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type") TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")