Merge remote-tracking branch 'laiyongtao/fix-udfd-env-var-cfg' into feat/contrib30

This commit is contained in:
Shengliang Guan 2024-11-04 11:39:41 +08:00
commit 07f7b57cd7
1 changed files with 149 additions and 108 deletions

View File

@ -40,7 +40,7 @@ typedef struct SUdfdData {
#ifdef WINDOWS
HANDLE jobHandle;
#endif
int spawnErr;
int32_t spawnErr;
uv_pipe_t ctrlPipe;
uv_async_t stopAsync;
int32_t stopCalled;
@ -53,13 +53,15 @@ SUdfdData udfdGlobal = {0};
int32_t udfStartUdfd(int32_t startDnodeId);
void udfStopUdfd();
extern char **environ;
static int32_t udfSpawnUdfd(SUdfdData *pData);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal);
static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg);
static void udfUdfdStopAsyncCb(uv_async_t *async);
static void udfWatchUdfd(void *args);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
SUdfdData *pData = process->data;
if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) {
@ -129,14 +131,14 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
float numCpuCores = 4;
int32_t code = taosGetCpuCores(&numCpuCores, false);
if (code != 0) {
fnError("failed to get cpu cores, code:%d", code);
fnError("failed to get cpu cores, code:0x%x", code);
}
numCpuCores = TMAX(numCpuCores, 2);
snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores * 2);
snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2);
char pathTaosdLdLib[512] = {0};
size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib);
int ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen);
if (ret != UV_ENOBUFS) {
taosdLdLibPathLen = strlen(pathTaosdLdLib);
}
@ -158,8 +160,8 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
char *taosFqdnEnvItem = NULL;
char *taosFqdn = getenv("TAOS_FQDN");
if (taosFqdn != NULL) {
int subLen = strlen(taosFqdn);
int len = strlen("TAOS_FQDN=") + subLen + 1;
int32_t subLen = strlen(taosFqdn);
int32_t len = strlen("TAOS_FQDN=") + subLen + 1;
taosFqdnEnvItem = taosMemoryMalloc(len);
if (taosFqdnEnvItem != NULL) {
tstrncpy(taosFqdnEnvItem, "TAOS_FQDN=", len);
@ -173,9 +175,34 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL};
options.env = envUdfd;
char **envUdfdWithPEnv = NULL;
if (environ != NULL) {
int32_t numEnviron = 0;
while (environ[numEnviron] != NULL) {
numEnviron++;
}
int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd);
int err = uv_spawn(&pData->loop, &pData->process, &options);
envUdfdWithPEnv = (char **)taosMemoryMalloc((numEnviron + lenEnvUdfd) * sizeof(char *));
for (int32_t i = 0; i < numEnviron; i++) {
envUdfdWithPEnv[i] = (char *)taosMemoryMalloc(strlen(environ[i]) + 1);
strcpy(envUdfdWithPEnv[i], environ[i]);
}
for (int32_t i = 0; i < lenEnvUdfd; i++) {
if (envUdfd[i] != NULL) {
envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryMalloc(strlen(envUdfd[i]) + 1);
strcpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i]);
}
}
envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL;
options.env = envUdfdWithPEnv;
} else {
options.env = envUdfd;
}
int32_t err = uv_spawn(&pData->loop, &pData->process, &options);
pData->process.data = (void *)pData;
#ifdef WINDOWS
@ -202,7 +229,20 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) {
} else {
fnInfo("udfd is initialized");
}
if(taosFqdnEnvItem) taosMemoryFree(taosFqdnEnvItem);
if (taosFqdnEnvItem) {
taosMemoryFree(taosFqdnEnvItem);
}
if (envUdfdWithPEnv != NULL) {
int32_t i = 0;
while (envUdfdWithPEnv[i] != NULL) {
taosMemoryFree(envUdfdWithPEnv[i]);
i++;
}
taosMemoryFree(envUdfdWithPEnv);
}
return err;
}
@ -225,7 +265,7 @@ static void udfWatchUdfd(void *args) {
TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData));
atomic_store_32(&pData->spawnErr, 0);
(void)uv_barrier_wait(&pData->barrier);
int num = uv_run(&pData->loop, UV_RUN_DEFAULT);
int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT);
fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__);
uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
@ -458,7 +498,7 @@ typedef struct SUdfcUvSession {
typedef struct SClientUvTaskNode {
SUdfcProxy *udfc;
int8_t type;
int errCode;
int32_t errCode;
uv_pipe_t *pipe;
@ -927,7 +967,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
code = bdGetColumnInfoData(block, 0, &col);
TAOS_CHECK_GOTO(code, &lino, _exit);
for (int i = 0; i < udfCol->colData.numOfRows; ++i) {
for (int32_t i = 0; i < udfCol->colData.numOfRows; ++i) {
if (udfColDataIsNull(udfCol, i)) {
colDataSetNULL(col, i);
} else {
@ -971,13 +1011,13 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
if (input[i].numOfRows < numOfRows) {
int32_t startRow = input[i].numOfRows;
int expandRows = numOfRows - startRow;
int32_t expandRows = numOfRows - startRow;
bool isNull = colDataIsNull_s(pColInfoData, (input + i)->numOfRows - 1);
if (isNull) {
colDataSetNNULL(pDest, startRow, expandRows);
} else {
char *src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1);
for (int j = 0; j < expandRows; ++j) {
for (int32_t j = 0; j < expandRows; ++j) {
TAOS_CHECK_GOTO(colDataSetVal(pDest, startRow + j, src, false), &lino, _exit);
}
}
@ -1026,8 +1066,8 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf);
void udfcUvHandleRsp(SClientUvConn *conn);
void udfcUvHandleError(SClientUvConn *conn);
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
void onUdfcPipeWrite(uv_write_t *write, int status);
void onUdfcPipeConnect(uv_connect_t *connect, int status);
void onUdfcPipeWrite(uv_write_t *write, int32_t status);
void onUdfcPipeConnect(uv_connect_t *connect, int32_t status);
int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask);
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask);
@ -1037,7 +1077,7 @@ void udfStopAsyncCb(uv_async_t *async);
void constructUdfService(void *argsThread);
int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType);
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle);
int compareUdfcFuncSub(const void *elem1, const void *elem2);
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2);
int32_t doTeardownUdf(UdfcFuncHandle handle);
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
@ -1064,7 +1104,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
void cleanupNotExpiredUdfs();
void cleanupExpiredUdfs();
int compareUdfcFuncSub(const void *elem1, const void *elem2) {
int32_t compareUdfcFuncSub(const void *elem1, const void *elem2) {
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
return strcmp(stub1->udfName, stub2->udfName);
@ -1157,11 +1197,12 @@ void cleanupExpiredUdfs() {
while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
if (stub->refCount == 0) {
fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle,
stub->refCount);
(void)doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
stub->refCount, stub->createTime, stub->handle);
fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p",
stub->udfName, stub->refCount, stub->createTime, stub->handle);
UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
if (taosArrayPush(expiredUdfStubs, stub) == NULL) {
@ -1347,7 +1388,8 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
return code;
}
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
SUdfInterBuf state = {
.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0};
udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
@ -1391,7 +1433,8 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
SUdfInterBuf resultBuf = {0};
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
SUdfInterBuf state = {
.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
int32_t udfCallCode = 0;
udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
if (udfCallCode != 0) {
@ -1620,7 +1663,7 @@ void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
}
void onUdfcPipeWrite(uv_write_t *write, int status) {
void onUdfcPipeWrite(uv_write_t *write, int32_t status) {
SClientUvConn *conn = write->data;
if (status < 0) {
fnError("udfc client connection %p write failed. status: %d(%s)", conn, status, uv_strerror(status));
@ -1631,7 +1674,7 @@ void onUdfcPipeWrite(uv_write_t *write, int status) {
taosMemoryFree(write);
}
void onUdfcPipeConnect(uv_connect_t *connect, int status) {
void onUdfcPipeConnect(uv_connect_t *connect, int32_t status) {
SClientUvTaskNode *uvTask = connect->data;
if (status != 0) {
fnError("client connect error, task seq: %" PRId64 ", code: %s", uvTask->seqNum, uv_strerror(status));
@ -1683,8 +1726,7 @@ int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvT
return terrno;
}
void *buf = bufBegin;
if(encodeUdfRequest(&buf, &request) <= 0)
{
if (encodeUdfRequest(&buf, &request) <= 0) {
fnError("udfc create uv task, encode request failed. size: %d", bufLen);
taosMemoryFree(bufBegin);
return TSDB_CODE_UDF_UV_EXEC_FAILURE;
@ -1695,8 +1737,7 @@ int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvT
} else if (uvTaskType == UV_TASK_DISCONNECT) {
uvTask->pipe = task->session->udfUvPipe;
}
if (uv_sem_init(&uvTask->taskSem, 0) != 0)
{
if (uv_sem_init(&uvTask->taskSem, 0) != 0) {
if (uvTaskType == UV_TASK_REQ_RSP) {
taosMemoryFree(uvTask->reqBuf.base);
}
@ -1784,7 +1825,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
write->data = pipe->data;
QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue;
QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
int32_t err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
if (err != 0) {
taosMemoryFree(write);
fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s", uvTask, uv_strerror(err));
@ -1891,7 +1932,7 @@ void constructUdfService(void *argsThread) {
QUEUE_INIT(&udfc->uvProcTaskQueue);
(void)uv_barrier_wait(&udfc->initBarrier);
// TODO return value of uv_run
int num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
int32_t num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT);
fnInfo("udfc uv loop exit. active handle num: %d", num);
(void)uv_loop_close(&udfc->uvLoop);
@ -2180,7 +2221,7 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
}
int32_t doTeardownUdf(UdfcFuncHandle handle) {
int32_t code = TSDB_CODE_SUCCESS, lino = 0;;
int32_t code = TSDB_CODE_SUCCESS, lino = 0;
SUdfcUvSession *session = (SUdfcUvSession *)handle;
if (session->udfUvPipe == NULL) {