Fix coredump when dmStopUdfd is called without dmStartUdfd having been called

This commit is contained in:
slzhou 2022-04-21 15:00:49 +08:00
parent c11edd523d
commit 81a2f3b78c
2 changed files with 54 additions and 37 deletions

View File

@ -216,20 +216,21 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) {
dmStopStatusThread(pWrapper->pDnode); dmStopStatusThread(pWrapper->pDnode);
} }
static int32_t dmSpawnUdfd(SDnodeData *pData); static int32_t dmSpawnUdfd(SDnode *pDnode);
void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
uv_close((uv_handle_t*)process, NULL); uv_close((uv_handle_t*)process, NULL);
SDnodeData *pData = process->data; SDnode *pDnode = process->data;
if (atomic_load_8(&pData->udfdStoping) != 0) { SUdfdData *pData = &pDnode->udfdData;
if (atomic_load_8(&pData->stopping) != 0) {
dDebug("udfd process exit due to stopping"); dDebug("udfd process exit due to stopping");
} else { } else {
dmSpawnUdfd(pData); dmSpawnUdfd(pDnode);
} }
} }
static int32_t dmSpawnUdfd(SDnodeData *pData) { static int32_t dmSpawnUdfd(SDnode *pDnode) {
dInfo("dnode start spawning udfd"); dInfo("dnode start spawning udfd");
uv_process_options_t options = {0}; uv_process_options_t options = {0};
@ -251,16 +252,17 @@ static int32_t dmSpawnUdfd(SDnodeData *pData) {
char dnodeIdEnvItem[32] = {0}; char dnodeIdEnvItem[32] = {0};
char thrdPoolSizeEnvItem[32] = {0}; char thrdPoolSizeEnvItem[32] = {0};
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId); snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId);
SUdfdData *pData = &pDnode->udfdData;
float numCpuCores = 4; float numCpuCores = 4;
taosGetCpuCores(&numCpuCores); taosGetCpuCores(&numCpuCores);
snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2); snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL}; char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
options.env = envUdfd; options.env = envUdfd;
int err = uv_spawn(&pData->udfdLoop, &pData->udfdProcess, &options); int err = uv_spawn(&pData->loop, &pData->process, &options);
pData->udfdProcess.data = (void*)pData; pData->process.data = (void*)pDnode;
if (err != 0) { if (err != 0) {
dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
@ -269,40 +271,50 @@ static int32_t dmSpawnUdfd(SDnodeData *pData) {
} }
void dmWatchUdfd(void *args) { void dmWatchUdfd(void *args) {
SDnodeData *pData = args; SDnode *pDnode = args;
uv_loop_init(&pData->udfdLoop); SUdfdData *pData = &pDnode->udfdData;
int err = dmSpawnUdfd(pData); uv_loop_init(&pData->loop);
pData->udfdErrCode = err; int32_t err = dmSpawnUdfd(pDnode);
uv_barrier_wait(&pData->udfdBarrier); atomic_store_32(&pData->spawnErr, err);
if (pData->udfdErrCode == 0) { uv_barrier_wait(&pData->barrier);
uv_run(&pData->udfdLoop, UV_RUN_DEFAULT); if (pData->spawnErr == 0) {
uv_run(&pData->loop, UV_RUN_DEFAULT);
} }
uv_loop_close(&pData->udfdLoop); uv_loop_close(&pData->loop);
return; return;
} }
int32_t dmStartUdfd(SDnode *pDnode) { int32_t dmStartUdfd(SDnode *pDnode) {
SDnodeData *pData = &pDnode->data; SUdfdData *pData = &pDnode->udfdData;
uv_barrier_init(&pData->udfdBarrier, 2); if (pData->startCalled) {
pData->udfdStoping = 0; dInfo("dnode-mgmt start udfd already called");
uv_thread_create(&pData->udfdThread, dmWatchUdfd, pData); return 0;
uv_barrier_wait(&pData->udfdBarrier); }
return pData->udfdErrCode; uv_barrier_init(&pData->barrier, 2);
pData->stopping = 0;
uv_thread_create(&pData->thread, dmWatchUdfd, pDnode);
uv_barrier_wait(&pData->barrier);
pData->startCalled = true;
pData->needCleanUp = true;
return pData->spawnErr;
} }
int32_t dmStopUdfd(SDnode *pDnode) { int32_t dmStopUdfd(SDnode *pDnode) {
dInfo("dnode-mgmt to stop udfd. spawn err: %d", pDnode->data.udfdErrCode); dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d",
SDnodeData *pData = &pDnode->data; pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr);
if (pData->udfdErrCode != 0) { SUdfdData *pData = &pDnode->udfdData;
if (!pData->needCleanUp) {
return 0; return 0;
} }
atomic_store_8(&pData->udfdStoping, 1); atomic_store_8(&pData->stopping, 1);
uv_barrier_destroy(&pData->udfdBarrier); uv_barrier_destroy(&pData->barrier);
uv_process_kill(&pData->udfdProcess, SIGINT); if (pData->spawnErr == 0) {
uv_thread_join(&pData->udfdThread); uv_process_kill(&pData->process, SIGINT);
}
uv_thread_join(&pData->thread);
atomic_store_8(&pData->udfdStoping, 0); atomic_store_8(&pData->stopping, 0);
return 0; return 0;
} }

View File

@ -136,13 +136,6 @@ typedef struct {
int32_t numOfDisks; int32_t numOfDisks;
int32_t supportVnodes; int32_t supportVnodes;
uint16_t serverPort; uint16_t serverPort;
uv_loop_t udfdLoop;
uv_thread_t udfdThread;
uv_barrier_t udfdBarrier;
uv_process_t udfdProcess;
int udfdErrCode;
int8_t udfdStoping;
} SDnodeData; } SDnodeData;
typedef struct { typedef struct {
@ -150,6 +143,17 @@ typedef struct {
char desc[TSDB_STEP_DESC_LEN]; char desc[TSDB_STEP_DESC_LEN];
} SStartupInfo; } SStartupInfo;
typedef struct SUdfdData {
bool startCalled;
bool needCleanUp;
uv_loop_t loop;
uv_thread_t thread;
uv_barrier_t barrier;
uv_process_t process;
int spawnErr;
int8_t stopping;
} SUdfdData;
typedef struct SDnode { typedef struct SDnode {
EDndProcType ptype; EDndProcType ptype;
EDndNodeType ntype; EDndNodeType ntype;
@ -158,6 +162,7 @@ typedef struct SDnode {
SStartupInfo startup; SStartupInfo startup;
SDnodeTrans trans; SDnodeTrans trans;
SDnodeData data; SDnodeData data;
SUdfdData udfdData;
TdThreadMutex mutex; TdThreadMutex mutex;
SMgmtWrapper wrappers[NODE_END]; SMgmtWrapper wrappers[NODE_END];
} SDnode; } SDnode;