fix: arb reduce duplicate updates
This commit is contained in:
parent
10d82a3acf
commit
21f508fe58
|
@ -16,6 +16,8 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mmInt.h"
|
#include "mmInt.h"
|
||||||
|
|
||||||
|
#define PROCESS_THRESHOLD (2000 * 1000)
|
||||||
|
|
||||||
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
|
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
taosThreadRwlockRdlock(&pMgmt->lock);
|
taosThreadRwlockRdlock(&pMgmt->lock);
|
||||||
|
@ -53,6 +55,14 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t code = mndProcessRpcMsg(pMsg);
|
int32_t code = mndProcessRpcMsg(pMsg);
|
||||||
|
|
||||||
|
if (pInfo->timestamp != 0) {
|
||||||
|
int64_t cost = taosGetTimestampUs() - pInfo->timestamp;
|
||||||
|
if (cost > PROCESS_THRESHOLD) {
|
||||||
|
dGWarn("worker:%d,message has been processed for too long, type:%s, cost: %" PRId64 "s", pInfo->threadNum,
|
||||||
|
TMSG_INFO(pMsg->msgType), cost / (1000 * 1000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
if (code != 0 && terrno != 0) code = terrno;
|
if (code != 0 && terrno != 0) code = terrno;
|
||||||
mmSendRsp(pMsg, code);
|
mmSendRsp(pMsg, code);
|
||||||
|
|
|
@ -27,6 +27,8 @@
|
||||||
#define ARBGROUP_VER_NUMBER 1
|
#define ARBGROUP_VER_NUMBER 1
|
||||||
#define ARBGROUP_RESERVE_SIZE 64
|
#define ARBGROUP_RESERVE_SIZE 64
|
||||||
|
|
||||||
|
static SHashObj *arbUpdateHash = NULL;
|
||||||
|
|
||||||
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
|
static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup);
|
||||||
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
|
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew);
|
||||||
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
|
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup);
|
||||||
|
@ -74,10 +76,14 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndRetrieveArbGroups);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);
|
||||||
|
|
||||||
|
arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndCleanupArbGroup(SMnode *pMnode) {}
|
void mndCleanupArbGroup(SMnode *pMnode) {
|
||||||
|
taosHashCleanup(arbUpdateHash);
|
||||||
|
}
|
||||||
|
|
||||||
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
|
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
|
||||||
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
|
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
|
||||||
|
@ -221,8 +227,7 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
|
||||||
mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
|
mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
|
||||||
" new version:%" PRId64,
|
" new version:%" PRId64,
|
||||||
pOld->vgId, pOld, pNew, pOld->version, pNew->version);
|
pOld->vgId, pOld, pNew, pOld->version, pNew->version);
|
||||||
taosThreadMutexUnlock(&pOld->mutex);
|
goto _OVER;
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
|
@ -232,7 +237,11 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
|
||||||
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
|
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
|
||||||
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||||
pOld->version++;
|
pOld->version++;
|
||||||
|
|
||||||
|
_OVER:
|
||||||
taosThreadMutexUnlock(&pOld->mutex);
|
taosThreadMutexUnlock(&pOld->mutex);
|
||||||
|
|
||||||
|
taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -645,6 +654,11 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup)
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
||||||
|
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
|
||||||
|
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup);
|
void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup);
|
||||||
if (!pHead) {
|
if (!pHead) {
|
||||||
|
@ -653,7 +667,11 @@ static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
||||||
}
|
}
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true};
|
||||||
|
|
||||||
return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
|
if (ret == 0) {
|
||||||
|
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) {
|
static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) {
|
||||||
|
@ -930,8 +948,12 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
||||||
|
|
||||||
SVArbCheckSyncRsp syncRsp = {0};
|
SVArbCheckSyncRsp syncRsp = {0};
|
||||||
if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) {
|
if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code));
|
mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code));
|
||||||
|
if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue