feature(udf): pipe is placed under datadir and remove it after udfd exist and is hidden
This commit is contained in:
parent
6916f4696a
commit
736fddc4cd
|
@ -127,7 +127,7 @@ enum {
|
||||||
|
|
||||||
int64_t gUdfTaskSeqNum = 0;
|
int64_t gUdfTaskSeqNum = 0;
|
||||||
typedef struct SUdfdProxy {
|
typedef struct SUdfdProxy {
|
||||||
char udfdPipeName[UDF_LISTEN_PIPE_NAME_LEN];
|
char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
|
||||||
uv_barrier_t gUdfInitBarrier;
|
uv_barrier_t gUdfInitBarrier;
|
||||||
|
|
||||||
uv_loop_t gUdfdLoop;
|
uv_loop_t gUdfdLoop;
|
||||||
|
@ -224,9 +224,15 @@ int32_t getUdfdPipeName(char* pipeName, int32_t size) {
|
||||||
size_t dnodeIdSize = sizeof(dnodeId);
|
size_t dnodeIdSize = sizeof(dnodeId);
|
||||||
int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
|
int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
|
fnError("get dnode id from env. error: %s.", uv_err_name(err));
|
||||||
dnodeId[0] = '1';
|
dnodeId[0] = '1';
|
||||||
}
|
}
|
||||||
|
#ifdef _WIN32
|
||||||
snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
|
snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
|
||||||
|
#else
|
||||||
|
snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
|
||||||
|
#endif
|
||||||
|
fnInfo("get dnode id from env. dnode id: %s. pipe path: %s", dnodeId, pipeName);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -998,7 +1004,7 @@ int32_t udfcOpen() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
SUdfdProxy *proxy = &gUdfdProxy;
|
SUdfdProxy *proxy = &gUdfdProxy;
|
||||||
getUdfdPipeName(proxy->udfdPipeName, UDF_LISTEN_PIPE_NAME_LEN);
|
getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
|
||||||
proxy->gUdfcState = UDFC_STATE_STARTNG;
|
proxy->gUdfcState = UDFC_STATE_STARTNG;
|
||||||
uv_barrier_init(&proxy->gUdfInitBarrier, 2);
|
uv_barrier_init(&proxy->gUdfInitBarrier, 2);
|
||||||
uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy);
|
uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy);
|
||||||
|
|
|
@ -30,7 +30,7 @@ typedef struct SUdfdContext {
|
||||||
uv_loop_t *loop;
|
uv_loop_t *loop;
|
||||||
uv_pipe_t ctrlPipe;
|
uv_pipe_t ctrlPipe;
|
||||||
uv_signal_t intrSignal;
|
uv_signal_t intrSignal;
|
||||||
char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN];
|
char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
|
||||||
uv_pipe_t listeningPipe;
|
uv_pipe_t listeningPipe;
|
||||||
|
|
||||||
void *clientRpc;
|
void *clientRpc;
|
||||||
|
@ -652,7 +652,7 @@ static int32_t udfdUvInit() {
|
||||||
uv_pipe_open(&global.ctrlPipe, 0);
|
uv_pipe_open(&global.ctrlPipe, 0);
|
||||||
uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
|
uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
|
||||||
|
|
||||||
getUdfdPipeName(global.listenPipeName, UDF_LISTEN_PIPE_NAME_LEN);
|
getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName));
|
||||||
|
|
||||||
removeListeningPipe();
|
removeListeningPipe();
|
||||||
|
|
||||||
|
@ -696,6 +696,7 @@ static int32_t udfdRun() {
|
||||||
fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code);
|
fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code);
|
||||||
int codeClose = uv_loop_close(global.loop);
|
int codeClose = uv_loop_close(global.loop);
|
||||||
fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
|
fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
|
||||||
|
removeListeningPipe();
|
||||||
udfdCloseClientRpc();
|
udfdCloseClientRpc();
|
||||||
uv_mutex_destroy(&global.udfsMutex);
|
uv_mutex_destroy(&global.udfsMutex);
|
||||||
taosHashCleanup(global.udfsHash);
|
taosHashCleanup(global.udfsHash);
|
||||||
|
|
Loading…
Reference in New Issue