diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 33f9dc5a1a..3ff86bea3e 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -181,6 +181,7 @@ typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbDeployFp)(SMnode *pMnode); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); +typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3); typedef struct { ESdbType sdbType; @@ -279,7 +280,7 @@ void sdbRelease(SSdb *pSdb, void *pObj); * * @param pSdb The sdb object. * @param type The type of the table. - * @param type The initial iterator of the table. + * @param pIter The initial iterator of the table. * @param pObj The object of the row just fetched. * @return void* The next iterator of the table. */ @@ -289,11 +290,22 @@ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj); * @brief Cancel a traversal * * @param pSdb The sdb object. - * @param pIter The iterator of the table. * @param type The initial iterator of table. */ void sdbCancelFetch(SSdb *pSdb, void *pIter); +/** + * @brief Traverse a sdb + * + * @param pSdb The sdb object. + * @param type The initial iterator of table. + * @param fp The function pointer. + * @param p1 The callback param. + * @param p2 The callback param. + * @param p3 The callback param. + */ +void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3); + /** * @brief Get the number of rows in the table * diff --git a/include/util/tlog.h b/include/util/tlog.h index 5c91398cdc..a367243a46 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -37,7 +37,6 @@ extern int32_t mqttDebugFlag; extern int32_t monDebugFlag; extern int32_t uDebugFlag; extern int32_t rpcDebugFlag; -extern int32_t odbcDebugFlag; extern int32_t qDebugFlag; extern int32_t wDebugFlag; extern int32_t sDebugFlag; diff --git a/include/util/tqueue.h b/include/util/tqueue.h index bcb9aea856..a57bdb5ce8 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -22,59 +22,57 @@ extern "C" { /* -This set of API for queue is designed specially for vnode/mnode. The main purpose is to -consume all the items instead of one item from a queue by one single read. Also, it can -combine multiple queues into a queue set, a consumer thread can consume a queue set via +This set of API for queue is designed specially for vnode/mnode. The main purpose is to +consume all the items instead of one item from a queue by one single read. Also, it can +combine multiple queues into a queue set, a consumer thread can consume a queue set via a single API instead of looping every queue by itself. Notes: -1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe +1: taosOpenQueue/taosCloseQueue, taosOpenQset/taosCloseQset is NOT multi-thread safe 2: after taosCloseQueue/taosCloseQset is called, read/write operation APIs are not safe. 3: read/write operation APIs are multi-thread safe To remove the limitation and make this set of queue APIs multi-thread safe, REF(tref.c) -shall be used to set up the protection. +shall be used to set up the protection. */ -typedef void *taos_queue; -typedef void *taos_qset; -typedef void *taos_qall; +typedef struct STaosQueue STaosQueue; +typedef struct STaosQset STaosQset; +typedef struct STaosQall STaosQall; typedef void (*FProcessItem)(void *ahandle, void *pItem); -typedef void (*FProcessItems)(void *ahandle, taos_qall qall, int numOfItems); +typedef void (*FProcessItems)(void *ahandle, STaosQall *qall, int32_t numOfItems); -taos_queue taosOpenQueue(); -void taosCloseQueue(taos_queue); -void taosSetQueueFp(taos_queue, FProcessItem, FProcessItems); -void *taosAllocateQitem(int size); -void taosFreeQitem(void *pItem); -int taosWriteQitem(taos_queue, void *pItem); -int taosReadQitem(taos_queue, void **pItem); -bool taosQueueEmpty(taos_queue); +STaosQueue *taosOpenQueue(); +void taosCloseQueue(STaosQueue *queue); +void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp); +void *taosAllocateQitem(int32_t size); +void taosFreeQitem(void *pItem); +int32_t taosWriteQitem(STaosQueue *queue, void *pItem); +int32_t taosReadQitem(STaosQueue *queue, void **ppItem); +bool taosQueueEmpty(STaosQueue *queue); -taos_qall taosAllocateQall(); -void taosFreeQall(taos_qall); -int taosReadAllQitems(taos_queue, taos_qall); -int taosGetQitem(taos_qall, void **pItem); -void taosResetQitems(taos_qall); +STaosQall *taosAllocateQall(); +void taosFreeQall(STaosQall *qall); +int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall); +int32_t taosGetQitem(STaosQall *qall, void **ppItem); +void taosResetQitems(STaosQall *qall); -taos_qset taosOpenQset(); -void taosCloseQset(); -void taosQsetThreadResume(taos_qset param); -int taosAddIntoQset(taos_qset, taos_queue, void *ahandle); -void taosRemoveFromQset(taos_qset, taos_queue); -int taosGetQueueNumber(taos_qset); +STaosQset *taosOpenQset(); +void taosCloseQset(STaosQset *qset); +void taosQsetThreadResume(STaosQset *qset); +int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); +void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); +int32_t taosGetQueueNumber(STaosQset *qset); -int taosReadQitemFromQset(taos_qset, void **pItem, void **ahandle, FProcessItem *); -int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **ahandle, FProcessItems *); +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp); +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp); -int taosGetQueueItemsNumber(taos_queue param); -int taosGetQsetItemsNumber(taos_qset param); +int32_t taosGetQueueItemsNumber(STaosQueue *queue); +int32_t taosGetQsetItemsNumber(STaosQset *qset); #ifdef __cplusplus } #endif #endif /*_TD_UTIL_QUEUE_H*/ - - diff --git a/include/util/tworker.h b/include/util/tworker.h index 2e5852cbba..27f03bd2b6 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -35,7 +35,7 @@ typedef struct SWorkerPool { int32_t max; // max number of workers int32_t min; // min number of workers int32_t num; // current number of workers - taos_qset qset; + STaosQset *qset; const char *name; SWorker *workers; pthread_mutex_t mutex; @@ -44,8 +44,8 @@ typedef struct SWorkerPool { typedef struct SMWorker { int32_t id; // worker id pthread_t thread; // thread - taos_qall qall; - taos_qset qset; // queue set + STaosQall *qall; + STaosQset *qset; // queue set SMWorkerPool *pool; } SMWorker; @@ -57,15 +57,15 @@ typedef struct SMWorkerPool { pthread_mutex_t mutex; } SMWorkerPool; -int32_t tWorkerInit(SWorkerPool *pool); -void tWorkerCleanup(SWorkerPool *pool); -taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp); -void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue); +int32_t tWorkerInit(SWorkerPool *pool); +void tWorkerCleanup(SWorkerPool *pool); +STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp); +void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue); -int32_t tMWorkerInit(SMWorkerPool *pool); -void tMWorkerCleanup(SMWorkerPool *pool); -taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp); -void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue); +int32_t tMWorkerInit(SMWorkerPool *pool); +void tMWorkerCleanup(SMWorkerPool *pool); +STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp); +void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index 8161b8d125..cedab6266e 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -140,7 +140,7 @@ void dmnInitOption(SDnodeOpt *pOption) { pOption->sver = 30000000; //3.0.0.0 pOption->numOfCores = tsNumOfCores; pOption->numOfSupportVnodes = tsNumOfSupportVnodes; - pOption->numOfCommitThreads = 1; + pOption->numOfCommitThreads = tsNumOfCommitThreads; pOption->statusInterval = tsStatusInterval; pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore; pOption->ratioOfQueryCores = tsRatioOfQueryCores; diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 954e21aefa..07c8ce5d02 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -64,7 +64,7 @@ typedef struct { int32_t maxNum; void *queueFp; SDnode *pDnode; - taos_queue queue; + STaosQueue *queue; union { SWorkerPool pool; SMWorkerPool mpool; @@ -92,7 +92,7 @@ typedef struct { SDnodeEps *dnodeEps; pthread_t *threadId; SRWLatch latch; - taos_queue pMgmtQ; + STaosQueue *pMgmtQ; SWorkerPool mgmtPool; } SDnodeMgmt; diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c index 66b619318d..c12d449517 100644 --- a/source/dnode/mgmt/impl/src/dndBnode.c +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -19,7 +19,7 @@ #include "dndTransport.h" #include "dndWorker.h" -static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs); +static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); static SBnode *dndAcquireBnode(SDnode *pDnode) { SBnodeMgmt *pMgmt = &pDnode->bmgmt; @@ -286,7 +286,7 @@ static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) { taosFreeQitem(pMsg); } -static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t code) { +static void dndSendBnodeErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) { for (int32_t i = 0; i < numOfMsgs; ++i) { SRpcMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); @@ -294,7 +294,7 @@ static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t cod } } -static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessBnodeQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { SBnode *pBnode = dndAcquireBnode(pDnode); if (pBnode == NULL) { dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 3eab3e5aec..8835e0ba65 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -27,20 +27,20 @@ typedef struct { } SWrapperCfg; typedef struct { - int32_t vgId; - int32_t refCount; - int32_t vgVersion; - int8_t dropped; - int8_t accessState; - uint64_t dbUid; - char *db; - char *path; - SVnode *pImpl; - taos_queue pWriteQ; - taos_queue pSyncQ; - taos_queue pApplyQ; - taos_queue pQueryQ; - taos_queue pFetchQ; + int32_t vgId; + int32_t refCount; + int32_t vgVersion; + int8_t dropped; + int8_t accessState; + uint64_t dbUid; + char *db; + char *path; + SVnode *pImpl; + STaosQueue *pWriteQ; + STaosQueue *pSyncQ; + STaosQueue *pApplyQ; + STaosQueue *pQueryQ; + STaosQueue* pFetchQ; } SVnodeObj; typedef struct { @@ -72,9 +72,9 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode); static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); -static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); -static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); -static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs); +static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs); +static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs); +static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); @@ -768,7 +768,7 @@ static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp); } -static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -804,7 +804,7 @@ static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t taosArrayDestroy(pArray); } -static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -815,7 +815,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t } } -static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) { +static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -826,7 +826,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t } } -static int32_t dndWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { +static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) { int32_t code = 0; if (pQueue == NULL) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e66ffd4426..e26c0c5ae7 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -117,7 +117,7 @@ typedef struct { int64_t rebootTime; int64_t lastAccessTime; int32_t accessTimes; - int16_t numOfVnodes; + int32_t numOfVnodes; int32_t numOfSupportVnodes; int32_t numOfCores; EDndReason offlineReason; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index d05e301024..f9199f9eed 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -549,7 +549,7 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO return 0; } -static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { +static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { SSdbRaw *pCommitRaw = mndDbActionEncode(pNewDb); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -725,7 +725,7 @@ static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { STransAction action = {0}; - SVnodeGid * pVgid = pVgroup->vnodeGid + vn; + SVnodeGid *pVgid = pVgroup->vnodeGid + vn; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pDnode == NULL) return -1; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index ca20ba61a1..1c7f8544e9 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -196,6 +196,8 @@ static SDnodeObj *mndAcquireDnodeByEp(SMnode *pMnode, char *pEpStr) { sdbCancelFetch(pSdb, pIter); return pDnode; } + + sdbRelease(pSdb, pDnode); } return NULL; @@ -419,10 +421,9 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg * static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SCreateDnodeMsg *pCreate = pMsg->rpcMsg.pCont; - + pCreate->port = htonl(pCreate->port); mDebug("dnode:%s:%d, start to create", pCreate->fqdn, pCreate->port); - pCreate->port = htonl(pCreate->port); if (pCreate->fqdn[0] == 0 || pCreate->port <= 0 || pCreate->port > UINT16_MAX) { terrno = TSDB_CODE_MND_INVALID_DNODE_EP; mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr()); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index b76d72d79c..ad3d5e1cf6 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -219,6 +219,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { } pEpSet->numOfEps++; + sdbRelease(pSdb, pObj); } } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 9263fca695..cf434dda39 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -79,17 +79,17 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < redoLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); - rawDataLen += sdbGetRawTotalSize(pTmp); + rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); } for (int32_t i = 0; i < undoLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); - rawDataLen += sdbGetRawTotalSize(pTmp); + rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); } for (int32_t i = 0; i < commitLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); - rawDataLen += sdbGetRawTotalSize(pTmp); + rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); } for (int32_t i = 0; i < redoActionNum; ++i) { @@ -437,7 +437,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { - mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr()); return -1; } sdbSetRawStatus(pRaw, SDB_STATUS_READY); @@ -835,7 +835,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { - mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr()); } sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 1b0026cd13..41de70283f 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -238,39 +238,48 @@ SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *p return pDrop; } +static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SDnodeObj *pDnode = pObj; + pDnode->numOfVnodes = 0; + return true; +} + +static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SDnodeObj *pDnode = pObj; + SArray *pArray = p1; + + pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); + + int64_t curMs = taosGetTimestampMs(); + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + if (online && pDnode->numOfSupportVnodes > 0) { + taosArrayPush(pArray, pDnode); + } + + bool isMnode = mndIsMnode(pMnode, pDnode->id); + + mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, pDnode->numOfVnodes, + pDnode->numOfSupportVnodes, isMnode, online); + + if (isMnode) { + pDnode->numOfVnodes++; + } + + return true; +} + static SArray *mndBuildDnodesArray(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; int32_t numOfDnodes = mndGetDnodeSize(pMnode); + SArray *pArray = taosArrayInit(numOfDnodes, sizeof(SDnodeObj)); if (pArray == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - void *pIter = NULL; - while (1) { - SDnodeObj *pDnode = NULL; - pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); - if (pIter == NULL) break; - - int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); - - bool isMnode = mndIsMnode(pMnode, pDnode->id); - if (isMnode) { - pDnode->numOfVnodes++; - } - - int64_t curMs = taosGetTimestampMs(); - bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); - if (online) { - taosArrayPush(pArray, pDnode); - } - - mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, numOfVnodes, - pDnode->numOfSupportVnodes, isMnode, online); - sdbRelease(pSdb, pDnode); - } - + sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL); + sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, NULL, NULL); return pArray; } @@ -302,7 +311,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr pVgid->role = TAOS_SYNC_STATE_FOLLOWER; } - mDebug("db:%s, vgId:%d, vindex:%d dnodeId:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId); + mDebug("db:%s, vgId:%d, vn:%d dnode:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId); pDnode->numOfVnodes++; } @@ -412,6 +421,20 @@ static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; } +static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SVgObj *pVgroup = pObj; + int64_t uid = *(int64_t *)p1; + int8_t *pReplica = p2; + int32_t *pNumOfVgroups = p3; + + if (pVgroup->dbUid == uid) { + *pReplica = MAX(*pReplica, pVgroup->replica); + (*pNumOfVgroups)++; + } + + return true; +} + static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) { SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); @@ -420,25 +443,10 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep return -1; } - int8_t replica = 1; - int32_t numOfVgroups = 0; - - void *pIter = NULL; - while (1) { - SVgObj *pVgroup = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - - if (pVgroup->dbUid == pDb->uid) { - replica = MAX(replica, pVgroup->replica); - numOfVgroups++; - } - - sdbRelease(pSdb, pVgroup); - } - - *pReplica = replica; - *pNumOfVgroups = numOfVgroups; + *pReplica = 1; + *pNumOfVgroups = 0; + sdbTraverse(pSdb, SDB_VGROUP, mndGetVgroupMaxReplicaFp, &pDb->uid, pReplica, pNumOfVgroups); + mndReleaseDb(pMnode, pDb); return 0; } @@ -540,25 +548,23 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { - SSdb *pSdb = pMnode->pSdb; - int32_t numOfVnodes = 0; - void *pIter = NULL; +static bool mndGetVnodesNumFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { + SVgObj *pVgroup = pObj; + int32_t dnodeId = *(int32_t *)p1; + int32_t *pNumOfVnodes = (int32_t *)p2; - while (1) { - SVgObj *pVgroup = NULL; - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - if (pIter == NULL) break; - - for (int32_t v = 0; v < pVgroup->replica; ++v) { - if (pVgroup->vnodeGid[v].dnodeId == dnodeId) { - numOfVnodes++; - } + for (int32_t v = 0; v < pVgroup->replica; ++v) { + if (pVgroup->vnodeGid[v].dnodeId == dnodeId) { + (*pNumOfVnodes)++; } - - sdbRelease(pSdb, pVgroup); } + return true; +} + +int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { + int32_t numOfVnodes = 0; + sdbTraverse(pMnode->pSdb, SDB_VGROUP, mndGetVnodesNumFp, &dnodeId, &numOfVnodes, NULL); return numOfVnodes; } diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 06c2563910..0388fa99f5 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -336,6 +336,30 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter) { taosRUnLockLatch(pLock); } +void sdbTraverse(SSdb *pSdb, ESdbType type, sdbTraverseFp fp, void *p1, void *p2, void *p3) { + SHashObj *hash = sdbGetHash(pSdb, type); + if (hash == NULL) return; + + SRWLatch *pLock = &pSdb->locks[type]; + taosRLockLatch(pLock); + + SSdbRow **ppRow = taosHashIterate(hash, NULL); + while (ppRow != NULL) { + SSdbRow *pRow = *ppRow; + if (pRow->status == SDB_STATUS_READY) { + bool isContinue = (*fp)(pSdb->pMnode, pRow->pObj, p1, p2, p3); + if (!isContinue) { + taosHashCancelIterate(hash, ppRow); + break; + } + } + + ppRow = taosHashIterate(hash, ppRow); + } + + taosRUnLockLatch(pLock); +} + int32_t sdbGetSize(SSdb *pSdb, ESdbType type) { SHashObj *hash = sdbGetHash(pSdb, type); if (hash == NULL) return 0; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 04bc0c8dc8..75f5e9cdbc 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -14,26 +14,29 @@ */ #include "os.h" -#include "ulog.h" + #include "taoserror.h" #include "tqueue.h" +#include "ulog.h" + +typedef struct STaosQnode STaosQnode; typedef struct STaosQnode { - struct STaosQnode *next; - char item[]; + STaosQnode *next; + char item[]; } STaosQnode; typedef struct STaosQueue { - int32_t itemSize; - int32_t numOfItems; - struct STaosQnode *head; - struct STaosQnode *tail; - struct STaosQueue *next; // for queue set - struct STaosQset *qset; // for queue set - void *ahandle; // for queue set - FProcessItem itemFp; - FProcessItems itemsFp; - pthread_mutex_t mutex; + int32_t itemSize; + int32_t numOfItems; + STaosQnode *head; + STaosQnode *tail; + STaosQueue *next; // for queue set + STaosQset *qset; // for queue set + void *ahandle; // for queue set + FProcessItem itemFp; + FProcessItems itemsFp; + pthread_mutex_t mutex; } STaosQueue; typedef struct STaosQset { @@ -52,8 +55,8 @@ typedef struct STaosQall { int32_t numOfItems; } STaosQall; -taos_queue taosOpenQueue() { - STaosQueue *queue = (STaosQueue *)calloc(sizeof(STaosQueue), 1); +STaosQueue *taosOpenQueue() { + STaosQueue *queue = calloc(sizeof(STaosQueue), 1); if (queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -65,16 +68,14 @@ taos_queue taosOpenQueue() { return queue; } -void taosSetQueueFp(taos_queue param, FProcessItem itemFp, FProcessItems itemsFp) { - if (param == NULL) return; - STaosQueue *queue = (STaosQueue *)param; +void taosSetQueueFp(STaosQueue *queue, FProcessItem itemFp, FProcessItems itemsFp) { + if (queue == NULL) return; queue->itemFp = itemFp; queue->itemsFp = itemsFp; } -void taosCloseQueue(taos_queue param) { - if (param == NULL) return; - STaosQueue *queue = (STaosQueue *)param; +void taosCloseQueue(STaosQueue *queue) { + if (queue == NULL) return; STaosQnode *pTemp; STaosQset *qset; @@ -98,9 +99,8 @@ void taosCloseQueue(taos_queue param) { uTrace("queue:%p is closed", queue); } -bool taosQueueEmpty(taos_queue param) { - if (param == NULL) return true; - STaosQueue *queue = (STaosQueue *)param; +bool taosQueueEmpty(STaosQueue *queue) { + if (queue == NULL) return true; bool empty = false; pthread_mutex_lock(&queue->mutex); @@ -112,7 +112,7 @@ bool taosQueueEmpty(taos_queue param) { return empty; } -void *taosAllocateQitem(int size) { +void *taosAllocateQitem(int32_t size) { STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); if (pNode == NULL) return NULL; @@ -129,9 +129,8 @@ void taosFreeQitem(void *param) { free(temp); } -int taosWriteQitem(taos_queue param, void *item) { - STaosQueue *queue = (STaosQueue *)param; - STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode)); +int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { + STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode)); pNode->next = NULL; pthread_mutex_lock(&queue->mutex); @@ -146,7 +145,7 @@ int taosWriteQitem(taos_queue param, void *item) { queue->numOfItems++; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); - uTrace("item:%p is put into queue:%p, items:%d", item, queue, queue->numOfItems); + uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems); pthread_mutex_unlock(&queue->mutex); @@ -155,22 +154,21 @@ int taosWriteQitem(taos_queue param, void *item) { return 0; } -int taosReadQitem(taos_queue param, void **pitem) { - STaosQueue *queue = (STaosQueue *)param; +int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { STaosQnode *pNode = NULL; - int code = 0; + int32_t code = 0; pthread_mutex_lock(&queue->mutex); if (queue->head) { pNode = queue->head; - *pitem = pNode->item; + *ppItem = pNode->item; queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; queue->numOfItems--; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; - uDebug("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems); + uDebug("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -178,18 +176,13 @@ int taosReadQitem(taos_queue param, void **pitem) { return code; } -void *taosAllocateQall() { - void *p = calloc(sizeof(STaosQall), 1); - return p; -} +STaosQall *taosAllocateQall() { return calloc(sizeof(STaosQall), 1); } -void taosFreeQall(void *param) { free(param); } +void taosFreeQall(STaosQall *qall) { free(qall); } -int taosReadAllQitems(taos_queue param, taos_qall p2) { - STaosQueue *queue = (STaosQueue *)param; - STaosQall *qall = (STaosQall *)p2; - int code = 0; - bool empty; +int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) { + int32_t code = 0; + bool empty; pthread_mutex_lock(&queue->mutex); @@ -219,29 +212,25 @@ int taosReadAllQitems(taos_queue param, taos_qall p2) { return code; } -int taosGetQitem(taos_qall param, void **pitem) { - STaosQall *qall = (STaosQall *)param; +int32_t taosGetQitem(STaosQall *qall, void **ppItem) { STaosQnode *pNode; - int num = 0; + int32_t num = 0; pNode = qall->current; if (pNode) qall->current = pNode->next; if (pNode) { - *pitem = pNode->item; + *ppItem = pNode->item; num = 1; - uTrace("item:%p is fetched", *pitem); + uTrace("item:%p is fetched", *ppItem); } return num; } -void taosResetQitems(taos_qall param) { - STaosQall *qall = (STaosQall *)param; - qall->current = qall->start; -} +void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } -taos_qset taosOpenQset() { +STaosQset *taosOpenQset() { STaosQset *qset = (STaosQset *)calloc(sizeof(STaosQset), 1); if (qset == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -255,9 +244,8 @@ taos_qset taosOpenQset() { return qset; } -void taosCloseQset(taos_qset param) { - if (param == NULL) return; - STaosQset *qset = (STaosQset *)param; +void taosCloseQset(STaosQset *qset) { + if (qset == NULL) return; // remove all the queues from qset pthread_mutex_lock(&qset->mutex); @@ -279,16 +267,12 @@ void taosCloseQset(taos_qset param) { // tsem_post 'qset->sem', so that reader threads waiting for it // resumes execution and return, should only be used to signal the // thread to exit. -void taosQsetThreadResume(taos_qset param) { - STaosQset *qset = (STaosQset *)param; +void taosQsetThreadResume(STaosQset *qset) { uDebug("qset:%p, it will exit", qset); tsem_post(&qset->sem); } -int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { - STaosQueue *queue = (STaosQueue *)p2; - STaosQset *qset = (STaosQset *)p1; - +int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) { if (queue->qset) return -1; pthread_mutex_lock(&qset->mutex); @@ -309,10 +293,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { return 0; } -void taosRemoveFromQset(taos_qset p1, taos_queue p2) { - STaosQueue *queue = (STaosQueue *)p2; - STaosQset *qset = (STaosQset *)p1; - +void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { STaosQueue *tqueue = NULL; pthread_mutex_lock(&qset->mutex); @@ -353,18 +334,17 @@ void taosRemoveFromQset(taos_qset p1, taos_queue p2) { uTrace("queue:%p is removed from qset:%p", queue, qset); } -int taosGetQueueNumber(taos_qset param) { return ((STaosQset *)param)->numOfQueues; } +int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } -int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProcessItem *itemFp) { - STaosQset *qset = (STaosQset *)param; +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FProcessItem *itemFp) { STaosQnode *pNode = NULL; - int code = 0; + int32_t code = 0; tsem_wait(&qset->sem); pthread_mutex_lock(&qset->mutex); - for (int i = 0; i < qset->numOfQueues; ++i) { + for (int32_t i = 0; i < qset->numOfQueues; ++i) { if (qset->current == NULL) qset->current = qset->head; STaosQueue *queue = qset->current; if (queue) qset->current = queue->next; @@ -375,7 +355,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces if (queue->head) { pNode = queue->head; - *pitem = pNode->item; + *ppItem = pNode->item; if (ahandle) *ahandle = queue->ahandle; if (itemFp) *itemFp = queue->itemFp; queue->head = pNode->next; @@ -383,7 +363,7 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces queue->numOfItems--; atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; - uTrace("item:%p is read out from queue:%p, items:%d", *pitem, queue, queue->numOfItems); + uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -395,18 +375,15 @@ int taosReadQitemFromQset(taos_qset param, void **pitem, void **ahandle, FProces return code; } -int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FProcessItems *itemsFp) { - STaosQset *qset = (STaosQset *)param; +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FProcessItems *itemsFp) { STaosQueue *queue; - STaosQall *qall = (STaosQall *)p2; - int code = 0; + int32_t code = 0; tsem_wait(&qset->sem); pthread_mutex_lock(&qset->mutex); - for(int i=0; inumOfQueues; ++i) { - if (qset->current == NULL) - qset->current = qset->head; + for (int32_t i = 0; i < qset->numOfQueues; ++i) { + if (qset->current == NULL) qset->current = qset->head; queue = qset->current; if (queue) qset->current = queue->next; if (queue == NULL) break; @@ -427,34 +404,32 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **ahandle, FPr queue->tail = NULL; queue->numOfItems = 0; atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); - for (int j=1; jnumOfItems; ++j) tsem_wait(&qset->sem); - } + for (int32_t j = 1; j < qall->numOfItems; ++j) tsem_wait(&qset->sem); + } pthread_mutex_unlock(&queue->mutex); - if (code != 0) break; + if (code != 0) break; } pthread_mutex_unlock(&qset->mutex); return code; } -int taosGetQueueItemsNumber(taos_queue param) { - STaosQueue *queue = (STaosQueue *)param; +int32_t taosGetQueueItemsNumber(STaosQueue *queue) { if (!queue) return 0; - int num; + int32_t num; pthread_mutex_lock(&queue->mutex); num = queue->numOfItems; pthread_mutex_unlock(&queue->mutex); return num; } -int taosGetQsetItemsNumber(taos_qset param) { - STaosQset *qset = (STaosQset *)param; +int32_t taosGetQsetItemsNumber(STaosQset *qset) { if (!qset) return 0; - int num = 0; + int32_t num = 0; pthread_mutex_lock(&qset->mutex); num = qset->numOfItems; pthread_mutex_unlock(&qset->mutex); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index fb7b71b845..ed74041712 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -85,9 +85,9 @@ static void *tWorkerThreadFp(SWorker *worker) { return NULL; } -taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) { +STaosQueue *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) { pthread_mutex_lock(&pool->mutex); - taos_queue queue = taosOpenQueue(); + STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&pool->mutex); return NULL; @@ -121,7 +121,7 @@ taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp) return queue; } -void tWorkerFreeQueue(SWorkerPool *pool, void *queue) { +void tWorkerFreeQueue(SWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue); } @@ -195,11 +195,11 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) { return NULL; } -taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) { +STaosQueue *tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp) { pthread_mutex_lock(&pool->mutex); SMWorker *worker = pool->workers + pool->nextId; - taos_queue *queue = taosOpenQueue(); + STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { pthread_mutex_unlock(&pool->mutex); return NULL; @@ -250,7 +250,7 @@ taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems f return queue; } -void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue) { +void tMWorkerFreeQueue(SMWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue); }