From c049678ade258de74d9b5b2cc6db2c9a5dc8fe4e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 27 Nov 2021 15:57:54 +0800 Subject: [PATCH] TD-11265 fix deadlock while quitting taosd --- include/util/tworker.h | 19 +++++++++++-------- source/dnode/mgmt/daemon/src/daemon.c | 5 ++++- source/dnode/mgmt/impl/inc/dndInt.h | 22 +++++++++++----------- source/dnode/mgmt/impl/src/dndMnode.c | 12 ++++++++---- source/dnode/mgmt/impl/src/dndTransport.c | 8 +++++++- source/dnode/mgmt/impl/src/dnode.c | 2 +- source/util/src/tqueue.c | 2 +- source/util/src/tworker.c | 4 ++-- 8 files changed, 45 insertions(+), 29 deletions(-) diff --git a/include/util/tworker.h b/include/util/tworker.h index 9b0fe4f3a5..2e5852cbba 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -22,10 +22,13 @@ extern "C" { #endif +typedef struct SWorkerPool SWorkerPool; +typedef struct SMWorkerPool SMWorkerPool; + typedef struct SWorker { - int32_t id; // worker ID - pthread_t thread; // thread - struct SWorkerPool *pool; + int32_t id; // worker ID + pthread_t thread; // thread + SWorkerPool *pool; } SWorker; typedef struct SWorkerPool { @@ -39,11 +42,11 @@ typedef struct SWorkerPool { } SWorkerPool; typedef struct SMWorker { - int32_t id; // worker id - pthread_t thread; // thread - taos_qall qall; - taos_qset qset; // queue set - struct SMWorkerPool *pool; + int32_t id; // worker id + pthread_t thread; // thread + taos_qall qall; + taos_qset qset; // queue set + SMWorkerPool *pool; } SMWorker; typedef struct SMWorkerPool { diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index effaec66a8..a0ca0dd390 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -30,7 +30,10 @@ static struct { char configDir[PATH_MAX]; } global = {0}; -void dmnSigintHandle(int signum, void *info, void *ctx) { global.stop = true; } +void dmnSigintHandle(int signum, void *info, void *ctx) { + uError("signal:%d is received", 
signum); + global.stop = true; +} void dmnSetSignalHandle() { taosSetSignal(SIGTERM, dmnSigintHandle); diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 106f192856..39243a1795 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -74,31 +74,31 @@ typedef struct { int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; - SWorkerPool mgmtPool; - SWorkerPool readPool; - SWorkerPool writePool; - SWorkerPool syncPool; + char *file; + SMnode *pMnode; + SRWLatch latch; taos_queue pReadQ; taos_queue pWriteQ; taos_queue pApplyQ; taos_queue pSyncQ; taos_queue pMgmtQ; - char *file; - SMnode *pMnode; - SRWLatch latch; + SWorkerPool mgmtPool; + SWorkerPool readPool; + SWorkerPool writePool; + SWorkerPool syncPool; } SMnodeMgmt; typedef struct { SHashObj *hash; + int32_t openVnodes; + int32_t totalVnodes; + SRWLatch latch; + taos_queue pMgmtQ; SWorkerPool mgmtPool; SWorkerPool queryPool; SWorkerPool fetchPool; SMWorkerPool syncPool; SMWorkerPool writePool; - taos_queue pMgmtQ; - int32_t openVnodes; - int32_t totalVnodes; - SRWLatch latch; } SVnodesMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index a4f6d845fd..fe3accdd84 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -294,14 +294,14 @@ static void dndStopMnodeWorker(SDnode *pDnode) { while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10); + dndCleanupMnodeReadWorker(pDnode); + dndCleanupMnodeWriteWorker(pDnode); + dndCleanupMnodeSyncWorker(pDnode); + dndFreeMnodeReadQueue(pDnode); dndFreeMnodeWriteQueue(pDnode); dndFreeMnodeApplyQueue(pDnode); dndFreeMnodeSyncQueue(pDnode); - - dndCleanupMnodeReadWorker(pDnode); - dndCleanupMnodeWriteWorker(pDnode); - dndCleanupMnodeSyncWorker(pDnode); } static bool dndNeedDeployMnode(SDnode *pDnode) { @@ -714,6 +714,7 @@ static 
int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) { static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerCleanup(&pMgmt->mgmtPool); + dDebug("mnode mgmt worker is stopped"); } static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { @@ -750,6 +751,7 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) { static void dndCleanupMnodeReadWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerCleanup(&pMgmt->readPool); + dDebug("mnode read worker is stopped"); } static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) { @@ -803,6 +805,7 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { static void dndCleanupMnodeWriteWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerCleanup(&pMgmt->writePool); + dDebug("mnode write worker is stopped"); } static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) { @@ -839,6 +842,7 @@ static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) { static void dndCleanupMnodeSyncWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; tWorkerCleanup(&pMgmt->syncPool); + dDebug("mnode sync worker is stopped"); } int32_t dndInitMnode(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index d5f52bac8b..c3940cd3cc 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -160,6 +160,7 @@ static int32_t dndInitClient(SDnode *pDnode) { rpcInit.user = INTERNAL_USER; rpcInit.ckey = INTERNAL_CKEY; rpcInit.secret = INTERNAL_SECRET; + rpcInit.parent = pDnode; pMgmt->clientRpc = rpcOpen(&rpcInit); if (pMgmt->clientRpc == NULL) { @@ -167,6 +168,7 @@ static int32_t dndInitClient(SDnode *pDnode) { return -1; } + dDebug("dnode rpc client is initialized"); return 0; } @@ -175,7 +177,7 @@ static void dndCleanupClient(SDnode *pDnode) { if (pMgmt->clientRpc) { rpcClose(pMgmt->clientRpc); pMgmt->clientRpc = NULL; - dInfo("dnode peer rpc client is closed"); + dDebug("dnode 
rpc client is closed"); } } @@ -315,6 +317,7 @@ static int32_t dndInitServer(SDnode *pDnode) { rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000; rpcInit.afp = dndRetrieveUserAuthInfo; + rpcInit.parent = pDnode; pMgmt->serverRpc = rpcOpen(&rpcInit); if (pMgmt->serverRpc == NULL) { @@ -322,6 +325,7 @@ static int32_t dndInitServer(SDnode *pDnode) { return -1; } + dDebug("dnode rpc server is initialized"); return 0; } @@ -330,6 +334,7 @@ static void dndCleanupServer(SDnode *pDnode) { if (pMgmt->serverRpc) { rpcClose(pMgmt->serverRpc); pMgmt->serverRpc = NULL; + dDebug("dnode rpc server is closed"); } } @@ -347,6 +352,7 @@ int32_t dndInitTrans(SDnode *pDnode) { } void dndCleanupTrans(SDnode *pDnode) { + dInfo("dnode-transport start to clean up"); dndCleanupServer(pDnode); dndCleanupClient(pDnode); dInfo("dnode-transport is cleaned up"); diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index 8d72f83200..23c9ee0ebf 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -197,7 +197,7 @@ SDnode *dndInit(SDnodeOpt *pOption) { dndReportStartup(pDnode, "TDengine", "initialized successfully"); dInfo("TDengine is initialized successfully"); - return 0; + return pDnode; } void dndCleanup(SDnode *pDnode) { diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 93008f7114..04bc0c8dc8 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -107,7 +107,7 @@ bool taosQueueEmpty(taos_queue param) { if (queue->head == NULL && queue->tail == NULL) { empty = true; } - pthread_mutex_destroy(&queue->mutex); + pthread_mutex_unlock(&queue->mutex); return empty; } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 136bc40482..11972e84cb 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -50,7 +50,7 @@ void tWorkerCleanup(SWorkerPool *pool) { } } - free(pool->workers); + tfree(pool->workers); 
taosCloseQset(pool->qset); pthread_mutex_destroy(&pool->mutex); @@ -159,7 +159,7 @@ void tMWorkerCleanup(SMWorkerPool *pool) { } } - free(pool->workers); + tfree(pool->workers); pthread_mutex_destroy(&pool->mutex); uInfo("worker:%s is closed", pool->name);