diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 33f9dc5a1a..3ff86bea3e 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -181,6 +181,7 @@ typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbDeployFp)(SMnode *pMnode); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); +typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3); typedef struct { ESdbType sdbType; @@ -279,7 +280,7 @@ void sdbRelease(SSdb *pSdb, void *pObj); * * @param pSdb The sdb object. * @param type The type of the table. - * @param type The initial iterator of the table. + * @param pIter The initial iterator of the table. * @param pObj The object of the row just fetched. * @return void* The next iterator of the table. */ @@ -289,11 +290,22 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj); * @brief Cancel a traversal * * @param pSdb The sdb object. - * @param pIter The iterator of the table. * @param type The initial iterator of table. */ void sdbCancelFetch(SSdb *pSdb, void *pIter); +/** + * @brief Traverse a sdb + * + * @param pSdb The sdb object. + * @param type The initial iterator of table. + * @param fp The function pointer. + * @param p1 The callback param. + * @param p2 The callback param. + * @param p3 The callback param. + */ +void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3); + /** * @brief Get the number of rows in the table * diff --git a/include/util/tlog.h b/include/util/tlog.h index 5c91398cdc..a367243a46 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -37,7 +37,6 @@ extern int32_t mqttDebugFlag; extern int32_t monDebugFlag; extern int32_t uDebugFlag; extern int32_t rpcDebugFlag; -extern int32_t odbcDebugFlag; extern int32_t qDebugFlag; extern int32_t wDebugFlag; extern int32_t sDebugFlag; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e66ffd4426..e26c0c5ae7 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -117,7 +117,7 @@ typedef struct { int64_t rebootTime; int64_t lastAccessTime; int32_t accessTimes; - int16_t numOfVnodes; + int32_t numOfVnodes; int32_t numOfSupportVnodes; int32_t numOfCores; EDndReason offlineReason; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index d05e301024..f9199f9eed 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -549,7 +549,7 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO return 0; } -static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { +static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { SSdbRaw *pCommitRaw = mndDbActionEncode(pNewDb); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -725,7 +725,7 @@ static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { STransAction action = {0}; - SVnodeGid * pVgid = pVgroup->vnodeGid + vn; + SVnodeGid *pVgid = pVgroup->vnodeGid + vn; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pDnode == NULL) return -1; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index ca20ba61a1..1c7f8544e9 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -196,6 +196,8 @@ static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) { sdbCancelFetch(pSdb, pIter); return pDnode; } + + sdbRelease(pSdb, pDnode); } return NULL; @@ -419,10 +421,9 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg * static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SCreateDnodeMsg *pCreate = pMsg->rpcMsg.pCont; - + pCreate->port = htonl(pCreate->port); mDebug("dnode:%s:%d, start to create", pCreate->fqdn, pCreate->port); - pCreate->port = htonl(pCreate->port); if (pCreate->fqdn[0] == 0 || pCreate->port <= 0 || pCreate->port > UINT16_MAX) { terrno = TSDB_CODE_MND_INVALID_DNODE_EP; mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr()); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index b76d72d79c..ad3d5e1cf6 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -219,6 +219,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { } pEpSet->numOfEps++; + sdbRelease(pSdb, pObj); } } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 9263fca695..cf434dda39 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -79,17 +79,17 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < redoLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); - rawDataLen += sdbGetRawTotalSize(pTmp); + rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); } for (int32_t i = 0; i < undoLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); - rawDataLen += sdbGetRawTotalSize(pTmp); + rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); } for (int32_t i = 0; i < commitLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); - rawDataLen += sdbGetRawTotalSize(pTmp); + rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); } for (int32_t i = 0; i < redoActionNum; ++i) { @@ -437,7 +437,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { - mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr()); return -1; } sdbSetRawStatus(pRaw, SDB_STATUS_READY); @@ -835,7 +835,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { - mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr()); } sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 1b0026cd13..41de70283f 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -238,39 +238,48 @@ SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *p return pDrop; } +static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SDnodeObj *pDnode = pObj; + pDnode->numOfVnodes = 0; + return true; +} + +static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SDnodeObj *pDnode = pObj; + SArray *pArray = p1; + + pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); + + int64_t curMs = taosGetTimestampMs(); + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + if (online && pDnode->numOfSupportVnodes > 0) { + taosArrayPush(pArray, pDnode); + } + + bool isMnode = mndIsMnode(pMnode, pDnode->id); + + mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, pDnode->numOfVnodes, + pDnode->numOfSupportVnodes, isMnode, online); + + if (isMnode) { + pDnode->numOfVnodes++; + } + + return true; +} + static SArray *mndBuildDnodesArray(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; int32_t numOfDnodes = mndGetDnodeSize(pMnode); + SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj)); if (pArray == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - void *pIter = NULL; - while (1) { - SDnodeObj *pDnode = NULL; - pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); - if (pIter == NULL) break; - - int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); - - bool isMnode = mndIsMnode(pMnode, pDnode->id); - if (isMnode) { - pDnode->numOfVnodes++; - } - - int64_t curMs = taosGetTimestampMs(); - bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); - if (online) { - taosArrayPush(pArray, pDnode); - } - - mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, numOfVnodes, - pDnode->numOfSupportVnodes, isMnode, online); - sdbRelease(pSdb, pDnode); - } - + sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL); + sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, NULL, NULL); return pArray; } @@ -302,7 +311,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr pVgid->role = TAOS_SYNC_STATE_FOLLOWER; } - mDebug("db:%s, vgId:%d, vindex:%d dnodeId:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId); + mDebug("db:%s, vgId:%d, vn:%d dnode:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId); pDnode->numOfVnodes++; } @@ -412,6 +421,20 @@ static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; } +static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SVgObj *pVgroup = pObj; + int64_t uid = *(int64_t *)p1; + int8_t *pReplica = p2; + int32_t *pNumOfVgroups = p3; + + if (pVgroup->dbUid == uid) { + *pReplica = MAX(*pReplica, pVgroup->replica); + (*pNumOfVgroups)++; + } + + return true; +} + static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) { SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); @@ -420,25 +443,10 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep return -1; } - int8_t replica = 1; - int32_t numOfVgroups = 0; - - void *pIter = NULL; - while (1) { - SVgObj *pVgroup = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - - if (pVgroup->dbUid == pDb->uid) { - replica = MAX(replica, pVgroup->replica); - numOfVgroups++; - } - - sdbRelease(pSdb, pVgroup); - } - - *pReplica = replica; - *pNumOfVgroups = numOfVgroups; + *pReplica = 1; + *pNumOfVgroups = 0; + sdbTraverse(pSdb, SDB_VGROUP, mndGetVgroupMaxReplicaFp, &pDb->uid, pReplica, pNumOfVgroups); + mndReleaseDb(pMnode, pDb); return 0; } @@ -540,25 +548,23 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { - SSdb *pSdb = pMnode->pSdb; - int32_t numOfVnodes = 0; - void *pIter = NULL; +static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SVgObj *pVgroup = pObj; + int32_t dnodeId = *(int32_t *)p1; + int32_t *pNumOfVnodes = (int32_t *)p2; - while (1) { - SVgObj *pVgroup = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - - for (int32_t v = 0; v < pVgroup->replica; ++v) { - if (pVgroup->vnodeGid[v].dnodeId == dnodeId) { - numOfVnodes++; - } + for (int32_t v = 0; v < pVgroup->replica; ++v) { + if (pVgroup->vnodeGid[v].dnodeId == dnodeId) { + (*pNumOfVnodes)++; } - - sdbRelease(pSdb, pVgroup); } + return true; +} + +int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { + int32_t numOfVnodes = 0; + sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &numOfVnodes, NULL); return numOfVnodes; } diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 06c2563910..0388fa99f5 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -336,6 +336,30 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) { taosRUnLockLatch(pLock); } +void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) { + SHashObj *hash = sdbGetHash(pSdb, type); + if (hash == NULL) return; + + SRWLatch *pLock = &pSdb->locks[type]; + taosRLockLatch(pLock); + + SSdbRow **ppRow = taosHashIterate(hash, NULL); + while (ppRow != NULL) { + SSdbRow *pRow = *ppRow; + if (pRow->status == SDB_STATUS_READY) { + bool isContinue = (*fp)(pSdb->pMnode, pRow->pObj, p1, p2, p3); + if (!isContinue) { + taosHashCancelIterate(hash, ppRow); + break; + } + } + + ppRow = taosHashIterate(hash, ppRow); + } + + taosRUnLockLatch(pLock); +} + int32_t sdbGetSize(SSdb *pSdb, ESdbType type) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return 0;