Improve SDB traversal efficiency
This commit is contained in:
parent
ad44f5e011
commit
ecfd874c53
|
@ -181,6 +181,7 @@ typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj);
|
|||
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
|
||||
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
|
||||
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
|
||||
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);
|
||||
|
||||
typedef struct {
|
||||
ESdbType sdbType;
|
||||
|
@ -279,7 +280,7 @@ void sdbRelease(SSdb *pSdb, void *pObj);
|
|||
*
|
||||
* @param pSdb The sdb object.
|
||||
* @param type The type of the table.
|
||||
* @param type The initial iterator of the table.
|
||||
* @param pIter The initial iterator of the table.
|
||||
* @param pObj The object of the row just fetched.
|
||||
* @return void* The next iterator of the table.
|
||||
*/
|
||||
|
@ -289,11 +290,22 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj);
|
|||
* @brief Cancel a traversal
|
||||
*
|
||||
* @param pSdb The sdb object.
|
||||
* @param pIter The iterator of the table.
|
||||
* @param type The initial iterator of table.
|
||||
*/
|
||||
void sdbCancelFetch(SSdb *pSdb, void *pIter);
|
||||
|
||||
/**
|
||||
* @brief Traverse a sdb
|
||||
*
|
||||
* @param pSdb The sdb object.
|
||||
* @param type The initial iterator of table.
|
||||
* @param fp The function pointer.
|
||||
* @param p1 The callback param.
|
||||
* @param p2 The callback param.
|
||||
* @param p3 The callback param.
|
||||
*/
|
||||
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3);
|
||||
|
||||
/**
|
||||
* @brief Get the number of rows in the table
|
||||
*
|
||||
|
|
|
@ -549,7 +549,7 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
|
||||
static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
|
||||
SSdbRaw *pCommitRaw = mndDbActionEncode(pNewDb);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
|
@ -725,7 +725,7 @@ static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
|
|||
static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||
STransAction action = {0};
|
||||
SVnodeGid * pVgid = pVgroup->vnodeGid + vn;
|
||||
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
||||
|
||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||
if (pDnode == NULL) return -1;
|
||||
|
|
|
@ -238,39 +238,48 @@ SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *p
|
|||
return pDrop;
|
||||
}
|
||||
|
||||
static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
|
||||
SDnodeObj *pDnode = pObj;
|
||||
pDnode->numOfVnodes = 0;
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
|
||||
SDnodeObj *pDnode = pObj;
|
||||
SArray *pArray = p1;
|
||||
|
||||
pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
|
||||
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
||||
if (online && pDnode->numOfSupportVnodes > 0) {
|
||||
taosArrayPush(pArray, pDnode);
|
||||
}
|
||||
|
||||
bool isMnode = mndIsMnode(pMnode, pDnode->id);
|
||||
|
||||
mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, pDnode->numOfVnodes,
|
||||
pDnode->numOfSupportVnodes, isMnode, online);
|
||||
|
||||
if (isMnode) {
|
||||
pDnode->numOfVnodes++;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static SArray *mndBuildDnodesArray(SMnode *pMnode) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfDnodes = mndGetDnodeSize(pMnode);
|
||||
|
||||
SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj));
|
||||
if (pArray == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
|
||||
|
||||
bool isMnode = mndIsMnode(pMnode, pDnode->id);
|
||||
if (isMnode) {
|
||||
pDnode->numOfVnodes++;
|
||||
}
|
||||
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
||||
if (online) {
|
||||
taosArrayPush(pArray, pDnode);
|
||||
}
|
||||
|
||||
mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, numOfVnodes,
|
||||
pDnode->numOfSupportVnodes, isMnode, online);
|
||||
sdbRelease(pSdb, pDnode);
|
||||
}
|
||||
|
||||
sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
|
||||
sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, NULL, NULL);
|
||||
return pArray;
|
||||
}
|
||||
|
||||
|
@ -302,7 +311,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr
|
|||
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
|
||||
}
|
||||
|
||||
mDebug("db:%s, vgId:%d, vindex:%d dnodeId:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId);
|
||||
mDebug("db:%s, vgId:%d, vn:%d dnode:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId);
|
||||
pDnode->numOfVnodes++;
|
||||
}
|
||||
|
||||
|
@ -412,6 +421,20 @@ static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; }
|
|||
|
||||
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; }
|
||||
|
||||
static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
|
||||
SVgObj *pVgroup = pObj;
|
||||
int64_t uid = *(int64_t *)p1;
|
||||
int8_t *pReplica = p2;
|
||||
int32_t *pNumOfVgroups = p3;
|
||||
|
||||
if (pVgroup->dbUid == uid) {
|
||||
*pReplica = MAX(*pReplica, pVgroup->replica);
|
||||
(*pNumOfVgroups)++;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||
|
@ -420,25 +443,10 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep
|
|||
return -1;
|
||||
}
|
||||
|
||||
int8_t replica = 1;
|
||||
int32_t numOfVgroups = 0;
|
||||
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SVgObj *pVgroup = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pVgroup->dbUid == pDb->uid) {
|
||||
replica = MAX(replica, pVgroup->replica);
|
||||
numOfVgroups++;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
*pReplica = replica;
|
||||
*pNumOfVgroups = numOfVgroups;
|
||||
*pReplica = 1;
|
||||
*pNumOfVgroups = 0;
|
||||
sdbTraverse(pSdb, SDB_VGROUP, mndGetVgroupMaxReplicaFp, &pDb->uid, pReplica, pNumOfVgroups);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -540,25 +548,23 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) {
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfVnodes = 0;
|
||||
void *pIter = NULL;
|
||||
static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
|
||||
SVgObj *pVgroup = pObj;
|
||||
int32_t dnodeId = *(int32_t *)p1;
|
||||
int32_t *pNumOfVnodes = (int32_t *)p2;
|
||||
|
||||
while (1) {
|
||||
SVgObj *pVgroup = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
if (pVgroup->vnodeGid[v].dnodeId == dnodeId) {
|
||||
numOfVnodes++;
|
||||
}
|
||||
for (int32_t v = 0; v < pVgroup->replica; ++v) {
|
||||
if (pVgroup->vnodeGid[v].dnodeId == dnodeId) {
|
||||
(*pNumOfVnodes)++;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) {
|
||||
int32_t numOfVnodes = 0;
|
||||
sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &numOfVnodes, NULL);
|
||||
return numOfVnodes;
|
||||
}
|
||||
|
||||
|
|
|
@ -336,6 +336,30 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) {
|
|||
taosRUnLockLatch(pLock);
|
||||
}
|
||||
|
||||
void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) {
|
||||
SHashObj *hash = sdbGetHash(pSdb, type);
|
||||
if (hash == NULL) return;
|
||||
|
||||
SRWLatch *pLock = &pSdb->locks[type];
|
||||
taosRLockLatch(pLock);
|
||||
|
||||
SSdbRow **ppRow = taosHashIterate(hash, NULL);
|
||||
while (ppRow != NULL) {
|
||||
SSdbRow *pRow = *ppRow;
|
||||
if (pRow->status == SDB_STATUS_READY) {
|
||||
bool isContinue = (*fp)(pSdb->pMnode, pRow->pObj, p1, p2, p3);
|
||||
if (!isContinue) {
|
||||
taosHashCancelIterate(hash, ppRow);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ppRow = taosHashIterate(hash, ppRow);
|
||||
}
|
||||
|
||||
taosRUnLockLatch(pLock);
|
||||
}
|
||||
|
||||
int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
|
||||
SHashObj *hash = sdbGetHash(pSdb, type);
|
||||
if (hash == NULL) return 0;
|
||||
|
|
Loading…
Reference in New Issue