From ecfd874c53fa0ea4fb3850f1d982b1f49b6c3ec5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 30 Dec 2021 01:28:06 -0800 Subject: [PATCH] Improve SDB traversal efficiency --- include/dnode/mnode/sdb/sdb.h | 16 ++- source/dnode/mnode/impl/src/mndDb.c | 4 +- source/dnode/mnode/impl/src/mndVgroup.c | 124 +++++++++++++----------- source/dnode/mnode/sdb/src/sdbHash.c | 24 +++++ 4 files changed, 105 insertions(+), 63 deletions(-) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 33f9dc5a1a..3ff86bea3e 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -181,6 +181,7 @@ typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbDeployFp)(SMnode *pMnode); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); +typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3); typedef struct { ESdbType sdbType; @@ -279,7 +280,7 @@ void sdbRelease(SSdb *pSdb, void *pObj); * * @param pSdb The sdb object. * @param type The type of the table. - * @param type The initial iterator of the table. + * @param pIter The initial iterator of the table. * @param pObj The object of the row just fetched. * @return void* The next iterator of the table. */ @@ -289,11 +290,22 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj); * @brief Cancel a traversal * * @param pSdb The sdb object. - * @param pIter The iterator of the table. * @param type The initial iterator of table. */ void sdbCancelFetch(SSdb *pSdb, void *pIter); +/** + * @brief Traverse a sdb + * + * @param pSdb The sdb object. + * @param type The initial iterator of table. + * @param fp The function pointer. + * @param p1 The callback param. + * @param p2 The callback param. + * @param p3 The callback param. + */ +void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3); + /** * @brief Get the number of rows in the table * diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index d05e301024..f9199f9eed 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -549,7 +549,7 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO return 0; } -static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { +static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { SSdbRaw *pCommitRaw = mndDbActionEncode(pNewDb); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -725,7 +725,7 @@ static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { STransAction action = {0}; - SVnodeGid * pVgid = pVgroup->vnodeGid + vn; + SVnodeGid *pVgid = pVgroup->vnodeGid + vn; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pDnode == NULL) return -1; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 1b0026cd13..41de70283f 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -238,39 +238,48 @@ SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *p return pDrop; } +static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SDnodeObj *pDnode = pObj; + pDnode->numOfVnodes = 0; + return true; +} + +static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SDnodeObj *pDnode = pObj; + SArray *pArray = p1; + + pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); + + int64_t curMs = taosGetTimestampMs(); + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + if (online && pDnode->numOfSupportVnodes > 0) { + taosArrayPush(pArray, pDnode); + } + + bool isMnode = mndIsMnode(pMnode, pDnode->id); + + mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, pDnode->numOfVnodes, + pDnode->numOfSupportVnodes, isMnode, online); + + if (isMnode) { + pDnode->numOfVnodes++; + } + + return true; +} + static SArray *mndBuildDnodesArray(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; int32_t numOfDnodes = mndGetDnodeSize(pMnode); + SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj)); if (pArray == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - void *pIter = NULL; - while (1) { - SDnodeObj *pDnode = NULL; - pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); - if (pIter == NULL) break; - - int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); - - bool isMnode = mndIsMnode(pMnode, pDnode->id); - if (isMnode) { - pDnode->numOfVnodes++; - } - - int64_t curMs = taosGetTimestampMs(); - bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); - if (online) { - taosArrayPush(pArray, pDnode); - } - - mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, numOfVnodes, - pDnode->numOfSupportVnodes, isMnode, online); - sdbRelease(pSdb, pDnode); - } - + sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL); + sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, NULL, NULL); return pArray; } @@ -302,7 +311,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr pVgid->role = TAOS_SYNC_STATE_FOLLOWER; } - mDebug("db:%s, vgId:%d, vindex:%d dnodeId:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId); + mDebug("db:%s, vgId:%d, vn:%d dnode:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId); pDnode->numOfVnodes++; } @@ -412,6 +421,20 @@ static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; } +static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SVgObj *pVgroup = pObj; + int64_t uid = *(int64_t *)p1; + int8_t *pReplica = p2; + int32_t *pNumOfVgroups = p3; + + if (pVgroup->dbUid == uid) { + *pReplica = MAX(*pReplica, pVgroup->replica); + (*pNumOfVgroups)++; + } + + return true; +} + static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) { SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); @@ -420,25 +443,10 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep return -1; } - int8_t replica = 1; - int32_t numOfVgroups = 0; - - void *pIter = NULL; - while (1) { - SVgObj *pVgroup = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - - if (pVgroup->dbUid == pDb->uid) { - replica = MAX(replica, pVgroup->replica); - numOfVgroups++; - } - - sdbRelease(pSdb, pVgroup); - } - - *pReplica = replica; - *pNumOfVgroups = numOfVgroups; + *pReplica = 1; + *pNumOfVgroups = 0; + sdbTraverse(pSdb, SDB_VGROUP, mndGetVgroupMaxReplicaFp, &pDb->uid, pReplica, pNumOfVgroups); + mndReleaseDb(pMnode, pDb); return 0; } @@ -540,25 +548,23 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { - SSdb *pSdb = pMnode->pSdb; - int32_t numOfVnodes = 0; - void *pIter = NULL; +static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SVgObj *pVgroup = pObj; + int32_t dnodeId = *(int32_t *)p1; + int32_t *pNumOfVnodes = (int32_t *)p2; - while (1) { - SVgObj *pVgroup = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - - for (int32_t v = 0; v < pVgroup->replica; ++v) { - if (pVgroup->vnodeGid[v].dnodeId == dnodeId) { - numOfVnodes++; - } + for (int32_t v = 0; v < pVgroup->replica; ++v) { + if (pVgroup->vnodeGid[v].dnodeId == dnodeId) { + (*pNumOfVnodes)++; } - - sdbRelease(pSdb, pVgroup); } + return true; +} + +int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { + int32_t numOfVnodes = 0; + sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &numOfVnodes, NULL); return numOfVnodes; } diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 06c2563910..0388fa99f5 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -336,6 +336,30 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) { taosRUnLockLatch(pLock); } +void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) { + SHashObj *hash = sdbGetHash(pSdb, type); + if (hash == NULL) return; + + SRWLatch *pLock = &pSdb->locks[type]; + taosRLockLatch(pLock); + + SSdbRow **ppRow = taosHashIterate(hash, NULL); + while (ppRow != NULL) { + SSdbRow *pRow = *ppRow; + if (pRow->status == SDB_STATUS_READY) { + bool isContinue = (*fp)(pSdb->pMnode, pRow->pObj, p1, p2, p3); + if (!isContinue) { + taosHashCancelIterate(hash, ppRow); + break; + } + } + + ppRow = taosHashIterate(hash, ppRow); + } + + taosRUnLockLatch(pLock); +} + int32_t sdbGetSize(SSdb *pSdb, ESdbType type) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return 0;