fix/TS-5805-check-arb-updatetime
This commit is contained in:
parent
0da606c7fb
commit
d3ab816838
|
@ -2608,6 +2608,8 @@ typedef struct {
|
|||
int8_t assignedAcked;
|
||||
SMArbUpdateGroupAssigned assignedLeader;
|
||||
int64_t version;
|
||||
int32_t code;
|
||||
int64_t updateTimeMs;
|
||||
} SMArbUpdateGroup;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -8295,6 +8295,8 @@ int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdat
|
|||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pGroup->assignedLeader.dnodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pGroup->assignedLeader.token));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pGroup->version));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pGroup->code));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pGroup->updateTimeMs));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
|
@ -8348,6 +8350,8 @@ int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpd
|
|||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, group.assignedLeader.token));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &group.version));
|
||||
group.assignedLeader.acked = false;
|
||||
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &group.code));
|
||||
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &group.updateTimeMs));
|
||||
|
||||
if (taosArrayPush(updateArray, &group) == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
|
|
|
@ -309,6 +309,7 @@ typedef struct {
|
|||
SArbGroupMember members[TSDB_ARB_GROUP_MEMBER_NUM];
|
||||
int8_t isSync;
|
||||
int32_t code;
|
||||
int64_t updateTimeMs;
|
||||
SArbAssignedLeader assignedLeader;
|
||||
int64_t version;
|
||||
|
||||
|
|
|
@ -138,6 +138,8 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
|
|||
SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pLeader->acked, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pGroup->code, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pGroup->updateTimeMs, _OVER)
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)
|
||||
|
||||
|
@ -194,6 +196,8 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &pLeader->acked, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pGroup->code, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pGroup->updateTimeMs, _OVER)
|
||||
|
||||
pGroup->mutexInited = false;
|
||||
|
||||
|
@ -249,6 +253,8 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
|
|||
tstrncpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||
pOld->assignedLeader.acked = pNew->assignedLeader.acked;
|
||||
pOld->version++;
|
||||
pOld->code = pNew->code;
|
||||
pOld->updateTimeMs = pNew->updateTimeMs;
|
||||
|
||||
mInfo(
|
||||
"arbgroup:%d, perform update action. members[0].token:%s, members[1].token:%s, isSync:%d, as-dnodeid:%d, "
|
||||
|
@ -762,6 +768,8 @@ static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup)
|
|||
outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer
|
||||
outGroup->assignedLeader.acked = pGroup->assignedLeader.acked;
|
||||
outGroup->version = pGroup->version;
|
||||
outGroup->code = pGroup->code;
|
||||
outGroup->updateTimeMs = pGroup->updateTimeMs;
|
||||
}
|
||||
|
||||
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
||||
|
@ -858,7 +866,7 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "upd-bat-arbgroup");
|
||||
if (pTrans == NULL) {
|
||||
mError("failed to update arbgroup in create trans, since %s", terrstr());
|
||||
goto _OVER;
|
||||
|
@ -880,6 +888,16 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
|||
tstrncpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||
newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
|
||||
newGroup.version = pUpdateGroup->version;
|
||||
newGroup.code = pUpdateGroup->code;
|
||||
newGroup.updateTimeMs = pUpdateGroup->updateTimeMs;
|
||||
|
||||
mInfo(
|
||||
"trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d], %d, "
|
||||
"%" PRId64,
|
||||
pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
|
||||
newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
|
||||
newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked,
|
||||
pUpdateGroup->code, pUpdateGroup->updateTimeMs);
|
||||
|
||||
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
|
||||
if (!pOldGroup) {
|
||||
|
@ -1090,8 +1108,9 @@ bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0
|
|||
mndArbGroupDupObj(pGroup, pNewGroup);
|
||||
pNewGroup->isSync = newIsSync;
|
||||
pNewGroup->code = code;
|
||||
pNewGroup->updateTimeMs = taosGetTimestampMs();
|
||||
|
||||
mInfo("vgId:%d, arb isSync updating, new isSync:%d", vgId, newIsSync);
|
||||
mInfo("vgId:%d, arb isSync updating, new isSync:%d, timeStamp:%" PRId64, vgId, newIsSync, pNewGroup->updateTimeMs);
|
||||
updateIsSync = true;
|
||||
}
|
||||
|
||||
|
@ -1189,7 +1208,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
mInfo("vgId:%d, arb check-sync-rsp received, errCode:%d", syncRsp.vgId, syncRsp.errCode);
|
||||
mInfo("vgId:%d, vnode-arb-check-sync-rsp received, errCode:%d", syncRsp.vgId, syncRsp.errCode);
|
||||
if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
|
||||
mInfo("skip update arb sync for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", syncRsp.vgId, arbToken,
|
||||
syncRsp.arbToken);
|
||||
|
@ -1297,6 +1316,35 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static char *formatTimestamp(char *buf, int64_t val, int precision) {
|
||||
time_t tt;
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
tt = (time_t)(val / 1000000);
|
||||
}
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
tt = (time_t)(val / 1000000000);
|
||||
} else {
|
||||
tt = (time_t)(val / 1000);
|
||||
}
|
||||
|
||||
struct tm tm;
|
||||
if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
|
||||
mError("failed to get local time");
|
||||
return NULL;
|
||||
}
|
||||
size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
|
||||
|
||||
if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
sprintf(buf + pos, ".%06d", (int)(val % 1000000));
|
||||
} else if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
|
||||
} else {
|
||||
sprintf(buf + pos, ".%03d", (int)(val % 1000));
|
||||
}
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
@ -1338,8 +1386,13 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
|||
&lino, _OVER);
|
||||
}
|
||||
|
||||
mInfo("vgId:%d, arb group sync:%d, code:%d, update time:%" PRId64, pGroup->vgId, pGroup->isSync, pGroup->code,
|
||||
pGroup->updateTimeMs);
|
||||
|
||||
char strSync[100] = {0};
|
||||
sprintf(strSync, "%d,%d", pGroup->isSync, pGroup->code);
|
||||
char bufUpdateTime[40] = {0};
|
||||
(void)formatTimestamp(bufUpdateTime, pGroup->updateTimeMs, TSDB_TIME_PRECISION_MILLI);
|
||||
sprintf(strSync, "%d,%d,%s", pGroup->isSync, pGroup->code, bufUpdateTime);
|
||||
char sync[100 + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(sync, strSync, 100 + VARSTR_HEADER_SIZE);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
|
|
Loading…
Reference in New Issue