Merge pull request #25950 from taosdata/fix/TD-30291-3.0

fix: arb distinguish between isSync and acked
This commit is contained in:
Hongze Cheng 2024-05-30 19:50:51 +08:00 committed by GitHub
commit 0988064f9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 86 additions and 32 deletions

View File

@ -2361,12 +2361,19 @@ typedef struct {
} SMArbUpdateGroupMember; } SMArbUpdateGroupMember;
typedef struct { typedef struct {
int32_t vgId; int32_t dnodeId;
int64_t dbUid; char* token;
SMArbUpdateGroupMember members[2]; int8_t acked;
int8_t isSync; } SMArbUpdateGroupAssigned;
SMArbUpdateGroupMember assignedLeader;
int64_t version; typedef struct {
int32_t vgId;
int64_t dbUid;
SMArbUpdateGroupMember members[2];
int8_t isSync;
int8_t assignedAcked;
SMArbUpdateGroupAssigned assignedLeader;
int64_t version;
} SMArbUpdateGroup; } SMArbUpdateGroup;
typedef struct { typedef struct {

View File

@ -101,6 +101,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
*/ */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
int64_t mndGetRoleTimeMs(SMnode *pMnode);
/** /**
* @brief Process the rpc, sync request. * @brief Process the rpc, sync request.
* *

View File

@ -76,6 +76,7 @@ static const SSysDbTableSchema arbGroupsSchema[] = {
{.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "assigned_acked", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
}; };
static const SSysDbTableSchema clusterSchema[] = { static const SSysDbTableSchema clusterSchema[] = {

View File

@ -6460,6 +6460,11 @@ int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdat
if (tEncodeI64(&encoder, pGroup->version) < 0) return -1; if (tEncodeI64(&encoder, pGroup->version) < 0) return -1;
} }
for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
if (tEncodeI8(&encoder, pGroup->assignedLeader.acked) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -6492,8 +6497,18 @@ int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpd
group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1; if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1;
if (tDecodeI64(&decoder, &group.version) < 0) return -1; if (tDecodeI64(&decoder, &group.version) < 0) return -1;
group.assignedLeader.acked = false;
taosArrayPush(updateArray, &group); taosArrayPush(updateArray, &group);
} }
if (!tDecodeIsEnd(&decoder)) {
for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pGroup = taosArrayGet(updateArray, i);
if (tDecodeI8(&decoder, &pGroup->assignedLeader.acked) < 0) return -1;
}
}
pReq->updateArray = updateArray; pReq->updateArray = updateArray;
tEndDecode(&decoder); tEndDecode(&decoder);

View File

@ -255,6 +255,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
char token[TSDB_ARB_TOKEN_SIZE]; char token[TSDB_ARB_TOKEN_SIZE];
int8_t acked;
} SArbAssignedLeader; } SArbAssignedLeader;
typedef struct { typedef struct {

View File

@ -25,7 +25,7 @@
#include "mndVgroup.h" #include "mndVgroup.h"
#define ARBGROUP_VER_NUMBER 1 #define ARBGROUP_VER_NUMBER 1
#define ARBGROUP_RESERVE_SIZE 64 #define ARBGROUP_RESERVE_SIZE 63
static SHashObj *arbUpdateHash = NULL; static SHashObj *arbUpdateHash = NULL;
@ -129,6 +129,7 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER) SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER) SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER) SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
SDB_SET_INT8(pRaw, dataPos, pLeader->acked, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)
@ -182,6 +183,7 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER) SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER) SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER) SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pLeader->acked, _OVER)
pGroup->mutexInited = false; pGroup->mutexInited = false;
@ -235,6 +237,7 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
pOld->isSync = pNew->isSync; pOld->isSync = pNew->isSync;
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId; pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
pOld->assignedLeader.acked = pNew->assignedLeader.acked;
pOld->version++; pOld->version++;
_OVER: _OVER:
@ -540,6 +543,14 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
return -1; return -1;
} }
int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
int64_t nowMs = taosGetTimestampMs();
if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) {
mInfo("arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs,
nowMs);
return 0;
}
SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup)); SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
while (1) { while (1) {
@ -551,15 +562,14 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
taosThreadMutexUnlock(&pArbGroup->mutex); taosThreadMutexUnlock(&pArbGroup->mutex);
int32_t vgId = arbGroupDup.vgId; int32_t vgId = arbGroupDup.vgId;
int64_t nowMs = taosGetTimestampMs();
bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs); bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs);
bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs); bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs);
SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader; SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader;
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId; int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
// 1. has assigned && is sync => send req // 1. has assigned && is sync && no response => send req
if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true) { if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true && pAssignedLeader->acked == false) {
(void)mndSendArbSetAssignedLeaderReq(pMnode, currentAssignedDnodeId, vgId, arbToken, term, (void)mndSendArbSetAssignedLeaderReq(pMnode, currentAssignedDnodeId, vgId, arbToken, term,
pAssignedLeader->token); pAssignedLeader->token);
mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, currentAssignedDnodeId); mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, currentAssignedDnodeId);
@ -651,6 +661,7 @@ static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup)
outGroup->isSync = pGroup->isSync; outGroup->isSync = pGroup->isSync;
outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId; outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer
outGroup->assignedLeader.acked = pGroup->assignedLeader.acked;
outGroup->version = pGroup->version; outGroup->version = pGroup->version;
} }
@ -766,6 +777,7 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
newGroup.isSync = pUpdateGroup->isSync; newGroup.isSync = pUpdateGroup->isSync;
newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId; newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
newGroup.version = pUpdateGroup->version; newGroup.version = pUpdateGroup->version;
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId); SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
@ -783,10 +795,10 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]", mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token, pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync, newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token); newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked);
sdbRelease(pMnode->pSdb, pOldGroup); sdbRelease(pMnode->pSdb, pOldGroup);
} }
@ -819,11 +831,13 @@ static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
pGroup->assignedLeader.dnodeId = pMember->info.dnodeId; pGroup->assignedLeader.dnodeId = pMember->info.dnodeId;
strncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE); strncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE);
pGroup->assignedLeader.acked = false;
} }
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) { static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
pGroup->assignedLeader.dnodeId = 0; pGroup->assignedLeader.dnodeId = 0;
memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE); memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE);
pGroup->assignedLeader.acked = false;
} }
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) { static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
@ -834,10 +848,10 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
goto _OVER; goto _OVER;
} }
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]", mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token, pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token,
pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId, pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
pNew->assignedLeader.token); pNew->assignedLeader.token, pNew->assignedLeader.acked);
mndTransAddArbGroupId(pTrans, pNew->vgId); mndTransAddArbGroupId(pTrans, pNew->vgId);
if (mndTransCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -1103,11 +1117,12 @@ bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char
goto _OVER; goto _OVER;
} }
if (pGroup->isSync) { if (pGroup->assignedLeader.acked == false) {
mndArbGroupDupObj(pGroup, pNewGroup); mndArbGroupDupObj(pGroup, pNewGroup);
pNewGroup->isSync = false; pNewGroup->isSync = false;
pNewGroup->assignedLeader.acked = true;
mInfo("vgId:%d, arb isSync is setting to false", vgId); mInfo("vgId:%d, arb received assigned ack", vgId);
updateAssigned = true; updateAssigned = true;
goto _OVER; goto _OVER;
} }
@ -1217,12 +1232,18 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)token, false); colDataSetVal(pColInfo, numOfRows, (const char *)token, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false);
} else { } else {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows); colDataSetNULL(pColInfo, numOfRows);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows); colDataSetNULL(pColInfo, numOfRows);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);
} }
taosThreadMutexUnlock(&pGroup->mutex); taosThreadMutexUnlock(&pGroup->mutex);

View File

@ -334,6 +334,8 @@ static int32_t minCronTime() {
min = TMIN(min, tsStreamCheckpointInterval); min = TMIN(min, tsStreamCheckpointInterval);
min = TMIN(min, 6); // checkpointRemain min = TMIN(min, 6); // checkpointRemain
min = TMIN(min, tsStreamNodeCheckInterval); min = TMIN(min, tsStreamNodeCheckInterval);
min = TMIN(min, tsArbHeartBeatIntervalSec);
min = TMIN(min, tsArbCheckSyncIntervalSec);
int64_t telemInt = TMIN(60, (tsTelemInterval - 1)); int64_t telemInt = TMIN(60, (tsTelemInterval - 1));
min = TMIN(min, telemInt); min = TMIN(min, telemInt);
@ -390,6 +392,18 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
if (sec % tsUptimeInterval == 0) { if (sec % tsUptimeInterval == 0) {
mndIncreaseUpTime(pMnode); mndIncreaseUpTime(pMnode);
} }
if (sec % (tsArbHeartBeatIntervalSec) == 0) {
if (mndPullupArbHeartbeat(pMnode) != 0) {
mError("failed to pullup arb heartbeat, since:%s", terrstr());
}
}
if (sec % (tsArbCheckSyncIntervalSec) == 0) {
if (mndPullupArbCheckSync(pMnode) != 0) {
mError("failed to pullup arb check sync, since:%s", terrstr());
}
}
} }
void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) { void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) {
if (sec % (tsStatusInterval * 5) == 0) { if (sec % (tsStatusInterval * 5) == 0) {
@ -421,18 +435,6 @@ static void *mndThreadFp(void *param) {
continue; continue;
} }
mndDoTimerPullupTask(pMnode, sec); mndDoTimerPullupTask(pMnode, sec);
if (sec % (tsArbHeartBeatIntervalSec) == 0) {
if (mndPullupArbHeartbeat(pMnode) != 0) {
mError("failed to pullup arb heartbeat, since:%s", terrstr());
}
}
if (sec % (tsArbCheckSyncIntervalSec) == 0) {
if (mndPullupArbCheckSync(pMnode) != 0) {
mError("failed to pullup arb check sync, since:%s", terrstr());
}
}
} }
return NULL; return NULL;
@ -1076,6 +1078,11 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
return 0; return 0;
} }
int64_t mndGetRoleTimeMs(SMnode *pMnode) {
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
return state.roleTimeMs;
}
void mndSetRestored(SMnode *pMnode, bool restored) { void mndSetRestored(SMnode *pMnode, bool restored) {
if (restored) { if (restored) {
taosThreadRwlockWrlock(&pMnode->lock); taosThreadRwlockWrlock(&pMnode->lock);

View File

@ -222,7 +222,7 @@ class TDTestCase:
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdLog.info(len(tdSql.queryResult)) tdLog.info(len(tdSql.queryResult))
tdSql.checkEqual(True, len(tdSql.queryResult) in range(254, 255)) tdSql.checkEqual(True, len(tdSql.queryResult) in range(255, 256))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult)) tdSql.checkEqual(54, len(tdSql.queryResult))