fix/TD-30989
This commit is contained in:
parent
7295aa2f0c
commit
fce5174d10
|
@ -104,7 +104,7 @@ void mndReleaseArbGroup(SMnode *pMnode, SArbGroup *pGroup) {
|
||||||
|
|
||||||
void mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
|
void mndArbGroupInitFromVgObj(SVgObj *pVgObj, SArbGroup *outGroup) {
|
||||||
ASSERT(pVgObj->replica == 2);
|
ASSERT(pVgObj->replica == 2);
|
||||||
memset(outGroup, 0, sizeof(SArbGroup));
|
(void)memset(outGroup, 0, sizeof(SArbGroup));
|
||||||
outGroup->dbUid = pVgObj->dbUid;
|
outGroup->dbUid = pVgObj->dbUid;
|
||||||
outGroup->vgId = pVgObj->vgId;
|
outGroup->vgId = pVgObj->vgId;
|
||||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
|
@ -223,7 +223,7 @@ static int32_t mndArbGroupActionInsert(SSdb *pSdb, SArbGroup *pGroup) {
|
||||||
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
|
static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
|
||||||
mTrace("arbgroup:%d, perform delete action, row:%p", pGroup->vgId, pGroup);
|
mTrace("arbgroup:%d, perform delete action, row:%p", pGroup->vgId, pGroup);
|
||||||
if (pGroup->mutexInited) {
|
if (pGroup->mutexInited) {
|
||||||
taosThreadMutexDestroy(&pGroup->mutex);
|
(void)taosThreadMutexDestroy(&pGroup->mutex);
|
||||||
pGroup->mutexInited = false;
|
pGroup->mutexInited = false;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -231,7 +231,7 @@ static int32_t mndArbGroupActionDelete(SSdb *pSdb, SArbGroup *pGroup) {
|
||||||
|
|
||||||
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
|
static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *pNew) {
|
||||||
mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
|
mTrace("arbgroup:%d, perform update action, old row:%p new row:%p", pOld->vgId, pOld, pNew);
|
||||||
taosThreadMutexLock(&pOld->mutex);
|
(void)taosThreadMutexLock(&pOld->mutex);
|
||||||
|
|
||||||
if (pOld->version != pNew->version) {
|
if (pOld->version != pNew->version) {
|
||||||
mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
|
mInfo("arbgroup:%d, skip to perform update action, old row:%p new row:%p, old version:%" PRId64
|
||||||
|
@ -241,18 +241,18 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
memcpy(pOld->members[i].state.token, pNew->members[i].state.token, TSDB_ARB_TOKEN_SIZE);
|
(void)memcpy(pOld->members[i].state.token, pNew->members[i].state.token, TSDB_ARB_TOKEN_SIZE);
|
||||||
}
|
}
|
||||||
pOld->isSync = pNew->isSync;
|
pOld->isSync = pNew->isSync;
|
||||||
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
|
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
|
||||||
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
(void)memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||||
pOld->assignedLeader.acked = pNew->assignedLeader.acked;
|
pOld->assignedLeader.acked = pNew->assignedLeader.acked;
|
||||||
pOld->version++;
|
pOld->version++;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
taosThreadMutexUnlock(&pOld->mutex);
|
(void)taosThreadMutexUnlock(&pOld->mutex);
|
||||||
|
|
||||||
taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t));
|
(void)taosHashRemove(arbUpdateHash, &pOld->vgId, sizeof(int32_t));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -389,7 +389,7 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
|
||||||
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
|
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
taosThreadMutexLock(&pArbGroup->mutex);
|
(void)taosThreadMutexLock(&pArbGroup->mutex);
|
||||||
|
|
||||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
SArbGroupMember *pMember = &pArbGroup->members[i];
|
SArbGroupMember *pMember = &pArbGroup->members[i];
|
||||||
|
@ -400,13 +400,13 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
|
||||||
hbMembers = *(SArray **)pObj;
|
hbMembers = *(SArray **)pObj;
|
||||||
} else {
|
} else {
|
||||||
hbMembers = taosArrayInit(16, sizeof(SVArbHbReqMember));
|
hbMembers = taosArrayInit(16, sizeof(SVArbHbReqMember));
|
||||||
taosHashPut(pDnodeHash, &dnodeId, sizeof(int32_t), &hbMembers, POINTER_BYTES);
|
(void)taosHashPut(pDnodeHash, &dnodeId, sizeof(int32_t), &hbMembers, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
SVArbHbReqMember reqMember = {.vgId = pArbGroup->vgId, .hbSeq = pMember->state.nextHbSeq++};
|
SVArbHbReqMember reqMember = {.vgId = pArbGroup->vgId, .hbSeq = pMember->state.nextHbSeq++};
|
||||||
taosArrayPush(hbMembers, &reqMember);
|
(void)taosArrayPush(hbMembers, &reqMember);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pArbGroup->mutex);
|
(void)taosThreadMutexUnlock(&pArbGroup->mutex);
|
||||||
sdbRelease(pSdb, pArbGroup);
|
sdbRelease(pSdb, pArbGroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -602,9 +602,9 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
||||||
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
|
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
taosThreadMutexLock(&pArbGroup->mutex);
|
(void)taosThreadMutexLock(&pArbGroup->mutex);
|
||||||
mndArbGroupDupObj(pArbGroup, &arbGroupDup);
|
mndArbGroupDupObj(pArbGroup, &arbGroupDup);
|
||||||
taosThreadMutexUnlock(&pArbGroup->mutex);
|
(void)taosThreadMutexUnlock(&pArbGroup->mutex);
|
||||||
|
|
||||||
int32_t vgId = arbGroupDup.vgId;
|
int32_t vgId = arbGroupDup.vgId;
|
||||||
|
|
||||||
|
@ -668,7 +668,10 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
||||||
SArbGroup newGroup = {0};
|
SArbGroup newGroup = {0};
|
||||||
mndArbGroupDupObj(&arbGroupDup, &newGroup);
|
mndArbGroupDupObj(&arbGroupDup, &newGroup);
|
||||||
mndArbGroupSetAssignedLeader(&newGroup, candidateIndex);
|
mndArbGroupSetAssignedLeader(&newGroup, candidateIndex);
|
||||||
taosArrayPush(pUpdateArray, &newGroup);
|
if (taosArrayPush(pUpdateArray, &newGroup)) {
|
||||||
|
taosArrayDestroy(pUpdateArray);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
sdbRelease(pSdb, pArbGroup);
|
sdbRelease(pSdb, pArbGroup);
|
||||||
}
|
}
|
||||||
|
@ -722,7 +725,7 @@ static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
||||||
mndInitArbUpdateGroup(pNewGroup, &newGroup);
|
mndInitArbUpdateGroup(pNewGroup, &newGroup);
|
||||||
|
|
||||||
SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup));
|
SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup));
|
||||||
taosArrayPush(pArray, &newGroup);
|
if (taosArrayPush(pArray, &newGroup) == NULL) goto _OVER;
|
||||||
|
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
|
void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
|
||||||
|
@ -736,7 +739,7 @@ static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
||||||
ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
if (ret != 0) goto _OVER;
|
if (ret != 0) goto _OVER;
|
||||||
|
|
||||||
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
|
if ((ret = taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0)) != 0) goto _OVER;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
taosArrayDestroy(pArray);
|
taosArrayDestroy(pArray);
|
||||||
|
@ -758,8 +761,8 @@ static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArra
|
||||||
SMArbUpdateGroup newGroup = {0};
|
SMArbUpdateGroup newGroup = {0};
|
||||||
mndInitArbUpdateGroup(pNewGroup, &newGroup);
|
mndInitArbUpdateGroup(pNewGroup, &newGroup);
|
||||||
|
|
||||||
taosArrayPush(pArray, &newGroup);
|
if (taosArrayPush(pArray, &newGroup) == NULL) goto _OVER;
|
||||||
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
|
if (taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0) != 0) goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(pArray) == 0) {
|
if (taosArrayGetSize(pArray) == 0) {
|
||||||
|
@ -784,7 +787,7 @@ _OVER:
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
for (size_t i = 0; i < sz; i++) {
|
for (size_t i = 0; i < sz; i++) {
|
||||||
SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
|
SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
|
||||||
taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId));
|
(void)taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -816,19 +819,19 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
||||||
newGroup.dbUid = pUpdateGroup->dbUid;
|
newGroup.dbUid = pUpdateGroup->dbUid;
|
||||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
newGroup.members[i].info.dnodeId = pUpdateGroup->members[i].dnodeId;
|
newGroup.members[i].info.dnodeId = pUpdateGroup->members[i].dnodeId;
|
||||||
memcpy(newGroup.members[i].state.token, pUpdateGroup->members[i].token, TSDB_ARB_TOKEN_SIZE);
|
(void)memcpy(newGroup.members[i].state.token, pUpdateGroup->members[i].token, TSDB_ARB_TOKEN_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
newGroup.isSync = pUpdateGroup->isSync;
|
newGroup.isSync = pUpdateGroup->isSync;
|
||||||
newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
|
newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
|
||||||
memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
(void)memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||||
newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
|
newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
|
||||||
newGroup.version = pUpdateGroup->version;
|
newGroup.version = pUpdateGroup->version;
|
||||||
|
|
||||||
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
|
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
|
||||||
if (!pOldGroup) {
|
if (!pOldGroup) {
|
||||||
mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
|
mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
|
||||||
taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t));
|
(void)taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -858,7 +861,7 @@ _OVER:
|
||||||
// failed to update arbgroup
|
// failed to update arbgroup
|
||||||
for (size_t i = 0; i < sz; i++) {
|
for (size_t i = 0; i < sz; i++) {
|
||||||
SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
|
SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
|
||||||
taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t));
|
(void)taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -868,20 +871,20 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
|
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
|
||||||
memcpy(pNew, pGroup, offsetof(SArbGroup, mutexInited));
|
(void)memcpy(pNew, pGroup, offsetof(SArbGroup, mutexInited));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
|
static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
|
||||||
SArbGroupMember *pMember = &pGroup->members[index];
|
SArbGroupMember *pMember = &pGroup->members[index];
|
||||||
|
|
||||||
pGroup->assignedLeader.dnodeId = pMember->info.dnodeId;
|
pGroup->assignedLeader.dnodeId = pMember->info.dnodeId;
|
||||||
strncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE);
|
(void)strncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE);
|
||||||
pGroup->assignedLeader.acked = false;
|
pGroup->assignedLeader.acked = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
|
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
|
||||||
pGroup->assignedLeader.dnodeId = 0;
|
pGroup->assignedLeader.dnodeId = 0;
|
||||||
memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE);
|
(void)memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE);
|
||||||
pGroup->assignedLeader.acked = false;
|
pGroup->assignedLeader.acked = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -923,7 +926,7 @@ bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMembe
|
||||||
bool updateToken = false;
|
bool updateToken = false;
|
||||||
SArbGroupMember *pMember = NULL;
|
SArbGroupMember *pMember = NULL;
|
||||||
|
|
||||||
taosThreadMutexLock(&pGroup->mutex);
|
(void)taosThreadMutexLock(&pGroup->mutex);
|
||||||
|
|
||||||
int index = 0;
|
int index = 0;
|
||||||
for (; index < TSDB_ARB_GROUP_MEMBER_NUM; index++) {
|
for (; index < TSDB_ARB_GROUP_MEMBER_NUM; index++) {
|
||||||
|
@ -957,7 +960,7 @@ bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMembe
|
||||||
|
|
||||||
// update token
|
// update token
|
||||||
mndArbGroupDupObj(pGroup, pNewGroup);
|
mndArbGroupDupObj(pGroup, pNewGroup);
|
||||||
memcpy(pNewGroup->members[index].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
|
(void)memcpy(pNewGroup->members[index].state.token, pRspMember->memberToken, TSDB_ARB_TOKEN_SIZE);
|
||||||
pNewGroup->isSync = false;
|
pNewGroup->isSync = false;
|
||||||
|
|
||||||
bool resetAssigned = false;
|
bool resetAssigned = false;
|
||||||
|
@ -970,7 +973,7 @@ bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMembe
|
||||||
mInfo("dnodeId:%d vgId:%d, arb token updating, resetAssigned:%d", dnodeId, pRspMember->vgId, resetAssigned);
|
mInfo("dnodeId:%d vgId:%d, arb token updating, resetAssigned:%d", dnodeId, pRspMember->vgId, resetAssigned);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
taosThreadMutexUnlock(&pGroup->mutex);
|
(void)taosThreadMutexUnlock(&pGroup->mutex);
|
||||||
return updateToken;
|
return updateToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -991,7 +994,7 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me
|
||||||
|
|
||||||
bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
|
bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
|
||||||
if (updateToken) {
|
if (updateToken) {
|
||||||
taosArrayPush(pUpdateArray, &newGroup);
|
(void)taosArrayPush(pUpdateArray, &newGroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pGroup);
|
sdbRelease(pMnode->pSdb, pGroup);
|
||||||
|
@ -1007,7 +1010,7 @@ bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0
|
||||||
bool newIsSync, SArbGroup *pNewGroup) {
|
bool newIsSync, SArbGroup *pNewGroup) {
|
||||||
bool updateIsSync = false;
|
bool updateIsSync = false;
|
||||||
|
|
||||||
taosThreadMutexLock(&pGroup->mutex);
|
(void)taosThreadMutexLock(&pGroup->mutex);
|
||||||
|
|
||||||
if (pGroup->assignedLeader.dnodeId != 0) {
|
if (pGroup->assignedLeader.dnodeId != 0) {
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
@ -1033,7 +1036,7 @@ bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0
|
||||||
}
|
}
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
taosThreadMutexUnlock(&pGroup->mutex);
|
(void)taosThreadMutexUnlock(&pGroup->mutex);
|
||||||
return updateIsSync;
|
return updateIsSync;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1148,7 +1151,7 @@ bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char
|
||||||
SArbGroup *pNewGroup) {
|
SArbGroup *pNewGroup) {
|
||||||
bool updateAssigned = false;
|
bool updateAssigned = false;
|
||||||
|
|
||||||
taosThreadMutexLock(&pGroup->mutex);
|
(void)taosThreadMutexLock(&pGroup->mutex);
|
||||||
if (mndArbCheckToken(pGroup->assignedLeader.token, memberToken) != 0) {
|
if (mndArbCheckToken(pGroup->assignedLeader.token, memberToken) != 0) {
|
||||||
mInfo("skip update arb assigned for vgId:%d, member token mismatch, local:[%s] msg:[%s]", vgId,
|
mInfo("skip update arb assigned for vgId:%d, member token mismatch, local:[%s] msg:[%s]", vgId,
|
||||||
pGroup->assignedLeader.token, memberToken);
|
pGroup->assignedLeader.token, memberToken);
|
||||||
|
@ -1171,7 +1174,7 @@ bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char
|
||||||
}
|
}
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
taosThreadMutexUnlock(&pGroup->mutex);
|
(void)taosThreadMutexUnlock(&pGroup->mutex);
|
||||||
return updateAssigned;
|
return updateAssigned;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1241,43 +1244,43 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_ARBGROUP, pShow->pIter, (void **)&pGroup);
|
pShow->pIter = sdbFetch(pSdb, SDB_ARBGROUP, pShow->pIter, (void **)&pGroup);
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
taosThreadMutexLock(&pGroup->mutex);
|
(void)taosThreadMutexLock(&pGroup->mutex);
|
||||||
|
|
||||||
cols = 0;
|
cols = 0;
|
||||||
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
SVgObj *pVgObj = sdbAcquire(pSdb, SDB_VGROUP, &pGroup->vgId);
|
SVgObj *pVgObj = sdbAcquire(pSdb, SDB_VGROUP, &pGroup->vgId);
|
||||||
if (!pVgObj) {
|
if (!pVgObj) {
|
||||||
taosThreadMutexUnlock(&pGroup->mutex);
|
(void)taosThreadMutexUnlock(&pGroup->mutex);
|
||||||
sdbRelease(pSdb, pGroup);
|
sdbRelease(pSdb, pGroup);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(pVgObj->dbName), TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
|
STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(pVgObj->dbName), TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)dbname, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->vgId, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->vgId, false);
|
||||||
|
|
||||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||||
SArbGroupMember *pMember = &pGroup->members[i];
|
SArbGroupMember *pMember = &pGroup->members[i];
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pMember->info.dnodeId, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->isSync, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->isSync, false);
|
||||||
|
|
||||||
if (pGroup->assignedLeader.dnodeId != 0) {
|
if (pGroup->assignedLeader.dnodeId != 0) {
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.dnodeId, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.dnodeId, false);
|
||||||
|
|
||||||
char token[TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE] = {0};
|
char token[TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
|
STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)token, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)token, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false);
|
||||||
} else {
|
} else {
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetNULL(pColInfo, numOfRows);
|
colDataSetNULL(pColInfo, numOfRows);
|
||||||
|
@ -1289,7 +1292,7 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
colDataSetNULL(pColInfo, numOfRows);
|
colDataSetNULL(pColInfo, numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pGroup->mutex);
|
(void)taosThreadMutexUnlock(&pGroup->mutex);
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pVgObj);
|
sdbRelease(pSdb, pVgObj);
|
||||||
|
|
|
@ -243,7 +243,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
||||||
|
|
||||||
int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
|
int32_t code = taosGetSystemUUID(clusterObj.name, TSDB_CLUSTER_ID_LEN);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
strcpy(clusterObj.name, "tdengine3.0");
|
(void)strcpy(clusterObj.name, "tdengine3.0");
|
||||||
mError("failed to get name from system, set to default val %s", clusterObj.name);
|
mError("failed to get name from system, set to default val %s", clusterObj.name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -355,7 +355,7 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
|
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
|
||||||
if (pCluster != NULL) {
|
if (pCluster != NULL) {
|
||||||
memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
|
(void)memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
|
||||||
clusterObj.upTime += tsUptimeInterval;
|
clusterObj.upTime += tsUptimeInterval;
|
||||||
mndReleaseCluster(pMnode, pCluster, pIter);
|
mndReleaseCluster(pMnode, pCluster, pIter);
|
||||||
}
|
}
|
||||||
|
@ -420,7 +420,7 @@ int32_t mndProcessConfigClusterReq(SRpcMsg *pReq) {
|
||||||
if (pCluster) mndReleaseCluster(pMnode, pCluster, pIter);
|
if (pCluster) mndReleaseCluster(pMnode, pCluster, pIter);
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
|
(void)memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
|
||||||
mndReleaseCluster(pMnode, pCluster, pIter);
|
mndReleaseCluster(pMnode, pCluster, pIter);
|
||||||
|
|
||||||
if (strncmp(cfgReq.config, GRANT_ACTIVE_CODE, TSDB_DNODE_CONFIG_LEN) == 0) {
|
if (strncmp(cfgReq.config, GRANT_ACTIVE_CODE, TSDB_DNODE_CONFIG_LEN) == 0) {
|
||||||
|
|
|
@ -231,7 +231,7 @@ int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj *pCompac
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
pCompact->compactId = tGenIdPI32();
|
pCompact->compactId = tGenIdPI32();
|
||||||
|
|
||||||
strcpy(pCompact->dbname, pDb->name);
|
(void)strcpy(pCompact->dbname, pDb->name);
|
||||||
|
|
||||||
pCompact->startTime = taosGetTimestampMs();
|
pCompact->startTime = taosGetTimestampMs();
|
||||||
|
|
||||||
|
@ -283,21 +283,21 @@ int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock,
|
||||||
char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) {
|
if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) {
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB);
|
(void)tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB);
|
||||||
tNameGetDbName(&name, varDataVal(tmpBuf));
|
(void)tNameGetDbName(&name, varDataVal(tmpBuf));
|
||||||
} else {
|
} else {
|
||||||
strncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
|
(void)strncpy(varDataVal(tmpBuf), pCompact->dbname, TSDB_SHOW_SQL_LEN);
|
||||||
}
|
}
|
||||||
varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
|
varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf)));
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false);
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pCompact);
|
sdbRelease(pSdb, pCompact);
|
||||||
|
@ -334,7 +334,11 @@ static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pC
|
||||||
pHead->contLen = htonl(contLen);
|
pHead->contLen = htonl(contLen);
|
||||||
pHead->vgId = htonl(pVgroup->vgId);
|
pHead->vgId = htonl(pVgroup->vgId);
|
||||||
|
|
||||||
tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req);
|
contLen = tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req);
|
||||||
|
if (contLen < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
*pContLen = contLen;
|
*pContLen = contLen;
|
||||||
return pReq;
|
return pReq;
|
||||||
}
|
}
|
||||||
|
@ -478,7 +482,7 @@ int32_t mndProcessKillCompactReq(SRpcMsg *pReq) {
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
char obj[TSDB_INT32_ID_LEN] = {0};
|
char obj[TSDB_INT32_ID_LEN] = {0};
|
||||||
sprintf(obj, "%d", pCompact->compactId);
|
(void)sprintf(obj, "%d", pCompact->compactId);
|
||||||
|
|
||||||
auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql, killCompactReq.sqlLen);
|
auditRecord(pReq, pMnode->clusterId, "killCompact", pCompact->dbname, obj, killCompactReq.sql, killCompactReq.sqlLen);
|
||||||
|
|
||||||
|
@ -587,7 +591,11 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
|
||||||
pHead->contLen = htonl(contLen);
|
pHead->contLen = htonl(contLen);
|
||||||
pHead->vgId = htonl(pDetail->vgId);
|
pHead->vgId = htonl(pDetail->vgId);
|
||||||
|
|
||||||
tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req);
|
contLen = tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req);
|
||||||
|
if (contLen < 0) {
|
||||||
|
sdbRelease(pMnode->pSdb, pDetail);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, .contLen = contLen};
|
||||||
|
|
||||||
|
@ -602,7 +610,10 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
|
||||||
|
|
||||||
mDebug("compact:%d, send update progress msg to %s", pDetail->compactId, detail);
|
mDebug("compact:%d, send update progress msg to %s", pDetail->compactId, detail);
|
||||||
|
|
||||||
tmsgSendReq(&epSet, &rpcMsg);
|
if (tmsgSendReq(&epSet, &rpcMsg) < 0) {
|
||||||
|
sdbRelease(pMnode->pSdb, pDetail);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pDetail);
|
sdbRelease(pMnode->pSdb, pDetail);
|
||||||
|
@ -806,7 +817,7 @@ void mndCompactPullup(SMnode *pMnode) {
|
||||||
SCompactObj *pCompact = NULL;
|
SCompactObj *pCompact = NULL;
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact);
|
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
taosArrayPush(pArray, &pCompact->compactId);
|
(void)taosArrayPush(pArray, &pCompact->compactId);
|
||||||
sdbRelease(pSdb, pCompact);
|
sdbRelease(pSdb, pCompact);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -68,22 +68,22 @@ int32_t mndRetrieveCompactDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->compactId, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->compactId, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->vgId, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->vgId, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->dnodeId, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->dnodeId, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->numberFileset, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->numberFileset, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->finished, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->finished, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->startTime, false);
|
(void)colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->startTime, false);
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pCompactDetail);
|
sdbRelease(pSdb, pCompactDetail);
|
||||||
|
|
Loading…
Reference in New Issue