diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fe00bfc6bc..fb73a154ce 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2236,6 +2236,7 @@ typedef struct { SMArbUpdateGroupReqMember members[2]; int8_t isSync; SMArbUpdateGroupReqMember assignedLeader; + int64_t version; } SMArbUpdateGroupReq; int32_t tSerializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq); diff --git a/include/util/tdef.h b/include/util/tdef.h index 96d974e4e0..fe24e4e6f6 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -279,6 +279,7 @@ typedef enum ELogicConditionType { #define TSDB_SHOW_SUBQUERY_LEN 1000 #define TSDB_LOG_VAR_LEN 32 +#define TSDB_ARB_GROUP_MEMBER_NUM 2 #define TSDB_ARB_TOKEN_SIZE 32 #define TSDB_TRANS_STAGE_LEN 12 diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 80184dbd05..3381d52050 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -85,7 +85,7 @@ bool tsEnableWhiteList = false; // ip white list cfg // arbitrator int32_t tsArbHeartBeatIntervalSec = 5; int32_t tsArbCheckSyncIntervalSec = 10; -int32_t tsArbSetAssignedTimeoutSec = 50; +int32_t tsArbSetAssignedTimeoutSec = 30; // dnode int64_t tsDndStart = 0; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6311e9bf12..0d2af37e7d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6222,13 +6222,14 @@ int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGrou if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; - for (int i = 0; i < 2; i++) { + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { if (tEncodeI32(&encoder, pReq->members[i].dnodeId) < 0) return -1; if (tEncodeCStr(&encoder, pReq->members[i].token) < 0) return -1; } if (tEncodeI8(&encoder, pReq->isSync) < 0) return -1; if (tEncodeI32(&encoder, pReq->assignedLeader.dnodeId) < 0) return -1; if (tEncodeCStr(&encoder, pReq->assignedLeader.token) < 0) return -1; + if (tEncodeI64(&encoder, pReq->version) < 0) return -1; tEndEncode(&encoder); @@ -6244,7 +6245,7 @@ int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGr if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; - for (int i = 0; i < 2; i++) { + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { if (tDecodeI32(&decoder, &pReq->members[i].dnodeId) < 0) return -1; pReq->members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); if (tDecodeCStrTo(&decoder, pReq->members[i].token) < 0) return -1; @@ -6253,6 +6254,7 @@ int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGr if (tDecodeI32(&decoder, &pReq->assignedLeader.dnodeId) < 0) return -1; pReq->assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); if (tDecodeCStrTo(&decoder, pReq->assignedLeader.token) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->version) < 0) return -1; tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a9bda1ce5c..3f60991c2a 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -273,9 +273,10 @@ typedef struct { typedef struct { int32_t vgId; int64_t dbUid; - SArbGroupMember members[2]; + SArbGroupMember members[TSDB_ARB_GROUP_MEMBER_NUM]; int8_t isSync; SArbAssignedLeader assignedLeader; + int64_t version; // following fields will not be duplicated bool mutexInited; diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index 0678ee2c98..4cde0d8117 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -36,7 +36,7 @@ static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index); static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup); static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew); -static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup* pNewGroup); +static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup); static int32_t mndProcessArbHbTimer(SRpcMsg *pReq); static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq); @@ -97,7 +97,7 @@ void mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) { memset(outGroup, 0, sizeof(SArbGroup)); outGroup->dbUid = pVgObj->dbUid; outGroup->vgId = pVgObj->vgId; - for (int i = 0; i < 2; i++) { + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { SArbGroupMember *pMember = &outGroup->members[i]; pMember->info.dnodeId = pVgObj->vnodeGid[i].dnodeId; } @@ -113,7 +113,7 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) { int32_t dataPos = 0; SDB_SET_INT32(pRaw, dataPos, pGroup->vgId, _OVER) SDB_SET_INT64(pRaw, dataPos, pGroup->dbUid, _OVER) - for (int i = 0; i < 2; i++) { + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { SArbGroupMember *pMember = &pGroup->members[i]; SDB_SET_INT32(pRaw, dataPos, pMember->info.dnodeId, _OVER) SDB_SET_BINARY(pRaw, dataPos, pMember->state.token, TSDB_ARB_TOKEN_SIZE, _OVER) @@ -123,6 +123,7 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) { SArbAssignedLeader *pLeader = &pGroup->assignedLeader; SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER) SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER) + SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER) SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER) @@ -161,7 +162,7 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) { int32_t dataPos = 0; SDB_GET_INT32(pRaw, dataPos, &pGroup->vgId, _OVER) SDB_GET_INT64(pRaw, dataPos, &pGroup->dbUid, _OVER) - for (int i = 0; i < 2; i++) { + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { SArbGroupMember *pMember = &pGroup->members[i]; SDB_GET_INT32(pRaw, dataPos, &pMember->info.dnodeId, _OVER) SDB_GET_BINARY(pRaw, dataPos, pMember->state.token, TSDB_ARB_TOKEN_SIZE, _OVER) @@ -175,6 +176,7 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) { SArbAssignedLeader *pLeader = &pGroup->assignedLeader; SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER) SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER) + SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER) pGroup->mutexInited = false; @@ -214,12 +216,22 @@ static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) { static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) { mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew); taosThreadMutexLock(&pOld->mutex); - for (int i = 0; i < 2; i++) { + + if (pOld->version >= pNew->version) { + mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64 + " new version:%" PRId64, + pOld->vgId, pOld, pNew, pOld->version, pNew->version); + taosThreadMutexUnlock(&pOld->mutex); + return 0; + } + + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { memcpy(pOld->members[i].state.token, pNew->members[i].state.token, TSDB_ARB_TOKEN_SIZE); } pOld->isSync = pNew->isSync; pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId; memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); + pOld->version = pNew->version; taosThreadMutexUnlock(&pOld->mutex); return 0; } @@ -326,7 +338,7 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) { taosThreadMutexLock(&pArbGroup->mutex); - for (int i = 0; i < 2; i++) { + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { SArbGroupMember *pMember = &pArbGroup->members[i]; int32_t dnodeId = pMember->info.dnodeId; void *pObj = taosHashGet(pDnodeHash, &dnodeId, sizeof(int32_t)); @@ -348,6 +360,12 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) { char arbToken[TSDB_ARB_TOKEN_SIZE]; if (mndGetArbToken(pMnode, arbToken) != 0) { mError("failed to get arb token for arb-hb timer"); + pIter = taosHashIterate(pDnodeHash, NULL); + while (pIter) { + SArray *hbMembers = *(SArray **)pIter; + taosArrayDestroy(hbMembers); + pIter = taosHashIterate(pDnodeHash, pIter); + } taosHashCleanup(pDnodeHash); return -1; } @@ -383,8 +401,8 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) { return 0; } -static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm, char *member0Token, - char *member1Token) { +static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbToken, int64_t arbTerm, + char *member0Token, char *member1Token) { SVArbCheckSyncReq req = {0}; req.arbToken = arbToken; req.arbTerm = arbTerm; @@ -501,6 +519,10 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { return -1; } int64_t term = mndGetTerm(pMnode); + if (term < 0) { + mError("arb failed to get term since %s", terrstr()); + return -1; + } while (1) { pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup); @@ -527,11 +549,6 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { if (member0IsTimeout == false && member1IsTimeout == false) { // no assigned leader and not sync if (currentAssignedDnodeId == 0 && !arbGroupDup.isSync) { - if (term < 0) { - mError("vgId:%d, arb failed to get term since %s", vgId, terrstr()); - sdbRelease(pSdb, pArbGroup); - return -1; - } (void)mndSendArbCheckSyncReq(pMnode, arbGroupDup.vgId, arbToken, term, arbGroupDup.members[0].state.token, arbGroupDup.members[1].state.token); } @@ -569,6 +586,7 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { SArbGroup newGroup = {0}; mndArbGroupDupObj(&arbGroupDup, &newGroup); mndArbGroupSetAssignedLeader(&newGroup, candidateIndex); + newGroup.version++; if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) { mError("vgId:%d, arb failed to pullup set assigned leader to dnodeId:%d, since %s", vgId, pMember->info.dnodeId, terrstr()); @@ -595,13 +613,14 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) SMArbUpdateGroupReq req = {0}; req.vgId = pNewGroup->vgId; req.dbUid = pNewGroup->dbUid; - for (int i = 0; i < 2; i++) { + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { req.members[i].dnodeId = pNewGroup->members[i].info.dnodeId; req.members[i].token = pNewGroup->members[i].state.token; } req.isSync = pNewGroup->isSync; req.assignedLeader.dnodeId = pNewGroup->assignedLeader.dnodeId; req.assignedLeader.token = pNewGroup->assignedLeader.token; + req.version = pNewGroup->version; int32_t contLen = tSerializeSMArbUpdateGroupReq(NULL, 0, &req); if (contLen <= 0) return NULL; @@ -616,7 +635,7 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) return pHead; } -static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup* pNewGroup) { +static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) { int32_t contLen = 0; void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup); if (!pHead) { @@ -637,7 +656,7 @@ static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) { SArbGroup newGroup = {0}; newGroup.vgId = req.vgId; newGroup.dbUid = req.dbUid; - for (int i = 0; i < 2; i++) { + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { newGroup.members[i].info.dnodeId = req.members[i].dnodeId; memcpy(newGroup.members[i].state.token, req.members[i].token, TSDB_ARB_TOKEN_SIZE); } @@ -645,6 +664,7 @@ static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) { newGroup.isSync = req.isSync; newGroup.assignedLeader.dnodeId = req.assignedLeader.dnodeId; memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE); + newGroup.version = req.version; SMnode *pMnode = pReq->info.node; if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) { @@ -713,7 +733,7 @@ bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMembe taosThreadMutexLock(&pGroup->mutex); int index = 0; - for (; index < 2; index++) { + for (; index < TSDB_ARB_GROUP_MEMBER_NUM; index++) { pMember = &pGroup->members[index]; if (pMember->info.dnodeId == dnodeId) { break; @@ -746,6 +766,7 @@ bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMembe mndArbGroupDupObj(pGroup, pNewGroup); memcpy(pNewGroup->members[index].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE); pNewGroup->isSync = false; + pNewGroup->version++; bool resetAssigned = false; if (pMember->info.dnodeId == pGroup->assignedLeader.dnodeId) { @@ -815,6 +836,7 @@ bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0 if (pGroup->isSync != newIsSync) { mndArbGroupDupObj(pGroup, pNewGroup); pNewGroup->isSync = newIsSync; + pNewGroup->version++; mInfo("vgId:%d, arb isSync updating, new isSync:%d", vgId, newIsSync); updateIsSync = true; @@ -846,6 +868,8 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token } static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) { + int32_t ret = -1; + SMnode *pMnode = pRsp->info.node; SSdb *pSdb = pMnode->pSdb; @@ -870,14 +894,16 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) { } (void)mndUpdateArbHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers); - terrno = TSDB_CODE_SUCCESS; + ret = 0; _OVER: tFreeSVArbHeartBeatRsp(&arbHbRsp); - return terrno == TSDB_CODE_SUCCESS ? 0 : -1; + return ret; } static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) { + int32_t ret = -1; + SMnode *pMnode = pRsp->info.node; SSdb *pSdb = pMnode->pSdb; @@ -905,11 +931,14 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) { bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS); if (mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync) != 0) { mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr()); + goto _OVER; } + ret = 0; + _OVER: tFreeSVArbCheckSyncRsp(&syncRsp); - return terrno == TSDB_CODE_SUCCESS ? 0 : -1; + return ret; } bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode, @@ -924,17 +953,14 @@ bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char } if (errcode != TSDB_CODE_SUCCESS) { - mndArbGroupDupObj(pGroup, pNewGroup); - mndArbGroupResetAssignedLeader(pNewGroup); - - mInfo("vgId:%d, arb resetting assigned leader", vgId); - updateAssigned = true; + mInfo("skip update arb assigned for vgId:%d, since:%s", vgId, tstrerror(errcode)); goto _OVER; } if (pGroup->isSync) { mndArbGroupDupObj(pGroup, pNewGroup); pNewGroup->isSync = false; + pNewGroup->version++; mInfo("vgId:%d, arb isSync is setting to false", vgId); updateAssigned = true; @@ -947,6 +973,8 @@ _OVER: } static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) { + int32_t ret = -1; + SMnode *pMnode = pRsp->info.node; SSdb *pSdb = pMnode->pSdb; @@ -983,12 +1011,15 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) { if (updateAssigned) { if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) { mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr()); + goto _OVER; } } + ret = 0; + _OVER: tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp); - return terrno == TSDB_CODE_SUCCESS ? 0 : -1; + return ret; } static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { @@ -1019,7 +1050,7 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->vgId, false); - for (int i = 0; i < 2; i++) { + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { SArbGroupMember *pMember = &pGroup->members[i]; pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d94e651114..c1dafecf92 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -312,12 +312,11 @@ SEpSet mndGetDnodeEpset(SDnodeObj *pDnode) { } SEpSet mndGetDnodeEpsetById(SMnode *pMnode, int32_t dnodeId) { - SEpSet epSet = {0}; - - SDnodeObj* pDnode = mndAcquireDnode(pMnode, dnodeId); + SEpSet epSet = {0}; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId); if (!pDnode) return epSet; - addEpIntoEpSet(&epSet, pDnode->fqdn, pDnode->port); + epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); return epSet; diff --git a/source/dnode/mnode/impl/test/arbgroup/arbgroup.cpp b/source/dnode/mnode/impl/test/arbgroup/arbgroup.cpp index 887a410fdd..e74d90aba7 100644 --- a/source/dnode/mnode/impl/test/arbgroup/arbgroup.cpp +++ b/source/dnode/mnode/impl/test/arbgroup/arbgroup.cpp @@ -34,7 +34,7 @@ void generateArbToken(int32_t nodeId, int32_t vgId, char* buf) { memset(buf, 0, TSDB_ARB_TOKEN_SIZE); int32_t randVal = taosSafeRand() % 1000; int64_t currentMs = taosGetTimestampMs(); - sprintf(buf, "d%d#g%d#%" PRId64 "#%d", nodeId, vgId, currentMs, randVal); + snprintf(buf, TSDB_ARB_TOKEN_SIZE, "d%d#g%d#%" PRId64 "#%d", nodeId, vgId, currentMs, randVal); } } // namespace diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index c4f7d68f3b..16739647b5 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -78,6 +78,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely); if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (commitIndex >= ths->assignedCommitIndex) { + terrno = TSDB_CODE_SUCCESS; raftStoreNextTerm(ths); if (terrno != TSDB_CODE_SUCCESS) { sError("vgId:%d, failed to update term, reason:%s", ths->vgId, tstrerror(terrno)); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ba7d6cb448..2a9b05e354 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -661,6 +661,12 @@ int32_t syncGetAssignedLogSynced(int64_t rid) { return -1; } + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + terrno = TSDB_CODE_VND_ARB_NOT_SYNCED; + syncNodeRelease(pSyncNode); + return 0; + } + bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex; terrno = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED); diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index d88a00879e..93a06ddf30 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -107,7 +107,7 @@ void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf) { memset(buf, 0, TSDB_ARB_TOKEN_SIZE); int32_t randVal = taosSafeRand() % 1000; int64_t currentMs = taosGetTimestampMs(); - sprintf(buf, "d%d#g%d#%" PRId64 "#%d", nodeId, groupId, currentMs, randVal); + snprintf(buf, TSDB_ARB_TOKEN_SIZE, "d%d#g%d#%" PRId64 "#%d", nodeId, groupId, currentMs, randVal); } // for leader