Merge pull request #25512 from taosdata/fix/TD-29713
fix: arb reduce duplicate updates
This commit is contained in:
commit
63a735a546
|
@ -16,6 +16,8 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "mmInt.h"
|
||||
|
||||
/* Message-processing time limit. mmProcessRpcMsg compares it against a
 * difference of taosGetTimestampUs() values and logs a warning when the
 * elapsed time is larger. Presumably microseconds, i.e. 2 seconds (the warning
 * divides the cost by 1000 * 1000 and prints it with an "s" suffix). */
#define PROCESS_THRESHOLD (2000 * 1000)
|
||||
|
||||
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
|
||||
int32_t code = 0;
|
||||
taosThreadRwlockRdlock(&pMgmt->lock);
|
||||
|
@ -53,6 +55,14 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t code = mndProcessRpcMsg(pMsg);
|
||||
|
||||
if (pInfo->timestamp != 0) {
|
||||
int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
|
||||
if (cost > PROCESS_THRESHOLD) {
|
||||
dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
|
||||
TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
|
||||
}
|
||||
}
|
||||
|
||||
if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
if (code != 0 && terrno != 0) code = terrno;
|
||||
mmSendRsp(pMsg, code);
|
||||
|
|
|
@ -27,6 +27,8 @@
|
|||
#define ARBGROUP_VER_NUMBER 1
|
||||
#define ARBGROUP_RESERVE_SIZE 64
|
||||
|
||||
/* Set of vgIds (int32 keys, no values) that have an arb-update-group request
 * in flight. mndPullupArbUpdateGroup adds the vgId after the request has been
 * queued successfully and skips the pull-up while the vgId is still present;
 * mndArbGroupActionUpdate removes the vgId again. Created in mndInitArbGroup
 * and released in mndCleanupArbGroup. */
static SHashObj *arbUpdateHash = NULL;
|
||||
|
||||
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
|
||||
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
|
||||
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
|
||||
|
@ -74,10 +76,14 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
|
|||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);
|
||||
|
||||
arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
void mndCleanupArbGroup(SMnode *pMnode) {}
|
||||
/*
 * Release the resources owned by the arbgroup module.
 *
 * Frees the in-flight update table created by mndInitArbGroup and resets the
 * file-scope pointer, so that a later access or a second cleanup call sees
 * NULL instead of a dangling pointer to freed memory.
 *
 * pMnode is not used here.
 *
 * NOTE(review): this relies on the hash API (taosHashGet / taosHashPut /
 * taosHashRemove / taosHashCleanup) tolerating a NULL table - confirm.
 */
void mndCleanupArbGroup(SMnode *pMnode) {
  taosHashCleanup(arbUpdateHash);
  arbUpdateHash = NULL;
}
|
||||
|
||||
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
|
||||
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
|
||||
|
@ -221,8 +227,7 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
|
|||
mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
|
||||
" new version:%" PRId64,
|
||||
pOld->vgId, pOld, pNew, pOld->version, pNew->version);
|
||||
taosThreadMutexUnlock(&pOld->mutex);
|
||||
return 0;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||
|
@ -232,7 +237,11 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
|
|||
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
|
||||
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||
pOld->version++;
|
||||
|
||||
_OVER:
|
||||
taosThreadMutexUnlock(&pOld->mutex);
|
||||
|
||||
taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -645,6 +654,11 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup)
|
|||
}
|
||||
|
||||
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
||||
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
|
||||
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup);
|
||||
if (!pHead) {
|
||||
|
@ -653,7 +667,11 @@ static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
|||
}
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true};
|
||||
|
||||
return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
if (ret == 0) {
|
||||
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) {
|
||||
|
@ -930,8 +948,12 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
|||
|
||||
SVArbCheckSyncRsp syncRsp = {0};
|
||||
if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code));
|
||||
if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
return 0;
|
||||
}
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue