From 4c47626baf315221d36e5c407ae149d79aad39dd Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 28 May 2024 16:29:18 +0800 Subject: [PATCH] fix: arb distinguish between isSync and acked --- include/common/tmsg.h | 19 +++++++++----- source/common/src/systable.c | 1 + source/common/src/tmsg.c | 15 +++++++++++ source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndArbGroup.c | 32 ++++++++++++++++------- 5 files changed, 53 insertions(+), 15 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index be2f443140..6aa2cd8c36 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2361,12 +2361,19 @@ typedef struct { } SMArbUpdateGroupMember; typedef struct { - int32_t vgId; - int64_t dbUid; - SMArbUpdateGroupMember members[2]; - int8_t isSync; - SMArbUpdateGroupMember assignedLeader; - int64_t version; + int32_t dnodeId; + char* token; + int8_t acked; +} SMArbUpdateGroupAssigned; + +typedef struct { + int32_t vgId; + int64_t dbUid; + SMArbUpdateGroupMember members[2]; + int8_t isSync; + int8_t assignedAcked; + SMArbUpdateGroupAssigned assignedLeader; + int64_t version; } SMArbUpdateGroup; typedef struct { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 9de682dd3a..95b18705cd 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -76,6 +76,7 @@ static const SSysDbTableSchema arbGroupsSchema[] = { {.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + {.name = "assigned_acked", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, }; static const SSysDbTableSchema clusterSchema[] = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ef37a41fcf..ad1243f21b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6460,6 +6460,11 @@ int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdat if (tEncodeI64(&encoder, pGroup->version) < 0) return -1; } + for (int32_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i); + if (tEncodeI8(&encoder, pGroup->assignedLeader.acked) < 0) return -1; + } + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -6492,8 +6497,18 @@ int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpd group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1; if (tDecodeI64(&decoder, &group.version) < 0) return -1; + group.assignedLeader.acked = false; + taosArrayPush(updateArray, &group); } + + if (!tDecodeIsEnd(&decoder)) { + for (int32_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pGroup = taosArrayGet(updateArray, i); + if (tDecodeI8(&decoder, &pGroup->assignedLeader.acked) < 0) return -1; + } + } + pReq->updateArray = updateArray; tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 0c40622d08..5c21e9b22b 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -255,6 +255,7 @@ typedef struct { typedef struct { int32_t dnodeId; char token[TSDB_ARB_TOKEN_SIZE]; + int8_t acked; } SArbAssignedLeader; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index b00da9ba3f..6a6b3d2daa 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -25,7 +25,7 @@ #include "mndVgroup.h" #define ARBGROUP_VER_NUMBER 1 -#define ARBGROUP_RESERVE_SIZE 64 +#define ARBGROUP_RESERVE_SIZE 63 static SHashObj *arbUpdateHash = NULL; @@ -129,6 +129,7 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) { SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER) SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER) SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER) + SDB_SET_INT8(pRaw, dataPos, pLeader->acked, _OVER) SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER) @@ -182,6 +183,7 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER) SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER) SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER) + SDB_GET_INT8(pRaw, dataPos, &pLeader->acked, _OVER) pGroup->mutexInited = false; @@ -235,6 +237,7 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p pOld->isSync = pNew->isSync; pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId; memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); + pOld->assignedLeader.acked = pNew->assignedLeader.acked; pOld->version++; _OVER: @@ -565,8 +568,8 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader; int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId; - // 1. has assigned && is sync => send req - if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true) { + // 1. has assigned && is sync && no response => send req + if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true && pAssignedLeader->acked == false) { (void)mndSendArbSetAssignedLeaderReq(pMnode, currentAssignedDnodeId, vgId, arbToken, term, pAssignedLeader->token); mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, currentAssignedDnodeId); @@ -658,6 +661,7 @@ static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) outGroup->isSync = pGroup->isSync; outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId; outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer + outGroup->assignedLeader.acked = pGroup->assignedLeader.acked; outGroup->version = pGroup->version; } @@ -773,6 +777,7 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) { newGroup.isSync = pUpdateGroup->isSync; newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId; memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); + newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked; newGroup.version = pUpdateGroup->version; SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId); @@ -790,10 +795,10 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) { goto _OVER; } - mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]", + mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]", pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token, newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync, - newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token); + newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked); sdbRelease(pMnode->pSdb, pOldGroup); } @@ -826,11 +831,13 @@ static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) { pGroup->assignedLeader.dnodeId = pMember->info.dnodeId; strncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE); + pGroup->assignedLeader.acked = false; } static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) { pGroup->assignedLeader.dnodeId = 0; memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE); + pGroup->assignedLeader.acked = false; } static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) { @@ -841,10 +848,10 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) { goto _OVER; } - mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]", + mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]", pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token, pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId, - pNew->assignedLeader.token); + pNew->assignedLeader.token, pNew->assignedLeader.acked); mndTransAddArbGroupId(pTrans, pNew->vgId); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -1110,11 +1117,12 @@ bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char goto _OVER; } - if (pGroup->isSync) { + if (pGroup->assignedLeader.acked == false) { mndArbGroupDupObj(pGroup, pNewGroup); pNewGroup->isSync = false; + pNewGroup->assignedLeader.acked = true; - mInfo("vgId:%d, arb isSync is setting to false", vgId); + mInfo("vgId:%d, arb received assigned ack", vgId); updateAssigned = true; goto _OVER; } @@ -1224,12 +1232,18 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)token, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false); } else { pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetNULL(pColInfo, numOfRows); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetNULL(pColInfo, numOfRows); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetNULL(pColInfo, numOfRows); } taosThreadMutexUnlock(&pGroup->mutex);