From 568c6269dcbb57483f47a400d576b3934d76d7f8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 May 2022 14:59:55 +0800 Subject: [PATCH 1/4] fix: acquire vnode on restart may deadlock --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 8c3b8576a8..41c0b3086b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -334,19 +334,23 @@ static int32_t vmRequire(const SMgmtInputOpt *pInput, bool *required) { } static int32_t vmStart(SVnodeMgmt *pMgmt) { - taosRLockLatch(&pMgmt->latch); + int32_t numOfVnodes = 0; + SVnodeObj **pVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes); - void *pIter = taosHashIterate(pMgmt->hash, NULL); - while (pIter) { - SVnodeObj **ppVnode = pIter; - if (ppVnode == NULL || *ppVnode == NULL) continue; - - SVnodeObj *pVnode = *ppVnode; + for (int32_t i = 0; i < numOfVnodes; ++i) { + SVnodeObj *pVnode = pVnodes[i]; vnodeStart(pVnode->pImpl); - pIter = taosHashIterate(pMgmt->hash, pIter); } - taosRUnLockLatch(&pMgmt->latch); + for (int32_t i = 0; i < numOfVnodes; ++i) { + SVnodeObj *pVnode = pVnodes[i]; + vmReleaseVnode(pMgmt, pVnode); + } + + if (pVnodes != NULL) { + taosMemoryFree(pVnodes); + } + return 0; } From c5d18b5afd0af45ac0cad023f3e3c496a55c86ac Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 May 2022 15:08:32 +0800 Subject: [PATCH 2/4] refactor: change lockfree to rwlock --- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 30 ++++++++++----------- source/dnode/mgmt/mgmt_vnode/src/vmFile.c | 4 +-- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 4 +-- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 19 ++++++------- 4 files changed, 29 insertions(+), 28 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 7fc10c4237..022d6204ef 100644 ---
a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -26,21 +26,21 @@ extern "C" { #endif typedef struct SVnodeMgmt { - SDnodeData *pData; - SMsgCb msgCb; - const char *path; - const char *name; - SQWorkerPool queryPool; - SQWorkerPool fetchPool; - SWWorkerPool syncPool; - SWWorkerPool writePool; - SWWorkerPool mergePool; - SSingleWorker mgmtWorker; - SSingleWorker monitorWorker; - SHashObj *hash; - SRWLatch latch; - SVnodesStat state; - STfs *pTfs; + SDnodeData *pData; + SMsgCb msgCb; + const char *path; + const char *name; + SQWorkerPool queryPool; + SQWorkerPool fetchPool; + SWWorkerPool syncPool; + SWWorkerPool writePool; + SWWorkerPool mergePool; + SSingleWorker mgmtWorker; + SSingleWorker monitorWorker; + SHashObj *hash; + TdThreadRwlock lock; + SVnodesStat state; + STfs *pTfs; } SVnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 7a6c5f982e..cf5a7ad885 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -17,7 +17,7 @@ #include "vmInt.h" SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { - taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); int32_t num = 0; int32_t size = taosHashGetSize(pMgmt->hash); @@ -38,7 +38,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { } } - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); *numOfVnodes = num; return pVnodes; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index a4da6d089c..602922feeb 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -20,7 +20,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); if (pInfo->pVloads == NULL) return; - 
taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { @@ -34,7 +34,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { pIter = taosHashIterate(pMgmt->hash, pIter); } - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); } void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 41c0b3086b..0c8d492ef4 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -20,14 +20,14 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { SVnodeObj *pVnode = NULL; int32_t refCount = 0; - taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); if (pVnode == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; } else { refCount = atomic_add_fetch_32(&pVnode->refCount, 1); } - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); if (pVnode != NULL) { dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); @@ -39,9 +39,9 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { if (pVnode == NULL) return; - taosRLockLatch(&pMgmt->latch); + taosThreadRwlockRdlock(&pMgmt->lock); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - taosRUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } @@ -70,9 +70,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { return -1; } - taosWLockLatch(&pMgmt->latch); + taosThreadRwlockWrlock(&pMgmt->lock); int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); - taosWUnLockLatch(&pMgmt->latch); + 
taosThreadRwlockUnlock(&pMgmt->lock); return code; } @@ -80,9 +80,9 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { char path[TSDB_FILENAME_LEN] = {0}; - taosWLockLatch(&pMgmt->latch); + taosThreadRwlockWrlock(&pMgmt->lock); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); - taosWUnLockLatch(&pMgmt->latch); + taosThreadRwlockUnlock(&pMgmt->lock); vmReleaseVnode(pMgmt, pVnode); while (pVnode->refCount > 0) taosMsleep(10); @@ -239,6 +239,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) { vmStopWorker(pMgmt); vnodeCleanup(); tfsClose(pMgmt->pTfs); + taosThreadRwlockDestroy(&pMgmt->lock); taosMemoryFree(pMgmt); } @@ -260,7 +261,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; pMgmt->msgCb.mgmt = pMgmt; - taosInitRWLatch(&pMgmt->latch); + taosThreadRwlockInit(&pMgmt->lock, NULL); SDiskCfg dCfg = {0}; tstrncpy(dCfg.dir, tsDataDir, TSDB_FILENAME_LEN); From 380780331da64917d7e9754214ca485cd8343c9f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 May 2022 15:19:19 +0800 Subject: [PATCH 3/4] refactor: change lockfree to rwlock --- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 8 +++---- source/dnode/mgmt/mgmt_snode/inc/smInt.h | 1 - source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 2 +- source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 22 +++++++++--------- source/dnode/mgmt/node_mgmt/src/dmNodes.c | 4 ++-- source/dnode/mgmt/node_util/inc/dmUtil.h | 24 ++++++++++---------- source/dnode/mgmt/node_util/src/dmEps.c | 25 ++++++++++----------- 7 files changed, 43 insertions(+), 43 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 7c1162ec10..f7337f482f 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ 
-19,11 +19,11 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) { dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); - taosWLockLatch(&pMgmt->pData->latch); + taosThreadRwlockWrlock(&pMgmt->pData->lock); pMgmt->pData->dnodeId = pCfg->dnodeId; pMgmt->pData->clusterId = pCfg->clusterId; dmWriteEps(pMgmt->pData); - taosWUnLockLatch(&pMgmt->pData->latch); + taosThreadRwlockUnlock(&pMgmt->pData->lock); } } @@ -50,7 +50,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { void dmSendStatusReq(SDnodeMgmt *pMgmt) { SStatusReq req = {0}; - taosRLockLatch(&pMgmt->pData->latch); + taosThreadRwlockRdlock(&pMgmt->pData->lock); req.sver = tsVersion; req.dnodeVer = pMgmt->pData->dnodeVer; req.dnodeId = pMgmt->pData->dnodeId; @@ -69,7 +69,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN); memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); - taosRUnLockLatch(&pMgmt->pData->latch); + taosThreadRwlockUnlock(&pMgmt->pData->lock); SMonVloadInfo vinfo = {0}; (*pMgmt->getVnodeLoadsFp)(&vinfo); diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index 6d0bea9590..68b6ef659e 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -30,7 +30,6 @@ typedef struct SSnodeMgmt { SMsgCb msgCb; const char *path; const char *name; - SRWLatch latch; int8_t uniqueWorkerInUse; SArray *uniqueWorkers; // SArray SSingleWorker sharedWorker; diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 9d092a93bc..5818b58801 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -70,7 +70,7 @@ typedef struct SMgmtWrapper { const char *name; char *path; int32_t refCount; - SRWLatch 
latch; + TdThreadRwlock lock; EDndNodeType ntype; bool deployed; bool required; diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 3cbb9ff046..96285bbe1c 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -91,7 +91,7 @@ static int32_t dmInitVars(SDnode *pDnode, EDndNodeType rtype) { return -1; } - taosInitRWLatch(&pData->latch); + taosThreadRwlockInit(&pData->lock, NULL); taosThreadMutexInit(&pDnode->mutex, NULL); return 0; } @@ -100,6 +100,7 @@ static void dmClearVars(SDnode *pDnode) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; taosMemoryFreeClear(pWrapper->path); + taosThreadRwlockDestroy(&pWrapper->lock); } if (pDnode->lockfile != NULL) { taosUnLockFile(pDnode->lockfile); @@ -108,7 +109,7 @@ static void dmClearVars(SDnode *pDnode) { } SDnodeData *pData = &pDnode->data; - taosWLockLatch(&pData->latch); + taosThreadRwlockWrlock(&pData->lock); if (pData->dnodeEps != NULL) { taosArrayDestroy(pData->dnodeEps); pData->dnodeEps = NULL; @@ -117,8 +118,9 @@ static void dmClearVars(SDnode *pDnode) { taosHashCleanup(pData->dnodeHash); pData->dnodeHash = NULL; } - taosWUnLockLatch(&pData->latch); + taosThreadRwlockUnlock(&pData->lock); + taosThreadRwlockDestroy(&pData->lock); taosThreadMutexDestroy(&pDnode->mutex); memset(&pDnode->mutex, 0, sizeof(pDnode->mutex)); } @@ -151,7 +153,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) { if (ntype == DNODE) { pWrapper->proc.ptype = DND_PROC_SINGLE; } - taosInitRWLatch(&pWrapper->latch); + taosThreadRwlockInit(&pWrapper->lock, NULL); snprintf(path, sizeof(path), "%s%s%s", tsDataDir, TD_DIRSEP, pWrapper->name); pWrapper->path = strdup(path); @@ -223,7 +225,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pRetWrapper = pWrapper; - 
taosRLockLatch(&pWrapper->latch); + taosThreadRwlockRdlock(&pWrapper->lock); if (pWrapper->deployed) { int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); dTrace("node:%s, is acquired, ref:%d", pWrapper->name, refCount); @@ -231,7 +233,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { terrno = TSDB_CODE_NODE_NOT_DEPLOYED; pRetWrapper = NULL; } - taosRUnLockLatch(&pWrapper->latch); + taosThreadRwlockUnlock(&pWrapper->lock); return pRetWrapper; } @@ -239,7 +241,7 @@ SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { int32_t code = 0; - taosRLockLatch(&pWrapper->latch); + taosThreadRwlockRdlock(&pWrapper->lock); if (pWrapper->deployed || (InParentProc(pWrapper) && pWrapper->required)) { int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); dTrace("node:%s, is marked, ref:%d", pWrapper->name, refCount); @@ -247,7 +249,7 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { terrno = TSDB_CODE_NODE_NOT_DEPLOYED; code = -1; } - taosRUnLockLatch(&pWrapper->latch); + taosThreadRwlockUnlock(&pWrapper->lock); return code; } @@ -255,9 +257,9 @@ int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { void dmReleaseWrapper(SMgmtWrapper *pWrapper) { if (pWrapper == NULL) return; - taosRLockLatch(&pWrapper->latch); + taosThreadRwlockRdlock(&pWrapper->lock); int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); - taosRUnLockLatch(&pWrapper->latch); + taosThreadRwlockUnlock(&pWrapper->lock); dTrace("node:%s, is released, ref:%d", pWrapper->name, refCount); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index ecfa37725a..ab9d3f67e7 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -186,12 +186,12 @@ void dmCloseNode(SMgmtWrapper *pWrapper) { } } - taosWLockLatch(&pWrapper->latch); + taosThreadRwlockWrlock(&pWrapper->lock); if (pWrapper->pMgmt != NULL) { 
(*pWrapper->func.closeFp)(pWrapper->pMgmt); pWrapper->pMgmt = NULL; } - taosWUnLockLatch(&pWrapper->latch); + taosThreadRwlockUnlock(&pWrapper->lock); if (!OnlyInSingleProc(pWrapper)) { dmCleanupProc(pWrapper); diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index b0e764bf8e..e7256a3a87 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -94,18 +94,18 @@ typedef void (*GetVnodeLoadsFp)(); typedef void (*GetMnodeLoadsFp)(); typedef struct { - int32_t dnodeId; - int64_t clusterId; - int64_t dnodeVer; - int64_t updateTime; - int64_t rebootTime; - bool dropped; - bool stopped; - SEpSet mnodeEps; - SArray *dnodeEps; - SHashObj *dnodeHash; - SRWLatch latch; - SMsgCb msgCb; + int32_t dnodeId; + int64_t clusterId; + int64_t dnodeVer; + int64_t updateTime; + int64_t rebootTime; + bool dropped; + bool stopped; + SEpSet mnodeEps; + SArray *dnodeEps; + SHashObj *dnodeHash; + TdThreadRwlock lock; + SMsgCb msgCb; } SDnodeData; typedef struct { diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index a6c9fda64d..94fa569557 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -21,7 +21,7 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep); static void dmResetEps(SDnodeData *pData, SArray *dnodeEps); static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { - taosRLockLatch(&pData->latch); + taosThreadRwlockRdlock(&pData->lock); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { @@ -36,7 +36,7 @@ static void dmGetDnodeEp(SDnodeData *pData, int32_t dnodeId, char *pEp, char *pF } } - taosRUnLockLatch(&pData->latch); + taosThreadRwlockUnlock(&pData->lock); } int32_t dmReadEps(SDnodeData *pData) { @@ -232,7 +232,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) { 
int32_t numOfEps = taosArrayGetSize(eps); if (numOfEps <= 0) return; - taosWLockLatch(&pData->latch); + taosThreadRwlockWrlock(&pData->lock); int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps); if (numOfEps != numOfEpsOld) { @@ -246,7 +246,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *eps) { } } - taosWUnLockLatch(&pData->latch); + taosThreadRwlockUnlock(&pData->lock); } static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) { @@ -292,7 +292,7 @@ static void dmPrintEps(SDnodeData *pData) { static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) { bool changed = false; if (dnodeId == 0) return changed; - taosRLockLatch(&pData->latch); + taosThreadRwlockRdlock(&pData->lock); SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp != NULL) { @@ -304,24 +304,23 @@ static bool dmIsEpChanged(SDnodeData *pData, int32_t dnodeId, const char *ep) { } } - taosRUnLockLatch(&pData->latch); + taosThreadRwlockUnlock(&pData->lock); return changed; } void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { - taosRLockLatch(&pData->latch); + taosThreadRwlockRdlock(&pData->lock); *pEpSet = pData->mnodeEps; - taosRUnLockLatch(&pData->latch); + taosThreadRwlockUnlock(&pData->lock); } void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { - dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); - - taosWLockLatch(&pData->latch); + taosThreadRwlockWrlock(&pData->lock); pData->mnodeEps = *pEpSet; + taosThreadRwlockUnlock(&pData->lock); + + dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); } - - taosWUnLockLatch(&pData->latch); } From 2ee38b94fd530421490925dc0b1552ef5893cabf Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 May 2022 15:52:35 +0800 Subject: [PATCH 4/4] refactor: change lockfree to rwlock --- 
include/dnode/mnode/sdb/sdb.h | 34 ++++++------ source/dnode/mnode/sdb/src/sdb.c | 6 ++- source/dnode/mnode/sdb/src/sdbFile.c | 6 +-- source/dnode/mnode/sdb/src/sdbHash.c | 80 ++++++++++++++-------------- 4 files changed, 64 insertions(+), 62 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index a56c6ca16d..2abe0e5c73 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -333,23 +333,23 @@ SSdbRow *sdbAllocRow(int32_t objSize); void *sdbGetRowObj(SSdbRow *pRow); typedef struct SSdb { - SMnode *pMnode; - char *currDir; - char *syncDir; - char *tmpDir; - int64_t lastCommitVer; - int64_t curVer; - int64_t tableVer[SDB_MAX]; - int64_t maxId[SDB_MAX]; - EKeyType keyTypes[SDB_MAX]; - SHashObj *hashObjs[SDB_MAX]; - SRWLatch locks[SDB_MAX]; - SdbInsertFp insertFps[SDB_MAX]; - SdbUpdateFp updateFps[SDB_MAX]; - SdbDeleteFp deleteFps[SDB_MAX]; - SdbDeployFp deployFps[SDB_MAX]; - SdbEncodeFp encodeFps[SDB_MAX]; - SdbDecodeFp decodeFps[SDB_MAX]; + SMnode *pMnode; + char *currDir; + char *syncDir; + char *tmpDir; + int64_t lastCommitVer; + int64_t curVer; + int64_t tableVer[SDB_MAX]; + int64_t maxId[SDB_MAX]; + EKeyType keyTypes[SDB_MAX]; + SHashObj *hashObjs[SDB_MAX]; + TdThreadRwlock locks[SDB_MAX]; + SdbInsertFp insertFps[SDB_MAX]; + SdbUpdateFp updateFps[SDB_MAX]; + SdbDeleteFp deleteFps[SDB_MAX]; + SdbDeployFp deployFps[SDB_MAX]; + SdbEncodeFp encodeFps[SDB_MAX]; + SdbDecodeFp decodeFps[SDB_MAX]; } SSdb; #ifdef __cplusplus diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 51f40c12cd..1f11a77e6c 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -48,7 +48,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { } for (ESdbType i = 0; i < SDB_MAX; ++i) { - taosInitRWLatch(&pSdb->locks[i]); + taosThreadRwlockInit(&pSdb->locks[i], NULL); pSdb->maxId[i] = 0; pSdb->tableVer[i] = 0; pSdb->keyTypes[i] = SDB_KEY_INT32; @@ -98,7 +98,10 @@ void sdbCleanup(SSdb 
*pSdb) { taosHashClear(hash); taosHashCleanup(hash); + taosThreadRwlockDestroy(&pSdb->locks[i]); pSdb->hashObjs[i] = NULL; + memset(&pSdb->locks[i], 0, sizeof(pSdb->locks[i])); + mDebug("sdb table:%s is cleaned up", sdbTableName(i)); } @@ -134,7 +137,6 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { pSdb->maxId[sdbType] = 0; pSdb->hashObjs[sdbType] = hash; - taosInitRWLatch(&pSdb->locks[sdbType]); mDebug("sdb table:%s is initialized", sdbTableName(sdbType)); return 0; diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index e9037a7b11..ad1429f667 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -257,8 +257,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); SHashObj *hash = pSdb->hashObjs[i]; - SRWLatch *pLock = &pSdb->locks[i]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[i]; + taosThreadRwlockWrlock(pLock); SSdbRow **ppRow = taosHashIterate(hash, NULL); while (ppRow != NULL) { @@ -303,7 +303,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { sdbFreeRaw(pRaw); ppRow = taosHashIterate(hash, ppRow); } - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); } if (code == 0) { diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 94008b2f7c..a25c7a5233 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -129,12 +129,12 @@ static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) { } static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, int32_t keySize) { - SRWLatch *pLock = &pSdb->locks[pRow->type]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; + taosThreadRwlockWrlock(pLock); SSdbRow *pOldRow = taosHashGet(hash, pRow->pObj, keySize); if (pOldRow != NULL) { - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); 
sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_SDB_OBJ_ALREADY_THERE; return terrno; @@ -145,13 +145,13 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * sdbPrintOper(pSdb, pRow, "insert"); if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); int32_t code = 0; SdbInsertFp insertFp = pSdb->insertFps[pRow->type]; @@ -159,9 +159,9 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * code = (*insertFp)(pSdb, pRow->pObj); if (code != 0) { code = terrno; - taosWLockLatch(pLock); + taosThreadRwlockWrlock(pLock); taosHashRemove(hash, pRow->pObj, keySize); - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); sdbFreeRow(pSdb, pRow, false); terrno = code; return terrno; @@ -180,19 +180,19 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * } static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pNewRow, int32_t keySize) { - SRWLatch *pLock = &pSdb->locks[pNewRow->type]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pNewRow->type]; + taosThreadRwlockWrlock(pLock); SSdbRow **ppOldRow = taosHashGet(hash, pNewRow->pObj, keySize); if (ppOldRow == NULL || *ppOldRow == NULL) { - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); return sdbInsertRow(pSdb, hash, pRaw, pNewRow, keySize); } SSdbRow *pOldRow = *ppOldRow; pOldRow->status = pRaw->status; sdbPrintOper(pSdb, pOldRow, "update"); - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); int32_t code = 0; SdbUpdateFp updateFp = pSdb->updateFps[pNewRow->type]; @@ -207,12 +207,12 @@ static int32_t sdbUpdateRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * } static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *pRow, 
int32_t keySize) { - SRWLatch *pLock = &pSdb->locks[pRow->type]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; + taosThreadRwlockWrlock(pLock); SSdbRow **ppOldRow = taosHashGet(hash, pRow->pObj, keySize); if (ppOldRow == NULL || *ppOldRow == NULL) { - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); sdbFreeRow(pSdb, pRow, false); terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; return terrno; @@ -223,7 +223,7 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * sdbPrintOper(pSdb, pOldRow, "delete"); taosHashRemove(hash, pOldRow->pObj, keySize); - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); pSdb->tableVer[pOldRow->type]++; sdbFreeRow(pSdb, pRow, false); @@ -278,12 +278,12 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { void *pRet = NULL; int32_t keySize = sdbGetkeySize(pSdb, type, pKey); - SRWLatch *pLock = &pSdb->locks[type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[type]; + taosThreadRwlockRdlock(pLock); SSdbRow **ppRow = taosHashGet(hash, pKey, keySize); if (ppRow == NULL || *ppRow == NULL) { - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); terrno = TSDB_CODE_SDB_OBJ_NOT_THERE; return NULL; } @@ -306,13 +306,13 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { break; } - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); return pRet; } static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { - SRWLatch *pLock = &pSdb->locks[pRow->type]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; + taosThreadRwlockWrlock(pLock); int32_t ref = atomic_load_32(&pRow->refCount); sdbPrintOper(pSdb, pRow, "check"); @@ -320,7 +320,7 @@ static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) { sdbFreeRow(pSdb, pRow, true); } - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); } void sdbRelease(SSdb *pSdb, void *pObj) { @@ -329,8 +329,8 @@ void sdbRelease(SSdb *pSdb, void *pObj) { SSdbRow *pRow = (SSdbRow 
*)((char *)pObj - sizeof(SSdbRow)); if (pRow->type >= SDB_MAX) return; - SRWLatch *pLock = &pSdb->locks[pRow->type]; - taosWLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; + taosThreadRwlockWrlock(pLock); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); sdbPrintOper(pSdb, pRow, "release"); @@ -338,7 +338,7 @@ void sdbRelease(SSdb *pSdb, void *pObj) { sdbFreeRow(pSdb, pRow, true); } - taosWUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); } void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { @@ -347,8 +347,8 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return NULL; - SRWLatch *pLock = &pSdb->locks[type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[type]; + taosThreadRwlockRdlock(pLock); SSdbRow **ppRow = taosHashIterate(hash, pIter); while (ppRow != NULL) { @@ -363,7 +363,7 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj) { *ppObj = pRow->pObj; break; } - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); return ppRow; } @@ -374,18 +374,18 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) { SHashObj *hash = sdbGetHash(pSdb, pRow->type); if (hash == NULL) return; - SRWLatch *pLock = &pSdb->locks[pRow->type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[pRow->type]; + taosThreadRwlockRdlock(pLock); taosHashCancelIterate(hash, pIter); - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); } void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return; - SRWLatch *pLock = &pSdb->locks[type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[type]; + taosThreadRwlockRdlock(pLock); SSdbRow **ppRow = taosHashIterate(hash, NULL); while (ppRow != NULL) { @@ -401,17 +401,17 @@ void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2 
ppRow = taosHashIterate(hash, ppRow); } - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); } int32_t sdbGetSize(SSdb *pSdb, ESdbType type) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return 0; - SRWLatch *pLock = &pSdb->locks[type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[type]; + taosThreadRwlockRdlock(pLock); int32_t size = taosHashGetSize(hash); - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); return size; } @@ -424,8 +424,8 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) { int32_t maxId = 0; - SRWLatch *pLock = &pSdb->locks[type]; - taosRLockLatch(pLock); + TdThreadRwlock *pLock = &pSdb->locks[type]; + taosThreadRwlockRdlock(pLock); SSdbRow **ppRow = taosHashIterate(hash, NULL); while (ppRow != NULL) { @@ -435,7 +435,7 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) { ppRow = taosHashIterate(hash, ppRow); } - taosRUnLockLatch(pLock); + taosThreadRwlockUnlock(pLock); maxId = TMAX(maxId, pSdb->maxId[type]); return maxId + 1;