fix: arb distinguish between isSync and acked
This commit is contained in:
parent
a8383369ba
commit
4c47626baf
|
@ -2361,12 +2361,19 @@ typedef struct {
|
||||||
} SMArbUpdateGroupMember;
|
} SMArbUpdateGroupMember;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t dnodeId;
|
||||||
int64_t dbUid;
|
char* token;
|
||||||
SMArbUpdateGroupMember members[2];
|
int8_t acked;
|
||||||
int8_t isSync;
|
} SMArbUpdateGroupAssigned;
|
||||||
SMArbUpdateGroupMember assignedLeader;
|
|
||||||
int64_t version;
|
typedef struct {
|
||||||
|
int32_t vgId;
|
||||||
|
int64_t dbUid;
|
||||||
|
SMArbUpdateGroupMember members[2];
|
||||||
|
int8_t isSync;
|
||||||
|
int8_t assignedAcked;
|
||||||
|
SMArbUpdateGroupAssigned assignedLeader;
|
||||||
|
int64_t version;
|
||||||
} SMArbUpdateGroup;
|
} SMArbUpdateGroup;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -76,6 +76,7 @@ static const SSysDbTableSchema arbGroupsSchema[] = {
|
||||||
{.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
{.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||||
{.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
{.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||||
{.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||||
|
{.name = "assigned_acked", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SSysDbTableSchema clusterSchema[] = {
|
static const SSysDbTableSchema clusterSchema[] = {
|
||||||
|
|
|
@ -6460,6 +6460,11 @@ int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdat
|
||||||
if (tEncodeI64(&encoder, pGroup->version) < 0) return -1;
|
if (tEncodeI64(&encoder, pGroup->version) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
|
||||||
|
if (tEncodeI8(&encoder, pGroup->assignedLeader.acked) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -6492,8 +6497,18 @@ int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpd
|
||||||
group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
|
group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
|
||||||
if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &group.version) < 0) return -1;
|
if (tDecodeI64(&decoder, &group.version) < 0) return -1;
|
||||||
|
group.assignedLeader.acked = false;
|
||||||
|
|
||||||
taosArrayPush(updateArray, &group);
|
taosArrayPush(updateArray, &group);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!tDecodeIsEnd(&decoder)) {
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SMArbUpdateGroup *pGroup = taosArrayGet(updateArray, i);
|
||||||
|
if (tDecodeI8(&decoder, &pGroup->assignedLeader.acked) < 0) return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pReq->updateArray = updateArray;
|
pReq->updateArray = updateArray;
|
||||||
|
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
|
@ -255,6 +255,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
char token[TSDB_ARB_TOKEN_SIZE];
|
char token[TSDB_ARB_TOKEN_SIZE];
|
||||||
|
int8_t acked;
|
||||||
} SArbAssignedLeader;
|
} SArbAssignedLeader;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -25,7 +25,7 @@
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
|
|
||||||
#define ARBGROUP_VER_NUMBER 1
|
#define ARBGROUP_VER_NUMBER 1
|
||||||
#define ARBGROUP_RESERVE_SIZE 64
|
#define ARBGROUP_RESERVE_SIZE 63
|
||||||
|
|
||||||
static SHashObj *arbUpdateHash = NULL;
|
static SHashObj *arbUpdateHash = NULL;
|
||||||
|
|
||||||
|
@ -129,6 +129,7 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
|
||||||
SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER)
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
|
||||||
|
SDB_SET_INT8(pRaw, dataPos, pLeader->acked, _OVER)
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)
|
SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)
|
||||||
|
|
||||||
|
@ -182,6 +183,7 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER)
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
|
||||||
|
SDB_GET_INT8(pRaw, dataPos, &pLeader->acked, _OVER)
|
||||||
|
|
||||||
pGroup->mutexInited = false;
|
pGroup->mutexInited = false;
|
||||||
|
|
||||||
|
@ -235,6 +237,7 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
|
||||||
pOld->isSync = pNew->isSync;
|
pOld->isSync = pNew->isSync;
|
||||||
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
|
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
|
||||||
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||||
|
pOld->assignedLeader.acked = pNew->assignedLeader.acked;
|
||||||
pOld->version++;
|
pOld->version++;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -565,8 +568,8 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
||||||
SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader;
|
SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader;
|
||||||
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
|
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
|
||||||
|
|
||||||
// 1. has assigned && is sync => send req
|
// 1. has assigned && is sync && no response => send req
|
||||||
if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true) {
|
if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true && pAssignedLeader->acked == false) {
|
||||||
(void)mndSendArbSetAssignedLeaderReq(pMnode, currentAssignedDnodeId, vgId, arbToken, term,
|
(void)mndSendArbSetAssignedLeaderReq(pMnode, currentAssignedDnodeId, vgId, arbToken, term,
|
||||||
pAssignedLeader->token);
|
pAssignedLeader->token);
|
||||||
mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, currentAssignedDnodeId);
|
mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, currentAssignedDnodeId);
|
||||||
|
@ -658,6 +661,7 @@ static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup)
|
||||||
outGroup->isSync = pGroup->isSync;
|
outGroup->isSync = pGroup->isSync;
|
||||||
outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
|
outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
|
||||||
outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer
|
outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer
|
||||||
|
outGroup->assignedLeader.acked = pGroup->assignedLeader.acked;
|
||||||
outGroup->version = pGroup->version;
|
outGroup->version = pGroup->version;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -773,6 +777,7 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
||||||
newGroup.isSync = pUpdateGroup->isSync;
|
newGroup.isSync = pUpdateGroup->isSync;
|
||||||
newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
|
newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
|
||||||
memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||||
|
newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
|
||||||
newGroup.version = pUpdateGroup->version;
|
newGroup.version = pUpdateGroup->version;
|
||||||
|
|
||||||
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
|
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
|
||||||
|
@ -790,10 +795,10 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]",
|
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
|
||||||
pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
|
pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
|
||||||
newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
|
newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
|
||||||
newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token);
|
newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked);
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pOldGroup);
|
sdbRelease(pMnode->pSdb, pOldGroup);
|
||||||
}
|
}
|
||||||
|
@ -826,11 +831,13 @@ static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
|
||||||
|
|
||||||
pGroup->assignedLeader.dnodeId = pMember->info.dnodeId;
|
pGroup->assignedLeader.dnodeId = pMember->info.dnodeId;
|
||||||
strncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE);
|
strncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE);
|
||||||
|
pGroup->assignedLeader.acked = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
|
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
|
||||||
pGroup->assignedLeader.dnodeId = 0;
|
pGroup->assignedLeader.dnodeId = 0;
|
||||||
memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE);
|
memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE);
|
||||||
|
pGroup->assignedLeader.acked = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
|
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
|
||||||
|
@ -841,10 +848,10 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]",
|
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
|
||||||
pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token,
|
pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token,
|
||||||
pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
|
pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
|
||||||
pNew->assignedLeader.token);
|
pNew->assignedLeader.token, pNew->assignedLeader.acked);
|
||||||
|
|
||||||
mndTransAddArbGroupId(pTrans, pNew->vgId);
|
mndTransAddArbGroupId(pTrans, pNew->vgId);
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||||
|
@ -1110,11 +1117,12 @@ bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pGroup->isSync) {
|
if (pGroup->assignedLeader.acked == false) {
|
||||||
mndArbGroupDupObj(pGroup, pNewGroup);
|
mndArbGroupDupObj(pGroup, pNewGroup);
|
||||||
pNewGroup->isSync = false;
|
pNewGroup->isSync = false;
|
||||||
|
pNewGroup->assignedLeader.acked = true;
|
||||||
|
|
||||||
mInfo("vgId:%d, arb isSync is setting to false", vgId);
|
mInfo("vgId:%d, arb received assigned ack", vgId);
|
||||||
updateAssigned = true;
|
updateAssigned = true;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
@ -1224,12 +1232,18 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
|
STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)token, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)token, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false);
|
||||||
} else {
|
} else {
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetNULL(pColInfo, numOfRows);
|
colDataSetNULL(pColInfo, numOfRows);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetNULL(pColInfo, numOfRows);
|
colDataSetNULL(pColInfo, numOfRows);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetNULL(pColInfo, numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pGroup->mutex);
|
taosThreadMutexUnlock(&pGroup->mutex);
|
||||||
|
|
Loading…
Reference in New Issue