diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 75d5669824..885086e37a 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "mmInt.h" +#define PROCESS_THRESHOLD (2000 * 1000) + static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) { int32_t code = 0; taosThreadRwlockRdlock(&pMgmt->lock); @@ -53,6 +55,14 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = mndProcessRpcMsg(pMsg); + if (pInfo->timestamp != 0) { + int64_t cost = taosGetTimestampUs() - pInfo->timestamp; + if (cost > PROCESS_THRESHOLD) { + dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum, + TMSG_INFO(pMsg->msgType), cost / (1000 * 1000)); + } + } + if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && terrno != 0) code = terrno; mmSendRsp(pMsg, code); diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index 92ab5274e4..d0a86bdde7 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -27,6 +27,8 @@ #define ARBGROUP_VER_NUMBER 1 #define ARBGROUP_RESERVE_SIZE 64 +static SHashObj *arbUpdateHash = NULL; + static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup); static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew); static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup); @@ -74,10 +76,14 @@ int32_t mndInitArbGroup(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup); + arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); + return sdbSetTable(pMnode->pSdb, table); } -void mndCleanupArbGroup(SMnode *pMnode) {} +void mndCleanupArbGroup(SMnode *pMnode) { + taosHashCleanup(arbUpdateHash); +} SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) { SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId); @@ -221,8 +227,7 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64 " new version:%" PRId64, pOld->vgId, pOld, pNew, pOld->version, pNew->version); - taosThreadMutexUnlock(&pOld->mutex); - return 0; + goto _OVER; } for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { @@ -232,7 +237,11 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId; memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); pOld->version++; + +_OVER: taosThreadMutexUnlock(&pOld->mutex); + + taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t)); return 0; } @@ -645,6 +654,11 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) } static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) { + if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) { + mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId); + return 0; + } + int32_t contLen = 0; void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup); if (!pHead) { @@ -653,7 +667,11 @@ static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) { } SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true}; - return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + if (ret == 0) { + taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0); + } + return ret; } static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) { @@ -930,8 +948,12 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) { SVArbCheckSyncRsp syncRsp = {0}; if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) { - terrno = TSDB_CODE_INVALID_MSG; mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code)); + if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) { + terrno = TSDB_CODE_SUCCESS; + return 0; + } + terrno = TSDB_CODE_INVALID_MSG; return -1; }