feat: arb add version in sarbgroup
This commit is contained in:
parent
fc7575279c
commit
d0df927dfa
|
@ -2236,6 +2236,7 @@ typedef struct {
|
||||||
SMArbUpdateGroupReqMember members[2];
|
SMArbUpdateGroupReqMember members[2];
|
||||||
int8_t isSync;
|
int8_t isSync;
|
||||||
SMArbUpdateGroupReqMember assignedLeader;
|
SMArbUpdateGroupReqMember assignedLeader;
|
||||||
|
int64_t version;
|
||||||
} SMArbUpdateGroupReq;
|
} SMArbUpdateGroupReq;
|
||||||
|
|
||||||
int32_t tSerializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq);
|
int32_t tSerializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq);
|
||||||
|
|
|
@ -279,6 +279,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_SHOW_SUBQUERY_LEN 1000
|
#define TSDB_SHOW_SUBQUERY_LEN 1000
|
||||||
#define TSDB_LOG_VAR_LEN 32
|
#define TSDB_LOG_VAR_LEN 32
|
||||||
|
|
||||||
|
#define TSDB_ARB_GROUP_MEMBER_NUM 2
|
||||||
#define TSDB_ARB_TOKEN_SIZE 32
|
#define TSDB_ARB_TOKEN_SIZE 32
|
||||||
|
|
||||||
#define TSDB_TRANS_STAGE_LEN 12
|
#define TSDB_TRANS_STAGE_LEN 12
|
||||||
|
|
|
@ -85,7 +85,7 @@ bool tsEnableWhiteList = false; // ip white list cfg
|
||||||
// arbitrator
|
// arbitrator
|
||||||
int32_t tsArbHeartBeatIntervalSec = 5;
|
int32_t tsArbHeartBeatIntervalSec = 5;
|
||||||
int32_t tsArbCheckSyncIntervalSec = 10;
|
int32_t tsArbCheckSyncIntervalSec = 10;
|
||||||
int32_t tsArbSetAssignedTimeoutSec = 50;
|
int32_t tsArbSetAssignedTimeoutSec = 30;
|
||||||
|
|
||||||
// dnode
|
// dnode
|
||||||
int64_t tsDndStart = 0;
|
int64_t tsDndStart = 0;
|
||||||
|
|
|
@ -6222,13 +6222,14 @@ int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGrou
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
if (tEncodeI32(&encoder, pReq->members[i].dnodeId) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->members[i].dnodeId) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->members[i].token) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->members[i].token) < 0) return -1;
|
||||||
}
|
}
|
||||||
if (tEncodeI8(&encoder, pReq->isSync) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->isSync) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->assignedLeader.dnodeId) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->assignedLeader.dnodeId) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->assignedLeader.token) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->assignedLeader.token) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->version) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -6244,7 +6245,7 @@ int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGr
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
if (tDecodeI32(&decoder, &pReq->members[i].dnodeId) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->members[i].dnodeId) < 0) return -1;
|
||||||
pReq->members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
|
pReq->members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
|
||||||
if (tDecodeCStrTo(&decoder, pReq->members[i].token) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->members[i].token) < 0) return -1;
|
||||||
|
@ -6253,6 +6254,7 @@ int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGr
|
||||||
if (tDecodeI32(&decoder, &pReq->assignedLeader.dnodeId) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->assignedLeader.dnodeId) < 0) return -1;
|
||||||
pReq->assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
|
pReq->assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
|
||||||
if (tDecodeCStrTo(&decoder, pReq->assignedLeader.token) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->assignedLeader.token) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->version) < 0) return -1;
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
|
|
@ -273,9 +273,10 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
SArbGroupMember members[2];
|
SArbGroupMember members[TSDB_ARB_GROUP_MEMBER_NUM];
|
||||||
int8_t isSync;
|
int8_t isSync;
|
||||||
SArbAssignedLeader assignedLeader;
|
SArbAssignedLeader assignedLeader;
|
||||||
|
int64_t version;
|
||||||
|
|
||||||
// following fields will not be duplicated
|
// following fields will not be duplicated
|
||||||
bool mutexInited;
|
bool mutexInited;
|
||||||
|
|
|
@ -36,7 +36,7 @@ static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
|
||||||
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
|
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
|
||||||
|
|
||||||
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew);
|
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew);
|
||||||
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup* pNewGroup);
|
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup);
|
||||||
|
|
||||||
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
|
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
|
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
|
||||||
|
@ -97,7 +97,7 @@ void mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
|
||||||
memset(outGroup, 0, sizeof(SArbGroup));
|
memset(outGroup, 0, sizeof(SArbGroup));
|
||||||
outGroup->dbUid = pVgObj->dbUid;
|
outGroup->dbUid = pVgObj->dbUid;
|
||||||
outGroup->vgId = pVgObj->vgId;
|
outGroup->vgId = pVgObj->vgId;
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
SArbGroupMember *pMember = &outGroup->members[i];
|
SArbGroupMember *pMember = &outGroup->members[i];
|
||||||
pMember->info.dnodeId = pVgObj->vnodeGid[i].dnodeId;
|
pMember->info.dnodeId = pVgObj->vnodeGid[i].dnodeId;
|
||||||
}
|
}
|
||||||
|
@ -113,7 +113,7 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_SET_INT32(pRaw, dataPos, pGroup->vgId, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pGroup->vgId, _OVER)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pGroup->dbUid, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pGroup->dbUid, _OVER)
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
SArbGroupMember *pMember = &pGroup->members[i];
|
SArbGroupMember *pMember = &pGroup->members[i];
|
||||||
SDB_SET_INT32(pRaw, dataPos, pMember->info.dnodeId, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pMember->info.dnodeId, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pMember->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pMember->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
||||||
|
@ -123,6 +123,7 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
|
||||||
SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
|
SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
|
||||||
SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
||||||
|
SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)
|
||||||
|
|
||||||
|
@ -161,7 +162,7 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pGroup->vgId, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pGroup->vgId, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pGroup->dbUid, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pGroup->dbUid, _OVER)
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
SArbGroupMember *pMember = &pGroup->members[i];
|
SArbGroupMember *pMember = &pGroup->members[i];
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pMember->info.dnodeId, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pMember->info.dnodeId, _OVER)
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pMember->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pMember->state.token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
||||||
|
@ -175,6 +176,7 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
|
||||||
SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
|
SArbAssignedLeader *pLeader = &pGroup->assignedLeader;
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER)
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
||||||
|
SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
|
||||||
|
|
||||||
pGroup->mutexInited = false;
|
pGroup->mutexInited = false;
|
||||||
|
|
||||||
|
@ -214,12 +216,22 @@ static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
|
||||||
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
|
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
|
||||||
mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
|
mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
|
||||||
taosThreadMutexLock(&pOld->mutex);
|
taosThreadMutexLock(&pOld->mutex);
|
||||||
for (int i = 0; i < 2; i++) {
|
|
||||||
|
if (pOld->version >= pNew->version) {
|
||||||
|
mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
|
||||||
|
" new version:%" PRId64,
|
||||||
|
pOld->vgId, pOld, pNew, pOld->version, pNew->version);
|
||||||
|
taosThreadMutexUnlock(&pOld->mutex);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
memcpy(pOld->members[i].state.token, pNew->members[i].state.token, TSDB_ARB_TOKEN_SIZE);
|
memcpy(pOld->members[i].state.token, pNew->members[i].state.token, TSDB_ARB_TOKEN_SIZE);
|
||||||
}
|
}
|
||||||
pOld->isSync = pNew->isSync;
|
pOld->isSync = pNew->isSync;
|
||||||
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
|
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
|
||||||
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||||
|
pOld->version = pNew->version;
|
||||||
taosThreadMutexUnlock(&pOld->mutex);
|
taosThreadMutexUnlock(&pOld->mutex);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -326,7 +338,7 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
|
||||||
|
|
||||||
taosThreadMutexLock(&pArbGroup->mutex);
|
taosThreadMutexLock(&pArbGroup->mutex);
|
||||||
|
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
SArbGroupMember *pMember = &pArbGroup->members[i];
|
SArbGroupMember *pMember = &pArbGroup->members[i];
|
||||||
int32_t dnodeId = pMember->info.dnodeId;
|
int32_t dnodeId = pMember->info.dnodeId;
|
||||||
void *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t));
|
void *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t));
|
||||||
|
@ -348,6 +360,12 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
|
||||||
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
||||||
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
||||||
mError("failed to get arb token for arb-hb timer");
|
mError("failed to get arb token for arb-hb timer");
|
||||||
|
pIter = taosHashIterate(pDnodeHash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
SArray *hbMembers = *(SArray **)pIter;
|
||||||
|
taosArrayDestroy(hbMembers);
|
||||||
|
pIter = taosHashIterate(pDnodeHash, pIter);
|
||||||
|
}
|
||||||
taosHashCleanup(pDnodeHash);
|
taosHashCleanup(pDnodeHash);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -383,8 +401,8 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm, char *member0Token,
|
static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm,
|
||||||
char *member1Token) {
|
char *member0Token, char *member1Token) {
|
||||||
SVArbCheckSyncReq req = {0};
|
SVArbCheckSyncReq req = {0};
|
||||||
req.arbToken = arbToken;
|
req.arbToken = arbToken;
|
||||||
req.arbTerm = arbTerm;
|
req.arbTerm = arbTerm;
|
||||||
|
@ -501,6 +519,10 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int64_t term = mndGetTerm(pMnode);
|
int64_t term = mndGetTerm(pMnode);
|
||||||
|
if (term < 0) {
|
||||||
|
mError("arb failed to get term since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
|
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
|
||||||
|
@ -527,11 +549,6 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
||||||
if (member0IsTimeout == false && member1IsTimeout == false) {
|
if (member0IsTimeout == false && member1IsTimeout == false) {
|
||||||
// no assigned leader and not sync
|
// no assigned leader and not sync
|
||||||
if (currentAssignedDnodeId == 0 && !arbGroupDup.isSync) {
|
if (currentAssignedDnodeId == 0 && !arbGroupDup.isSync) {
|
||||||
if (term < 0) {
|
|
||||||
mError("vgId:%d, arb failed to get term since %s", vgId, terrstr());
|
|
||||||
sdbRelease(pSdb, pArbGroup);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
(void)mndSendArbCheckSyncReq(pMnode, arbGroupDup.vgId, arbToken, term, arbGroupDup.members[0].state.token,
|
(void)mndSendArbCheckSyncReq(pMnode, arbGroupDup.vgId, arbToken, term, arbGroupDup.members[0].state.token,
|
||||||
arbGroupDup.members[1].state.token);
|
arbGroupDup.members[1].state.token);
|
||||||
}
|
}
|
||||||
|
@ -569,6 +586,7 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
||||||
SArbGroup newGroup = {0};
|
SArbGroup newGroup = {0};
|
||||||
mndArbGroupDupObj(&arbGroupDup, &newGroup);
|
mndArbGroupDupObj(&arbGroupDup, &newGroup);
|
||||||
mndArbGroupSetAssignedLeader(&newGroup, candidateIndex);
|
mndArbGroupSetAssignedLeader(&newGroup, candidateIndex);
|
||||||
|
newGroup.version++;
|
||||||
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
||||||
mError("vgId:%d, arb failed to pullup set assigned leader to dnodeId:%d, since %s", vgId, pMember->info.dnodeId,
|
mError("vgId:%d, arb failed to pullup set assigned leader to dnodeId:%d, since %s", vgId, pMember->info.dnodeId,
|
||||||
terrstr());
|
terrstr());
|
||||||
|
@ -595,13 +613,14 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup)
|
||||||
SMArbUpdateGroupReq req = {0};
|
SMArbUpdateGroupReq req = {0};
|
||||||
req.vgId = pNewGroup->vgId;
|
req.vgId = pNewGroup->vgId;
|
||||||
req.dbUid = pNewGroup->dbUid;
|
req.dbUid = pNewGroup->dbUid;
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
req.members[i].dnodeId = pNewGroup->members[i].info.dnodeId;
|
req.members[i].dnodeId = pNewGroup->members[i].info.dnodeId;
|
||||||
req.members[i].token = pNewGroup->members[i].state.token;
|
req.members[i].token = pNewGroup->members[i].state.token;
|
||||||
}
|
}
|
||||||
req.isSync = pNewGroup->isSync;
|
req.isSync = pNewGroup->isSync;
|
||||||
req.assignedLeader.dnodeId = pNewGroup->assignedLeader.dnodeId;
|
req.assignedLeader.dnodeId = pNewGroup->assignedLeader.dnodeId;
|
||||||
req.assignedLeader.token = pNewGroup->assignedLeader.token;
|
req.assignedLeader.token = pNewGroup->assignedLeader.token;
|
||||||
|
req.version = pNewGroup->version;
|
||||||
|
|
||||||
int32_t contLen = tSerializeSMArbUpdateGroupReq(NULL, 0, &req);
|
int32_t contLen = tSerializeSMArbUpdateGroupReq(NULL, 0, &req);
|
||||||
if (contLen <= 0) return NULL;
|
if (contLen <= 0) return NULL;
|
||||||
|
@ -616,7 +635,7 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup)
|
||||||
return pHead;
|
return pHead;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup* pNewGroup) {
|
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup);
|
void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup);
|
||||||
if (!pHead) {
|
if (!pHead) {
|
||||||
|
@ -637,7 +656,7 @@ static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) {
|
||||||
SArbGroup newGroup = {0};
|
SArbGroup newGroup = {0};
|
||||||
newGroup.vgId = req.vgId;
|
newGroup.vgId = req.vgId;
|
||||||
newGroup.dbUid = req.dbUid;
|
newGroup.dbUid = req.dbUid;
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
newGroup.members[i].info.dnodeId = req.members[i].dnodeId;
|
newGroup.members[i].info.dnodeId = req.members[i].dnodeId;
|
||||||
memcpy(newGroup.members[i].state.token, req.members[i].token, TSDB_ARB_TOKEN_SIZE);
|
memcpy(newGroup.members[i].state.token, req.members[i].token, TSDB_ARB_TOKEN_SIZE);
|
||||||
}
|
}
|
||||||
|
@ -645,6 +664,7 @@ static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) {
|
||||||
newGroup.isSync = req.isSync;
|
newGroup.isSync = req.isSync;
|
||||||
newGroup.assignedLeader.dnodeId = req.assignedLeader.dnodeId;
|
newGroup.assignedLeader.dnodeId = req.assignedLeader.dnodeId;
|
||||||
memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||||
|
newGroup.version = req.version;
|
||||||
|
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) {
|
if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) {
|
||||||
|
@ -713,7 +733,7 @@ bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMembe
|
||||||
taosThreadMutexLock(&pGroup->mutex);
|
taosThreadMutexLock(&pGroup->mutex);
|
||||||
|
|
||||||
int index = 0;
|
int index = 0;
|
||||||
for (; index < 2; index++) {
|
for (; index < TSDB_ARB_GROUP_MEMBER_NUM; index++) {
|
||||||
pMember = &pGroup->members[index];
|
pMember = &pGroup->members[index];
|
||||||
if (pMember->info.dnodeId == dnodeId) {
|
if (pMember->info.dnodeId == dnodeId) {
|
||||||
break;
|
break;
|
||||||
|
@ -746,6 +766,7 @@ bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMembe
|
||||||
mndArbGroupDupObj(pGroup, pNewGroup);
|
mndArbGroupDupObj(pGroup, pNewGroup);
|
||||||
memcpy(pNewGroup->members[index].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
|
memcpy(pNewGroup->members[index].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
|
||||||
pNewGroup->isSync = false;
|
pNewGroup->isSync = false;
|
||||||
|
pNewGroup->version++;
|
||||||
|
|
||||||
bool resetAssigned = false;
|
bool resetAssigned = false;
|
||||||
if (pMember->info.dnodeId == pGroup->assignedLeader.dnodeId) {
|
if (pMember->info.dnodeId == pGroup->assignedLeader.dnodeId) {
|
||||||
|
@ -815,6 +836,7 @@ bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0
|
||||||
if (pGroup->isSync != newIsSync) {
|
if (pGroup->isSync != newIsSync) {
|
||||||
mndArbGroupDupObj(pGroup, pNewGroup);
|
mndArbGroupDupObj(pGroup, pNewGroup);
|
||||||
pNewGroup->isSync = newIsSync;
|
pNewGroup->isSync = newIsSync;
|
||||||
|
pNewGroup->version++;
|
||||||
|
|
||||||
mInfo("vgId:%d, arb isSync updating, new isSync:%d", vgId, newIsSync);
|
mInfo("vgId:%d, arb isSync updating, new isSync:%d", vgId, newIsSync);
|
||||||
updateIsSync = true;
|
updateIsSync = true;
|
||||||
|
@ -846,6 +868,8 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
|
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
|
||||||
|
int32_t ret = -1;
|
||||||
|
|
||||||
SMnode *pMnode = pRsp->info.node;
|
SMnode *pMnode = pRsp->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
@ -870,14 +894,16 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)mndUpdateArbHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers);
|
(void)mndUpdateArbHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers);
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
ret = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
tFreeSVArbHeartBeatRsp(&arbHbRsp);
|
tFreeSVArbHeartBeatRsp(&arbHbRsp);
|
||||||
return terrno == TSDB_CODE_SUCCESS ? 0 : -1;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
||||||
|
int32_t ret = -1;
|
||||||
|
|
||||||
SMnode *pMnode = pRsp->info.node;
|
SMnode *pMnode = pRsp->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
@ -905,11 +931,14 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
||||||
bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
|
bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
|
||||||
if (mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync) != 0) {
|
if (mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync) != 0) {
|
||||||
mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
|
mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
|
||||||
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ret = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
tFreeSVArbCheckSyncRsp(&syncRsp);
|
tFreeSVArbCheckSyncRsp(&syncRsp);
|
||||||
return terrno == TSDB_CODE_SUCCESS ? 0 : -1;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
|
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
|
||||||
|
@ -924,17 +953,14 @@ bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char
|
||||||
}
|
}
|
||||||
|
|
||||||
if (errcode != TSDB_CODE_SUCCESS) {
|
if (errcode != TSDB_CODE_SUCCESS) {
|
||||||
mndArbGroupDupObj(pGroup, pNewGroup);
|
mInfo("skip update arb assigned for vgId:%d, since:%s", vgId, tstrerror(errcode));
|
||||||
mndArbGroupResetAssignedLeader(pNewGroup);
|
|
||||||
|
|
||||||
mInfo("vgId:%d, arb resetting assigned leader", vgId);
|
|
||||||
updateAssigned = true;
|
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pGroup->isSync) {
|
if (pGroup->isSync) {
|
||||||
mndArbGroupDupObj(pGroup, pNewGroup);
|
mndArbGroupDupObj(pGroup, pNewGroup);
|
||||||
pNewGroup->isSync = false;
|
pNewGroup->isSync = false;
|
||||||
|
pNewGroup->version++;
|
||||||
|
|
||||||
mInfo("vgId:%d, arb isSync is setting to false", vgId);
|
mInfo("vgId:%d, arb isSync is setting to false", vgId);
|
||||||
updateAssigned = true;
|
updateAssigned = true;
|
||||||
|
@ -947,6 +973,8 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
|
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
|
||||||
|
int32_t ret = -1;
|
||||||
|
|
||||||
SMnode *pMnode = pRsp->info.node;
|
SMnode *pMnode = pRsp->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
@ -983,12 +1011,15 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
|
||||||
if (updateAssigned) {
|
if (updateAssigned) {
|
||||||
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
||||||
mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr());
|
mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr());
|
||||||
|
goto _OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ret = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
|
tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
|
||||||
return terrno == TSDB_CODE_SUCCESS ? 0 : -1;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
|
@ -1019,7 +1050,7 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->vgId, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->vgId, false);
|
||||||
|
|
||||||
for (int i = 0; i < 2; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
SArbGroupMember *pMember = &pGroup->members[i];
|
SArbGroupMember *pMember = &pGroup->members[i];
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false);
|
||||||
|
|
|
@ -313,11 +313,10 @@ SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) {
|
||||||
|
|
||||||
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
|
SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) {
|
||||||
SEpSet epSet = {0};
|
SEpSet epSet = {0};
|
||||||
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId);
|
||||||
SDnodeObj* pDnode = mndAcquireDnode(pMnode, dnodeId);
|
|
||||||
if (!pDnode) return epSet;
|
if (!pDnode) return epSet;
|
||||||
|
|
||||||
addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port);
|
epSet = mndGetDnodeEpset(pDnode);
|
||||||
|
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
return epSet;
|
return epSet;
|
||||||
|
|
|
@ -34,7 +34,7 @@ void generateArbToken(int32_t nodeId, int32_t vgId, char* buf) {
|
||||||
memset(buf, 0, TSDB_ARB_TOKEN_SIZE);
|
memset(buf, 0, TSDB_ARB_TOKEN_SIZE);
|
||||||
int32_t randVal = taosSafeRand() % 1000;
|
int32_t randVal = taosSafeRand() % 1000;
|
||||||
int64_t currentMs = taosGetTimestampMs();
|
int64_t currentMs = taosGetTimestampMs();
|
||||||
sprintf(buf, "d%d#g%d#%" PRId64 "#%d", nodeId, vgId, currentMs, randVal);
|
snprintf(buf, TSDB_ARB_TOKEN_SIZE, "d%d#g%d#%" PRId64 "#%d", nodeId, vgId, currentMs, randVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
|
@ -78,6 +78,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
|
SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
|
||||||
if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
|
||||||
if (commitIndex >= ths->assignedCommitIndex) {
|
if (commitIndex >= ths->assignedCommitIndex) {
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
raftStoreNextTerm(ths);
|
raftStoreNextTerm(ths);
|
||||||
if (terrno != TSDB_CODE_SUCCESS) {
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno));
|
sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno));
|
||||||
|
|
|
@ -661,6 +661,12 @@ int32_t syncGetAssignedLogSynced(int64_t rid) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||||
|
terrno = TSDB_CODE_VND_ARB_NOT_SYNCED;
|
||||||
|
syncNodeRelease(pSyncNode);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
|
bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex;
|
||||||
terrno = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
|
terrno = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED);
|
||||||
|
|
||||||
|
|
|
@ -107,7 +107,7 @@ void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf) {
|
||||||
memset(buf, 0, TSDB_ARB_TOKEN_SIZE);
|
memset(buf, 0, TSDB_ARB_TOKEN_SIZE);
|
||||||
int32_t randVal = taosSafeRand() % 1000;
|
int32_t randVal = taosSafeRand() % 1000;
|
||||||
int64_t currentMs = taosGetTimestampMs();
|
int64_t currentMs = taosGetTimestampMs();
|
||||||
sprintf(buf, "d%d#g%d#%" PRId64 "#%d", nodeId, groupId, currentMs, randVal);
|
snprintf(buf, TSDB_ARB_TOKEN_SIZE, "d%d#g%d#%" PRId64 "#%d", nodeId, groupId, currentMs, randVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
// for leader
|
// for leader
|
||||||
|
|
Loading…
Reference in New Issue