From 8b8778652253aa99139613cf284518dfcd817932 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 14 Apr 2022 14:26:54 +0800 Subject: [PATCH] fix(cluster): create qnode in multi-process mode may crash --- source/dnode/mgmt/implement/src/dmExec.c | 7 ++++++- source/dnode/mgmt/implement/src/dmHandle.c | 15 ++++++++------- source/dnode/mgmt/implement/src/dmObj.c | 4 +++- source/dnode/mgmt/interface/inc/dmDef.h | 2 +- source/libs/transport/src/transSrv.c | 12 ++++++------ 5 files changed, 24 insertions(+), 16 deletions(-) diff --git a/source/dnode/mgmt/implement/src/dmExec.c b/source/dnode/mgmt/implement/src/dmExec.c index bb18eb85f1..36e135b9de 100644 --- a/source/dnode/mgmt/implement/src/dmExec.c +++ b/source/dnode/mgmt/implement/src/dmExec.c @@ -275,19 +275,24 @@ static void dmProcessProcHandle(void *handle) { } static void dmWatchNodes(SDnode *pDnode) { + taosThreadMutexLock(&pDnode->mutex); if (pDnode->ptype == DND_PROC_PARENT) { for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; + if (pWrapper->procType != DND_PROC_PARENT) continue; if (pDnode->ntype == NODE_END) continue; if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) { dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); - taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle); + if (pWrapper->procObj) { + taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle); + } dmNewNodeProc(pWrapper, n); } } } + taosThreadMutexUnlock(&pDnode->mutex); } int32_t dmRun(SDnode *pDnode) { diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index 8f9a316c72..93b24535a9 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -116,7 +116,7 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs return -1; } - taosWLockLatch(&pDnode->wrapperLock); + taosThreadMutexLock(&pDnode->mutex); pWrapper = &pDnode->wrappers[ntype]; if (taosMkDir(pWrapper->path) != 0) { @@ -132,10 +132,11 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs dDebug("node:%s, has been created", pWrapper->name); pWrapper->required = true; pWrapper->deployed = true; + pWrapper->procType = pDnode->ptype; (void)dmOpenNode(pWrapper); } - taosWUnLockLatch(&pDnode->wrapperLock); + taosThreadMutexUnlock(&pDnode->mutex); return code; } @@ -147,24 +148,24 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) return -1; } - taosWLockLatch(&pWrapper->latch); + taosThreadMutexLock(&pDnode->mutex); int32_t code = (*pWrapper->fp.dropFp)(pWrapper, pMsg); if (code != 0) { dError("node:%s, failed to drop since %s", pWrapper->name, terrstr()); } else { dDebug("node:%s, has been dropped", pWrapper->name); - pWrapper->required = false; - pWrapper->deployed = false; - taosRemoveDir(pWrapper->path); } - taosWUnLockLatch(&pWrapper->latch); dmReleaseWrapper(pWrapper); if (code == 0) { dmCloseNode(pWrapper); + pWrapper->required = false; + pWrapper->deployed = false; + taosRemoveDir(pWrapper->path); } + taosThreadMutexUnlock(&pDnode->mutex); return code; } diff --git a/source/dnode/mgmt/implement/src/dmObj.c b/source/dnode/mgmt/implement/src/dmObj.c index be89814d6a..66bfb27016 100644 --- a/source/dnode/mgmt/implement/src/dmObj.c +++ b/source/dnode/mgmt/implement/src/dmObj.c @@ -48,7 +48,7 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { } taosInitRWLatch(&pDnode->data.latch); - taosInitRWLatch(&pDnode->wrapperLock); + taosThreadMutexInit(&pDnode->mutex, NULL); return 0; } @@ -67,6 +67,8 @@ static void dmClearVars(SDnode *pDnode) { taosMemoryFreeClear(pDnode->data.firstEp); taosMemoryFreeClear(pDnode->data.secondEp); taosMemoryFreeClear(pDnode->data.dataDir); + taosThreadMutexDestroy(&pDnode->mutex); + memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); taosMemoryFree(pDnode); dDebug("dnode memory is cleared, data:%p", pDnode); } diff --git a/source/dnode/mgmt/interface/inc/dmDef.h b/source/dnode/mgmt/interface/inc/dmDef.h index c0c33570db..a38349f852 100644 --- a/source/dnode/mgmt/interface/inc/dmDef.h +++ b/source/dnode/mgmt/interface/inc/dmDef.h @@ -140,7 +140,7 @@ typedef struct SDnode { SStartupReq startup; SDnodeTrans trans; SDnodeData data; - SRWLatch wrapperLock; + TdThreadMutex mutex; SMgmtWrapper wrappers[NODE_END]; } SDnode; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 79c72b3a35..c5a74d4840 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -147,8 +147,8 @@ static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleR static void uvDestroyConn(uv_handle_t* handle); // server and worker thread -static void* workerThread(void* arg); -static void* acceptThread(void* arg); +static void* transWorkerThread(void* arg); +static void* transAcceptThread(void* arg); // add handle loop static bool addHandleToWorkloop(void* arg); @@ -538,7 +538,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { } } -void* acceptThread(void* arg) { +void* transAcceptThread(void* arg) { // opt setThreadName("trans-accept"); SServerObj* srv = (SServerObj*)arg; @@ -596,7 +596,7 @@ static bool addHandleToAcceptloop(void* arg) { } return true; } -void* workerThread(void* arg) { +void* transWorkerThread(void* arg) { setThreadName("trans-worker"); SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT); @@ -686,7 +686,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, if (false == addHandleToWorkloop(thrd)) { goto End; } - int err = taosThreadCreate(&(thrd->thread), NULL, workerThread, (void*)(thrd)); + int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); if (err == 0) { tDebug("sucess to create worker-thread %d", i); // printf("thread %d create\n", i); @@ -698,7 +698,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, if (false == addHandleToAcceptloop(srv)) { goto End; } - int err = taosThreadCreate(&srv->thread, NULL, acceptThread, (void*)srv); + int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); if (err == 0) { tDebug("success to create accept-thread"); } else {