Merge pull request #29310 from taosdata/fix/TS-5805-check-arb
fix/TS-5805-check-arb
This commit is contained in:
commit
54c8a2be0a
|
@ -2612,6 +2612,8 @@ typedef struct {
|
|||
int8_t assignedAcked;
|
||||
SMArbUpdateGroupAssigned assignedLeader;
|
||||
int64_t version;
|
||||
int32_t code;
|
||||
int64_t updateTimeMs;
|
||||
} SMArbUpdateGroup;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -294,7 +294,7 @@ int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm);
|
|||
|
||||
SSyncState syncGetState(int64_t rid);
|
||||
int32_t syncGetArbToken(int64_t rid, char* outToken);
|
||||
int32_t syncGetAssignedLogSynced(int64_t rid);
|
||||
int32_t syncCheckSynced(int64_t rid);
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
const char* syncStr(ESyncState state);
|
||||
|
||||
|
|
|
@ -8302,6 +8302,12 @@ int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdat
|
|||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pGroup->assignedLeader.acked));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
|
||||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pGroup->code));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pGroup->updateTimeMs));
|
||||
}
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@ -8361,6 +8367,14 @@ int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpd
|
|||
}
|
||||
}
|
||||
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMArbUpdateGroup *pGroup = taosArrayGet(updateArray, i);
|
||||
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pGroup->code));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pGroup->updateTimeMs));
|
||||
}
|
||||
}
|
||||
|
||||
pReq->updateArray = updateArray;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
|
|
@ -73,7 +73,8 @@ static const SSysDbTableSchema arbGroupsSchema[] = {
|
|||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||
{.name = "v1_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||
{.name = "v2_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||
{.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||
{.name = "is_sync", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL, .sysInfo = true},
|
||||
{.name = "check_sync_code", .bytes = 100, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||
{.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||
{.name = "assigned_acked", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||
|
|
|
@ -38,10 +38,10 @@ int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup);
|
|||
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||
|
||||
bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
||||
SArbGroup *pNewGroup);
|
||||
bool mndCheckArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
||||
SArbGroup *pNewGroup);
|
||||
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
|
||||
bool newIsSync, SArbGroup *pNewGroup);
|
||||
bool newIsSync, SArbGroup *pNewGroup, int32_t code);
|
||||
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
|
||||
SArbGroup *pNewGroup);
|
||||
|
||||
|
|
|
@ -316,6 +316,8 @@ typedef struct {
|
|||
int64_t dbUid;
|
||||
SArbGroupMember members[TSDB_ARB_GROUP_MEMBER_NUM];
|
||||
int8_t isSync;
|
||||
int32_t code;
|
||||
int64_t updateTimeMs;
|
||||
SArbAssignedLeader assignedLeader;
|
||||
int64_t version;
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
#include "mndVgroup.h"
|
||||
|
||||
#define ARBGROUP_VER_NUMBER 1
|
||||
#define ARBGROUP_RESERVE_SIZE 63
|
||||
#define ARBGROUP_RESERVE_SIZE 51
|
||||
|
||||
static SHashObj *arbUpdateHash = NULL;
|
||||
|
||||
|
@ -34,9 +34,8 @@ static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew);
|
|||
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index);
|
||||
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
|
||||
|
||||
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew);
|
||||
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup);
|
||||
static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray);
|
||||
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup);
|
||||
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray);
|
||||
|
||||
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
|
||||
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
|
||||
|
@ -138,6 +137,8 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
|
|||
SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pLeader->acked, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pGroup->code, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pGroup->updateTimeMs, _OVER)
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)
|
||||
|
||||
|
@ -194,6 +195,8 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pLeader->acked, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pGroup->code, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pGroup->updateTimeMs, _OVER)
|
||||
|
||||
pGroup->mutexInited = false;
|
||||
|
||||
|
@ -249,6 +252,8 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
|
|||
tstrncpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||
pOld->assignedLeader.acked = pNew->assignedLeader.acked;
|
||||
pOld->version++;
|
||||
pOld->code = pNew->code;
|
||||
pOld->updateTimeMs = pNew->updateTimeMs;
|
||||
|
||||
mInfo(
|
||||
"arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, as-dnodeid:%d, "
|
||||
|
@ -634,6 +639,7 @@ void mndArbCheckSync(SArbGroup *pArbGroup, int64_t nowMs, ECheckSyncOp *pOp, SAr
|
|||
mInfo("arb skip to set assigned leader to vgId:%d dnodeId:%d, arb group is not sync", vgId,
|
||||
pMember->info.dnodeId);
|
||||
}
|
||||
//*pOp = CHECK_SYNC_CHECK_SYNC;
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -721,7 +727,7 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
TAOS_CHECK_EXIT(mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray));
|
||||
TAOS_CHECK_EXIT(mndBatchUpdateArbGroup(pMnode, pUpdateArray));
|
||||
|
||||
_exit:
|
||||
if (code != 0) {
|
||||
|
@ -761,9 +767,11 @@ static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup)
|
|||
outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer
|
||||
outGroup->assignedLeader.acked = pGroup->assignedLeader.acked;
|
||||
outGroup->version = pGroup->version;
|
||||
outGroup->code = pGroup->code;
|
||||
outGroup->updateTimeMs = pGroup->updateTimeMs;
|
||||
}
|
||||
|
||||
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
||||
static int32_t mndUpdateArbGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
||||
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
|
||||
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
|
||||
return 0;
|
||||
|
@ -796,7 +804,7 @@ _OVER:
|
|||
return ret;
|
||||
}
|
||||
|
||||
static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray) {
|
||||
static int32_t mndBatchUpdateArbGroup(SMnode *pMnode, SArray *newGroupArray) {
|
||||
int32_t ret = -1;
|
||||
|
||||
size_t sz = taosArrayGetSize(newGroupArray);
|
||||
|
@ -857,7 +865,7 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "upd-bat-arbgroup");
|
||||
if (pTrans == NULL) {
|
||||
mError("failed to update arbgroup in create trans, since %s", terrstr());
|
||||
goto _OVER;
|
||||
|
@ -879,6 +887,16 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
|||
tstrncpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||
newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
|
||||
newGroup.version = pUpdateGroup->version;
|
||||
newGroup.code = pUpdateGroup->code;
|
||||
newGroup.updateTimeMs = pUpdateGroup->updateTimeMs;
|
||||
|
||||
mInfo(
|
||||
"trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d], %d, "
|
||||
"%" PRId64,
|
||||
pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
|
||||
newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
|
||||
newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked,
|
||||
pUpdateGroup->code, pUpdateGroup->updateTimeMs);
|
||||
|
||||
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
|
||||
if (!pOldGroup) {
|
||||
|
@ -944,41 +962,8 @@ static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
|
|||
pGroup->assignedLeader.acked = false;
|
||||
}
|
||||
|
||||
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
|
||||
if (pTrans == NULL) {
|
||||
mError("failed to update arbgroup in create trans, vgId:%d, since %s", pNew->vgId, terrstr());
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
|
||||
pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token,
|
||||
pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
|
||||
pNew->assignedLeader.token, pNew->assignedLeader.acked);
|
||||
|
||||
mndTransAddArbGroupId(pTrans, pNew->vgId);
|
||||
if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if ((code = mndSetCreateArbGroupCommitLogs(pTrans, pNew)) != 0) {
|
||||
mError("failed to update arbgroup in set commit log, vgId:%d, since %s", pNew->vgId, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
}
|
||||
|
||||
bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
||||
SArbGroup *pNewGroup) {
|
||||
bool mndCheckArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
||||
SArbGroup *pNewGroup) {
|
||||
bool updateToken = false;
|
||||
SArbGroupMember *pMember = NULL;
|
||||
|
||||
|
@ -1033,7 +1018,7 @@ _OVER:
|
|||
return updateToken;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
|
||||
static int32_t mndUpdateArbGroupsByHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
|
||||
int64_t nowMs = taosGetTimestampMs();
|
||||
size_t size = taosArrayGetSize(memberArray);
|
||||
SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));
|
||||
|
@ -1048,7 +1033,7 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me
|
|||
continue;
|
||||
}
|
||||
|
||||
bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
|
||||
bool updateToken = mndCheckArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
|
||||
if (updateToken) {
|
||||
if (taosArrayPush(pUpdateArray, &newGroup) == NULL) {
|
||||
mError("failed to push newGroup to updateArray, but continue at this hearbear");
|
||||
|
@ -1058,14 +1043,14 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me
|
|||
sdbRelease(pMnode->pSdb, pGroup);
|
||||
}
|
||||
|
||||
TAOS_CHECK_RETURN(mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray));
|
||||
TAOS_CHECK_RETURN(mndBatchUpdateArbGroup(pMnode, pUpdateArray));
|
||||
|
||||
taosArrayDestroy(pUpdateArray);
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
|
||||
bool newIsSync, SArbGroup *pNewGroup) {
|
||||
bool newIsSync, SArbGroup *pNewGroup, int32_t code) {
|
||||
bool updateIsSync = false;
|
||||
|
||||
(void)taosThreadMutexLock(&pGroup->mutex);
|
||||
|
@ -1088,8 +1073,10 @@ bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0
|
|||
if (pGroup->isSync != newIsSync) {
|
||||
mndArbGroupDupObj(pGroup, pNewGroup);
|
||||
pNewGroup->isSync = newIsSync;
|
||||
pNewGroup->code = code;
|
||||
pNewGroup->updateTimeMs = taosGetTimestampMs();
|
||||
|
||||
mInfo("vgId:%d, arb isSync updating, new isSync:%d", vgId, newIsSync);
|
||||
mInfo("vgId:%d, arb isSync updating, new isSync:%d, timeStamp:%" PRId64, vgId, newIsSync, pNewGroup->updateTimeMs);
|
||||
updateIsSync = true;
|
||||
}
|
||||
|
||||
|
@ -1098,7 +1085,8 @@ _OVER:
|
|||
return updateIsSync;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync) {
|
||||
static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync,
|
||||
int32_t rsp_code) {
|
||||
int32_t code = 0;
|
||||
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
|
||||
if (pGroup == NULL) {
|
||||
|
@ -1109,9 +1097,10 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token
|
|||
}
|
||||
|
||||
SArbGroup newGroup = {0};
|
||||
bool updateIsSync = mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup);
|
||||
bool updateIsSync =
|
||||
mndUpdateArbGroupByCheckSync(pGroup, vgId, member0Token, member1Token, newIsSync, &newGroup, rsp_code);
|
||||
if (updateIsSync) {
|
||||
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
||||
if (mndUpdateArbGroup(pMnode, &newGroup) != 0) {
|
||||
mInfo("failed to pullup update arb sync, vgId:%d, since %s", vgId, terrstr());
|
||||
}
|
||||
}
|
||||
|
@ -1150,7 +1139,7 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(mndUpdateArbHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndUpdateArbGroupsByHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers), NULL, _OVER);
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
|
@ -1185,6 +1174,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
mInfo("vgId:%d, vnode-arb-check-sync-rsp received, errCode:%d", syncRsp.vgId, syncRsp.errCode);
|
||||
if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
|
||||
mInfo("skip update arb sync for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken,
|
||||
syncRsp.arbToken);
|
||||
|
@ -1193,7 +1183,8 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
|||
}
|
||||
|
||||
bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
|
||||
if ((code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync)) != 0) {
|
||||
if ((code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync,
|
||||
syncRsp.errCode)) != 0) {
|
||||
mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -1278,7 +1269,7 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
|
|||
bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken,
|
||||
pRsp->code, &newGroup);
|
||||
if (updateAssigned) {
|
||||
if ((code = mndPullupArbUpdateGroup(pMnode, &newGroup)) != 0) {
|
||||
if ((code = mndUpdateArbGroup(pMnode, &newGroup)) != 0) {
|
||||
mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -1291,6 +1282,35 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static char *formatTimestamp(char *buf, int64_t val, int precision) {
|
||||
time_t tt;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
tt = (time_t)(val / 1000000);
|
||||
}
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
tt = (time_t)(val / 1000000000);
|
||||
} else {
|
||||
tt = (time_t)(val / 1000);
|
||||
}
|
||||
|
||||
struct tm tm;
|
||||
if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
|
||||
mError("failed to get local time");
|
||||
return NULL;
|
||||
}
|
||||
size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
|
||||
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
sprintf(buf + pos, ".%06d", (int)(val % 1000000));
|
||||
} else if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
|
||||
} else {
|
||||
sprintf(buf + pos, ".%03d", (int)(val % 1000));
|
||||
}
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
@ -1332,9 +1352,22 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
&lino, _OVER);
|
||||
}
|
||||
|
||||
mInfo("vgId:%d, arb group sync:%d, code:%s, update time:%" PRId64, pGroup->vgId, pGroup->isSync,
|
||||
tstrerror(pGroup->code), pGroup->updateTimeMs);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->isSync, false), pGroup, &lino, _OVER);
|
||||
|
||||
char strCheckSyncCode[100] = {0};
|
||||
char bufUpdateTime[40] = {0};
|
||||
(void)formatTimestamp(bufUpdateTime, pGroup->updateTimeMs, TSDB_TIME_PRECISION_MILLI);
|
||||
tsnprintf(strCheckSyncCode, 100, "%s(%s)", tstrerror(pGroup->code), bufUpdateTime);
|
||||
|
||||
char checkSyncCode[100 + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(checkSyncCode, strCheckSyncCode, 100 + VARSTR_HEADER_SIZE);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)checkSyncCode, false), pGroup, &lino, _OVER);
|
||||
|
||||
if (pGroup->assignedLeader.dnodeId != 0) {
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.dnodeId, false),
|
||||
|
|
|
@ -127,7 +127,7 @@ TEST_F(ArbgroupTest, 02_process_heart_beat_rsp) {
|
|||
int32_t nowMs = group.members[0].state.lastHbMs + 10;
|
||||
|
||||
SArbGroup newGroup = {0};
|
||||
bool updateToken = mndUpdateArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
||||
bool updateToken = mndCheckArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
||||
|
||||
ASSERT_EQ(updateToken, false);
|
||||
ASSERT_NE(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
|
||||
|
@ -142,7 +142,7 @@ TEST_F(ArbgroupTest, 02_process_heart_beat_rsp) {
|
|||
int32_t nowMs = group.members[0].state.lastHbMs + 10;
|
||||
|
||||
SArbGroup newGroup = {0};
|
||||
bool updateToken = mndUpdateArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
||||
bool updateToken = mndCheckArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
||||
|
||||
ASSERT_EQ(updateToken, false);
|
||||
ASSERT_EQ(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
|
||||
|
@ -157,7 +157,7 @@ TEST_F(ArbgroupTest, 02_process_heart_beat_rsp) {
|
|||
int32_t nowMs = group.members[0].state.lastHbMs + 10;
|
||||
|
||||
SArbGroup newGroup = {0};
|
||||
bool updateToken = mndUpdateArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
||||
bool updateToken = mndCheckArbGroupByHeartBeat(&group, &rspMember, nowMs, dnodeId, &newGroup);
|
||||
|
||||
ASSERT_EQ(updateToken, true);
|
||||
ASSERT_EQ(group.members[0].state.responsedHbSeq, rspMember.hbSeq);
|
||||
|
@ -201,7 +201,7 @@ TEST_F(ArbgroupTest, 03_process_check_sync_rsp) {
|
|||
bool newIsSync = false;
|
||||
|
||||
SArbGroup newGroup = {0};
|
||||
bool updateIsSync = mndUpdateArbGroupByCheckSync(&group, vgId, member0Token, member1Token, newIsSync, &newGroup);
|
||||
bool updateIsSync = mndUpdateArbGroupByCheckSync(&group, vgId, member0Token, member1Token, newIsSync, &newGroup, 0);
|
||||
|
||||
ASSERT_EQ(updateIsSync, false);
|
||||
}
|
||||
|
@ -214,7 +214,7 @@ TEST_F(ArbgroupTest, 03_process_check_sync_rsp) {
|
|||
bool newIsSync = true;
|
||||
|
||||
SArbGroup newGroup = {0};
|
||||
bool updateIsSync = mndUpdateArbGroupByCheckSync(&group, vgId, member0Token, member1Token, newIsSync, &newGroup);
|
||||
bool updateIsSync = mndUpdateArbGroupByCheckSync(&group, vgId, member0Token, member1Token, newIsSync, &newGroup, 0);
|
||||
|
||||
ASSERT_EQ(updateIsSync, true);
|
||||
ASSERT_EQ(newGroup.isSync, true);
|
||||
|
|
|
@ -53,8 +53,8 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l
|
|||
static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
|
||||
SRpcMsg *pOriginRpc);
|
||||
|
||||
static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
|
||||
static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
|
||||
static int32_t vnodeCheckToken(SVnode *pVnode, char *member0Token, char *member1Token);
|
||||
static int32_t vnodeCheckSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
|
||||
static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
||||
extern int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||
|
@ -489,7 +489,7 @@ static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
int32_t ret = vnodePreCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token);
|
||||
int32_t ret = vnodeCheckToken(pVnode, syncReq.member0Token, syncReq.member1Token);
|
||||
if (ret != 0) {
|
||||
vError("vgId:%d, failed to preprocess arb check sync request since %s", TD_VID(pVnode), tstrerror(ret));
|
||||
}
|
||||
|
@ -2551,7 +2551,7 @@ static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pR
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token) {
|
||||
static int32_t vnodeCheckToken(SVnode *pVnode, char *member0Token, char *member1Token) {
|
||||
SSyncState syncState = syncGetState(pVnode->sync);
|
||||
if (syncState.state != TAOS_SYNC_STATE_LEADER) {
|
||||
return terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||
|
@ -2571,13 +2571,13 @@ static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token,
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token) {
|
||||
int32_t code = vnodePreCheckAssignedLogSyncd(pVnode, member0Token, member1Token);
|
||||
static int32_t vnodeCheckSyncd(SVnode *pVnode, char *member0Token, char *member1Token) {
|
||||
int32_t code = vnodeCheckToken(pVnode, member0Token, member1Token);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return syncGetAssignedLogSynced(pVnode->sync);
|
||||
return syncCheckSynced(pVnode->sync);
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
|
@ -2601,7 +2601,7 @@ static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t l
|
|||
syncRsp.member1Token = syncReq.member1Token;
|
||||
syncRsp.vgId = TD_VID(pVnode);
|
||||
|
||||
if (vnodeCheckAssignedLogSyncd(pVnode, syncReq.member0Token, syncReq.member1Token) != 0) {
|
||||
if (vnodeCheckSyncd(pVnode, syncReq.member0Token, syncReq.member1Token) != 0) {
|
||||
vError("vgId:%d, failed to check assigned log syncd", TD_VID(pVnode));
|
||||
}
|
||||
syncRsp.errCode = terrno;
|
||||
|
|
|
@ -697,7 +697,7 @@ int32_t syncGetArbToken(int64_t rid, char* outToken) {
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t syncGetAssignedLogSynced(int64_t rid) {
|
||||
int32_t syncCheckSynced(int64_t rid) {
|
||||
int32_t code = 0;
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) {
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
import taos
|
||||
import sys
|
||||
import os
|
||||
import subprocess
|
||||
import glob
|
||||
import shutil
|
||||
import time
|
||||
|
||||
from frame.log import *
|
||||
from frame.cases import *
|
||||
from frame.sql import *
|
||||
from frame.srvCtl import *
|
||||
from frame.caseBase import *
|
||||
from frame import *
|
||||
from frame.autogen import *
|
||||
from frame import epath
|
||||
# from frame.server.dnodes import *
|
||||
# from frame.server.cluster import *
|
||||
|
||||
|
||||
class TDTestCase(TBase):
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
updatecfgDict = {'dDebugFlag':131}
|
||||
super(TDTestCase, self).init(conn, logSql, replicaVar=1, checkColName="c1")
|
||||
|
||||
self.valgrind = 0
|
||||
self.db = "test"
|
||||
self.stb = "meters"
|
||||
self.childtable_count = 10
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
def run(self):
|
||||
tdSql.execute('CREATE DATABASE db vgroups 1 replica 2;')
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
count = 0
|
||||
|
||||
while count < 100:
|
||||
tdSql.query("show arbgroups;")
|
||||
|
||||
if tdSql.getData(0, 4) == True:
|
||||
break
|
||||
|
||||
tdLog.info("wait %d seconds for is sync"%count)
|
||||
time.sleep(1)
|
||||
|
||||
count += 1
|
||||
|
||||
if count == 100:
|
||||
tdLog.exit("arbgroup sync failed")
|
||||
return
|
||||
|
||||
tdSql.query("show db.vgroups;")
|
||||
|
||||
if(tdSql.getData(0, 4) == "follower") and (tdSql.getData(0, 6) == "leader"):
|
||||
tdLog.info("stop dnode2")
|
||||
sc.dnodeStop(2)
|
||||
|
||||
if(tdSql.getData(0, 6) == "follower") and (tdSql.getData(0, 4) == "leader"):
|
||||
tdLog.info("stop dnode 3")
|
||||
sc.dnodeStop(3)
|
||||
|
||||
|
||||
count = 0
|
||||
while count < 100:
|
||||
tdSql.query("show db.vgroups;")
|
||||
|
||||
if(tdSql.getData(0, 4) == "assigned ") or (tdSql.getData(0, 6) == "assigned "):
|
||||
break
|
||||
|
||||
tdLog.info("wait %d seconds for set assigned"%count)
|
||||
time.sleep(1)
|
||||
|
||||
count += 1
|
||||
|
||||
if count == 100:
|
||||
tdLog.exit("check assigned failed")
|
||||
return
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -46,13 +46,17 @@ class TDTestCase(TBase):
|
|||
while count < 100:
|
||||
tdSql.query("show arbgroups;")
|
||||
|
||||
if tdSql.getData(0, 4) == 1:
|
||||
if tdSql.getData(0, 4) == True:
|
||||
break
|
||||
|
||||
tdLog.info("wait 1 seconds for is sync")
|
||||
tdLog.info("wait %d seconds for is sync"%count)
|
||||
time.sleep(1)
|
||||
|
||||
count += 1
|
||||
|
||||
if count == 100:
|
||||
tdLog.exit("arbgroup sync failed")
|
||||
return
|
||||
|
||||
|
||||
tdSql.query("show db.vgroups;")
|
||||
|
@ -73,10 +77,14 @@ class TDTestCase(TBase):
|
|||
if(tdSql.getData(0, 4) == "assigned ") or (tdSql.getData(0, 6) == "assigned "):
|
||||
break
|
||||
|
||||
tdLog.info("wait 1 seconds for set assigned")
|
||||
tdLog.info("wait %d seconds for set assigned"%count)
|
||||
time.sleep(1)
|
||||
|
||||
count += 1
|
||||
|
||||
if count == 100:
|
||||
tdLog.exit("check assigned failed")
|
||||
return
|
||||
|
||||
tdSql.execute("INSERT INTO d0 VALUES (NOW, 10.3, 219, 0.31);")
|
||||
|
||||
|
|
|
@ -222,7 +222,7 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
|
||||
tdLog.info(len(tdSql.queryResult))
|
||||
tdSql.checkEqual(True, len(tdSql.queryResult) in range(312, 313))
|
||||
tdSql.checkEqual(True, len(tdSql.queryResult) in range(313, 314))
|
||||
|
||||
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
|
||||
tdSql.checkEqual(61, len(tdSql.queryResult))
|
||||
|
|
Loading…
Reference in New Issue