Merge pull request #25879 from taosdata/enh/TD-29713-3.0
enh: batch update arbgroup in trans
This commit is contained in:
commit
eac6408339
|
@ -2358,20 +2358,24 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp* pRsp);
|
|||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
char* token;
|
||||
} SMArbUpdateGroupReqMember;
|
||||
} SMArbUpdateGroupMember;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int64_t dbUid;
|
||||
SMArbUpdateGroupReqMember members[2];
|
||||
int8_t isSync;
|
||||
SMArbUpdateGroupReqMember assignedLeader;
|
||||
int64_t version;
|
||||
} SMArbUpdateGroupReq;
|
||||
int32_t vgId;
|
||||
int64_t dbUid;
|
||||
SMArbUpdateGroupMember members[2];
|
||||
int8_t isSync;
|
||||
SMArbUpdateGroupMember assignedLeader;
|
||||
int64_t version;
|
||||
} SMArbUpdateGroup;
|
||||
|
||||
int32_t tSerializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq);
|
||||
int32_t tDeserializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq);
|
||||
void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq* pReq);
|
||||
typedef struct {
|
||||
SArray* updateArray; // SMArbUpdateGroup
|
||||
} SMArbUpdateGroupBatchReq;
|
||||
|
||||
int32_t tSerializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq);
|
||||
int32_t tDeserializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq);
|
||||
void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char queryStrId[TSDB_QUERY_ID_LEN];
|
||||
|
|
|
@ -388,7 +388,8 @@
|
|||
TD_NEW_MSG_SEG(TDMT_MND_ARB_MSG) //9 << 8
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ARB_HEARTBEAT_TIMER, "mnd-arb-hb-tmr", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ARB_CHECK_SYNC_TIMER, "mnd-arb-check-sync-tmr", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) // no longer used
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ARB_MAX_MSG, "mnd-arb-max", NULL, NULL)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_ARB_MSG)
|
||||
|
||||
|
|
|
@ -6437,21 +6437,28 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp *pRsp) {
|
|||
taosMemoryFreeClear(pRsp->memberToken);
|
||||
}
|
||||
|
||||
int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) {
|
||||
int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
|
||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||
if (tEncodeI32(&encoder, pReq->members[i].dnodeId) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->members[i].token) < 0) return -1;
|
||||
|
||||
int32_t sz = taosArrayGetSize(pReq->updateArray);
|
||||
if (tEncodeI32(&encoder, sz) < 0) return -1;
|
||||
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
|
||||
if (tEncodeI32(&encoder, pGroup->vgId) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pGroup->dbUid) < 0) return -1;
|
||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||
if (tEncodeI32(&encoder, pGroup->members[i].dnodeId) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pGroup->members[i].token) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI8(&encoder, pGroup->isSync) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pGroup->assignedLeader.dnodeId) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pGroup->assignedLeader.token) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pGroup->version) < 0) return -1;
|
||||
}
|
||||
if (tEncodeI8(&encoder, pReq->isSync) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->assignedLeader.dnodeId) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->assignedLeader.token) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, pReq->version) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -6460,23 +6467,34 @@ int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGrou
|
|||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) {
|
||||
int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
|
||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||
if (tDecodeI32(&decoder, &pReq->members[i].dnodeId) < 0) return -1;
|
||||
pReq->members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
|
||||
if (tDecodeCStrTo(&decoder, pReq->members[i].token) < 0) return -1;
|
||||
int32_t sz = 0;
|
||||
if (tDecodeI32(&decoder, &sz) < 0) return -1;
|
||||
|
||||
SArray *updateArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
|
||||
if (!updateArray) return -1;
|
||||
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMArbUpdateGroup group = {0};
|
||||
if (tDecodeI32(&decoder, &group.vgId) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &group.dbUid) < 0) return -1;
|
||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||
if (tDecodeI32(&decoder, &group.members[i].dnodeId) < 0) return -1;
|
||||
group.members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
|
||||
if (tDecodeCStrTo(&decoder, group.members[i].token) < 0) return -1;
|
||||
}
|
||||
if (tDecodeI8(&decoder, &group.isSync) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &group.assignedLeader.dnodeId) < 0) return -1;
|
||||
group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
|
||||
if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &group.version) < 0) return -1;
|
||||
taosArrayPush(updateArray, &group);
|
||||
}
|
||||
if (tDecodeI8(&decoder, &pReq->isSync) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pReq->assignedLeader.dnodeId) < 0) return -1;
|
||||
pReq->assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
|
||||
if (tDecodeCStrTo(&decoder, pReq->assignedLeader.token) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &pReq->version) < 0) return -1;
|
||||
pReq->updateArray = updateArray;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
|
@ -6484,14 +6502,20 @@ int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGr
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq *pReq) {
|
||||
if (NULL == pReq) {
|
||||
void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq *pReq) {
|
||||
if (NULL == pReq || NULL == pReq->updateArray) {
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < 2; i++) {
|
||||
taosMemoryFreeClear(pReq->members[i].token);
|
||||
|
||||
int32_t sz = taosArrayGetSize(pReq->updateArray);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
|
||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||
taosMemoryFreeClear(pGroup->members[i].token);
|
||||
}
|
||||
taosMemoryFreeClear(pGroup->assignedLeader.token);
|
||||
}
|
||||
taosMemoryFreeClear(pReq->assignedLeader.token);
|
||||
taosArrayDestroy(pReq->updateArray);
|
||||
}
|
||||
|
||||
// int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
|
||||
|
|
|
@ -180,7 +180,7 @@ typedef struct {
|
|||
tmsg_t originRpcType;
|
||||
char dbname[TSDB_TABLE_FNAME_LEN];
|
||||
char stbname[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t arbGroupId;
|
||||
SHashObj* arbGroupIds;
|
||||
int32_t startFunc;
|
||||
int32_t stopFunc;
|
||||
int32_t paramLen;
|
||||
|
|
|
@ -78,7 +78,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
|||
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
||||
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
|
||||
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
|
||||
void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId);
|
||||
void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId);
|
||||
void mndTransSetSerial(STrans *pTrans);
|
||||
void mndTransSetParallel(STrans *pTrans);
|
||||
void mndTransSetChangeless(STrans *pTrans);
|
||||
|
|
|
@ -39,10 +39,11 @@ static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
|
|||
|
||||
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew);
|
||||
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup);
|
||||
static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray);
|
||||
|
||||
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
|
||||
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
|
||||
static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
|
||||
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
|
||||
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
|
||||
|
@ -68,7 +69,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
|
|||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP, mndProcessArbUpdateGroupReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
|
||||
|
@ -81,9 +82,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
|
|||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
void mndCleanupArbGroup(SMnode *pMnode) {
|
||||
taosHashCleanup(arbUpdateHash);
|
||||
}
|
||||
void mndCleanupArbGroup(SMnode *pMnode) { taosHashCleanup(arbUpdateHash); }
|
||||
|
||||
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
|
||||
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
|
||||
|
@ -541,6 +540,8 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
|
||||
if (pIter == NULL) break;
|
||||
|
@ -612,40 +613,27 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
|||
SArbGroup newGroup = {0};
|
||||
mndArbGroupDupObj(&arbGroupDup, &newGroup);
|
||||
mndArbGroupSetAssignedLeader(&newGroup, candidateIndex);
|
||||
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
||||
mError("vgId:%d, arb failed to pullup set assigned leader to dnodeId:%d, since %s", vgId, pMember->info.dnodeId,
|
||||
terrstr());
|
||||
sdbRelease(pSdb, pArbGroup);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mInfo("vgId:%d, arb pull up set assigned leader to dnodeId:%d", vgId, pMember->info.dnodeId);
|
||||
taosArrayPush(pUpdateArray, &newGroup);
|
||||
|
||||
sdbRelease(pSdb, pArbGroup);
|
||||
}
|
||||
|
||||
(void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray);
|
||||
|
||||
taosArrayDestroy(pUpdateArray);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) {
|
||||
SMArbUpdateGroupReq req = {0};
|
||||
req.vgId = pNewGroup->vgId;
|
||||
req.dbUid = pNewGroup->dbUid;
|
||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||
req.members[i].dnodeId = pNewGroup->members[i].info.dnodeId;
|
||||
req.members[i].token = pNewGroup->members[i].state.token;
|
||||
}
|
||||
req.isSync = pNewGroup->isSync;
|
||||
req.assignedLeader.dnodeId = pNewGroup->assignedLeader.dnodeId;
|
||||
req.assignedLeader.token = pNewGroup->assignedLeader.token;
|
||||
req.version = pNewGroup->version;
|
||||
static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
|
||||
SMArbUpdateGroupBatchReq req = {0};
|
||||
req.updateArray = updateArray;
|
||||
|
||||
int32_t contLen = tSerializeSMArbUpdateGroupReq(NULL, 0, &req);
|
||||
int32_t contLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &req);
|
||||
if (contLen <= 0) return NULL;
|
||||
SMsgHead *pHead = rpcMallocCont(contLen);
|
||||
if (pHead == NULL) return NULL;
|
||||
|
||||
if (tSerializeSMArbUpdateGroupReq(pHead, contLen, &req) <= 0) {
|
||||
if (tSerializeSMArbUpdateGroupBatchReq(pHead, contLen, &req) <= 0) {
|
||||
rpcFreeCont(pHead);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -653,60 +641,172 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup)
|
|||
return pHead;
|
||||
}
|
||||
|
||||
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
|
||||
outGroup->vgId = pGroup->vgId;
|
||||
outGroup->dbUid = pGroup->dbUid;
|
||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||
outGroup->members[i].dnodeId = pGroup->members[i].info.dnodeId;
|
||||
outGroup->members[i].token = pGroup->members[i].state.token; // just copy the pointer
|
||||
}
|
||||
outGroup->isSync = pGroup->isSync;
|
||||
outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
|
||||
outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer
|
||||
outGroup->version = pGroup->version;
|
||||
}
|
||||
|
||||
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
|
||||
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
|
||||
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup);
|
||||
if (!pHead) {
|
||||
mError("vgId:%d, failed to build arb-update-group request", pNewGroup->vgId);
|
||||
return -1;
|
||||
}
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true};
|
||||
int32_t ret = -1;
|
||||
|
||||
int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
if (ret == 0) {
|
||||
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
|
||||
SMArbUpdateGroup newGroup = {0};
|
||||
mndInitArbUpdateGroup(pNewGroup, &newGroup);
|
||||
|
||||
SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup));
|
||||
taosArrayPush(pArray, &newGroup);
|
||||
|
||||
int32_t contLen = 0;
|
||||
void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
|
||||
if (!pHead) {
|
||||
mError("failed to build arb-update-group request");
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
|
||||
ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
if (ret != 0) goto _OVER;
|
||||
|
||||
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
|
||||
|
||||
_OVER:
|
||||
taosArrayDestroy(pArray);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) {
|
||||
int ret = 0;
|
||||
static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray) {
|
||||
int32_t ret = -1;
|
||||
|
||||
SMArbUpdateGroupReq req = {0};
|
||||
tDeserializeSMArbUpdateGroupReq(pReq->pCont, pReq->contLen, &req);
|
||||
size_t sz = taosArrayGetSize(newGroupArray);
|
||||
SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
|
||||
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
|
||||
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
|
||||
continue;
|
||||
}
|
||||
|
||||
SArbGroup newGroup = {0};
|
||||
newGroup.vgId = req.vgId;
|
||||
newGroup.dbUid = req.dbUid;
|
||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||
newGroup.members[i].info.dnodeId = req.members[i].dnodeId;
|
||||
memcpy(newGroup.members[i].state.token, req.members[i].token, TSDB_ARB_TOKEN_SIZE);
|
||||
SMArbUpdateGroup newGroup = {0};
|
||||
mndInitArbUpdateGroup(pNewGroup, &newGroup);
|
||||
|
||||
taosArrayPush(pArray, &newGroup);
|
||||
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
|
||||
}
|
||||
|
||||
newGroup.isSync = req.isSync;
|
||||
newGroup.assignedLeader.dnodeId = req.assignedLeader.dnodeId;
|
||||
memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||
newGroup.version = req.version;
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
|
||||
if (!pOldGroup) {
|
||||
mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
|
||||
return 0;
|
||||
}
|
||||
sdbRelease(pMnode->pSdb, pOldGroup);
|
||||
|
||||
if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) {
|
||||
mError("vgId:%d, arb failed to update arbgroup, since %s", newGroup.vgId, terrstr());
|
||||
ret = -1;
|
||||
if (taosArrayGetSize(pArray) == 0) {
|
||||
ret = 0;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
tFreeSMArbUpdateGroupReq(&req);
|
||||
int32_t contLen = 0;
|
||||
void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
|
||||
if (!pHead) {
|
||||
mError("failed to build arb-update-group request");
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
|
||||
ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
|
||||
_OVER:
|
||||
taosArrayDestroy(pArray);
|
||||
|
||||
if (ret != 0) {
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
|
||||
taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
||||
int ret = -1;
|
||||
size_t sz = 0;
|
||||
|
||||
SMArbUpdateGroupBatchReq req = {0};
|
||||
if (tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req) != 0) {
|
||||
mError("arb failed to decode arb-update-group request");
|
||||
return -1;
|
||||
}
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
|
||||
if (pTrans == NULL) {
|
||||
mError("failed to update arbgroup in create trans, since %s", terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
sz = taosArrayGetSize(req.updateArray);
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
|
||||
SArbGroup newGroup = {0};
|
||||
newGroup.vgId = pUpdateGroup->vgId;
|
||||
newGroup.dbUid = pUpdateGroup->dbUid;
|
||||
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
|
||||
newGroup.members[i].info.dnodeId = pUpdateGroup->members[i].dnodeId;
|
||||
memcpy(newGroup.members[i].state.token, pUpdateGroup->members[i].token, TSDB_ARB_TOKEN_SIZE);
|
||||
}
|
||||
|
||||
newGroup.isSync = pUpdateGroup->isSync;
|
||||
newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
|
||||
memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||
newGroup.version = pUpdateGroup->version;
|
||||
|
||||
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
|
||||
if (!pOldGroup) {
|
||||
mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
|
||||
taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t));
|
||||
continue;
|
||||
}
|
||||
|
||||
mndTransAddArbGroupId(pTrans, newGroup.vgId);
|
||||
|
||||
if (mndSetCreateArbGroupCommitLogs(pTrans, &newGroup) != 0) {
|
||||
mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id,
|
||||
terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]",
|
||||
pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
|
||||
newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
|
||||
newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token);
|
||||
|
||||
sdbRelease(pMnode->pSdb, pOldGroup);
|
||||
}
|
||||
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
ret = 0;
|
||||
|
||||
_OVER:
|
||||
if (ret != 0) {
|
||||
// failed to update arbgroup
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
|
||||
taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t));
|
||||
}
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
tFreeSMArbUpdateGroupBatchReq(&req);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -739,7 +839,7 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
|
|||
pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
|
||||
pNew->assignedLeader.token);
|
||||
|
||||
mndTransSetArbGroupId(pTrans, pNew->vgId);
|
||||
mndTransAddArbGroupId(pTrans, pNew->vgId);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
ret = -1;
|
||||
goto _OVER;
|
||||
|
@ -816,10 +916,10 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
|
||||
int ret = 0;
|
||||
int64_t nowMs = taosGetTimestampMs();
|
||||
size_t size = taosArrayGetSize(memberArray);
|
||||
SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));
|
||||
|
||||
size_t size = taosArrayGetSize(memberArray);
|
||||
for (size_t i = 0; i < size; i++) {
|
||||
SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);
|
||||
|
||||
|
@ -832,17 +932,16 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me
|
|||
|
||||
bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
|
||||
if (updateToken) {
|
||||
ret = mndPullupArbUpdateGroup(pMnode, &newGroup);
|
||||
if (ret != 0) {
|
||||
mInfo("failed to pullup update arb token, vgId:%d, since %s", pRspMember->vgId, terrstr());
|
||||
}
|
||||
taosArrayPush(pUpdateArray, &newGroup);
|
||||
}
|
||||
|
||||
sdbRelease(pMnode->pSdb, pGroup);
|
||||
if (ret != 0) break;
|
||||
}
|
||||
|
||||
return ret;
|
||||
(void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray);
|
||||
|
||||
taosArrayDestroy(pUpdateArray);
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
|
||||
|
@ -900,6 +999,11 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token
|
|||
}
|
||||
|
||||
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
|
||||
if (pRsp->contLen == 0) {
|
||||
mDebug("arb hb-rsp contLen is 0");
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t ret = -1;
|
||||
|
||||
SMnode *pMnode = pRsp->info.node;
|
||||
|
@ -914,6 +1018,7 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
|
|||
|
||||
SVArbHeartBeatRsp arbHbRsp = {0};
|
||||
if (tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp) != 0) {
|
||||
mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
@ -934,6 +1039,11 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
||||
if (pRsp->contLen == 0) {
|
||||
mDebug("arb check-sync-rsp contLen is 0");
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t ret = -1;
|
||||
|
||||
SMnode *pMnode = pRsp->info.node;
|
||||
|
@ -948,7 +1058,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
|||
|
||||
SVArbCheckSyncRsp syncRsp = {0};
|
||||
if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) {
|
||||
mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code));
|
||||
mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code));
|
||||
if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
return 0;
|
||||
|
@ -1008,6 +1118,11 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
|
||||
if (pRsp->contLen == 0) {
|
||||
mDebug("arb set-assigned-rsp contLen is 0");
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t ret = -1;
|
||||
|
||||
SMnode *pMnode = pRsp->info.node;
|
||||
|
@ -1022,8 +1137,8 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
|
|||
|
||||
SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
|
||||
if (tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp) != 0) {
|
||||
mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
mInfo("arb set assigned failed, des failed since:%s", tstrerror(pRsp->code));
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
#define TRANS_VER1_NUMBER 1
|
||||
#define TRANS_VER2_NUMBER 2
|
||||
#define TRANS_ARRAY_SIZE 8
|
||||
#define TRANS_RESERVE_SIZE 48
|
||||
#define TRANS_RESERVE_SIZE 44
|
||||
|
||||
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
|
||||
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld);
|
||||
|
@ -196,10 +196,21 @@ SSdbRaw *mndTransEncode(STrans *pTrans) {
|
|||
}
|
||||
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER)
|
||||
|
||||
int32_t arbGroupNum = taosHashGetSize(pTrans->arbGroupIds);
|
||||
SDB_SET_INT32(pRaw, dataPos, arbGroupNum, _OVER)
|
||||
void *pIter = NULL;
|
||||
pIter = taosHashIterate(pTrans->arbGroupIds, NULL);
|
||||
while (pIter) {
|
||||
int32_t arbGroupId = *(int32_t *)pIter;
|
||||
SDB_SET_INT32(pRaw, dataPos, arbGroupId, _OVER)
|
||||
pIter = taosHashIterate(pTrans->arbGroupIds, pIter);
|
||||
}
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
|
||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
||||
|
||||
terrno = 0;
|
||||
terrno = 0;
|
||||
|
||||
_OVER:
|
||||
if (terrno != 0) {
|
||||
|
@ -279,6 +290,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
|
|||
int32_t undoActionNum = 0;
|
||||
int32_t commitActionNum = 0;
|
||||
int32_t dataPos = 0;
|
||||
int32_t arbgroupIdNum = 0;
|
||||
|
||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
|
||||
|
||||
|
@ -350,6 +362,16 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
|
|||
}
|
||||
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER);
|
||||
|
||||
pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
||||
|
||||
SDB_GET_INT32(pRaw, dataPos, &arbgroupIdNum, _OVER)
|
||||
for (int32_t i = 0; i < arbgroupIdNum; ++i) {
|
||||
int32_t arbGroupId = 0;
|
||||
SDB_GET_INT32(pRaw, dataPos, &arbGroupId, _OVER)
|
||||
taosHashPut(pTrans->arbGroupIds, &arbGroupId, sizeof(int32_t), NULL, 0);
|
||||
}
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
|
||||
|
||||
terrno = 0;
|
||||
|
@ -462,6 +484,9 @@ void mndTransDropData(STrans *pTrans) {
|
|||
mndTransDropActions(pTrans->commitActions);
|
||||
pTrans->commitActions = NULL;
|
||||
}
|
||||
if (pTrans->arbGroupIds != NULL) {
|
||||
taosHashCleanup(pTrans->arbGroupIds);
|
||||
}
|
||||
if (pTrans->pRpcArray != NULL) {
|
||||
taosArrayDestroy(pTrans->pRpcArray);
|
||||
pTrans->pRpcArray = NULL;
|
||||
|
@ -581,6 +606,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
|
|||
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
||||
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
||||
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
||||
pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
||||
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
|
||||
pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64();
|
||||
taosInitRWLatch(&pTrans->lockRpcArray);
|
||||
|
@ -733,7 +759,9 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname)
|
|||
}
|
||||
}
|
||||
|
||||
void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId) { pTrans->arbGroupId = groupId; }
|
||||
void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId) {
|
||||
taosHashPut(pTrans->arbGroupIds, &groupId, sizeof(int32_t), NULL, 0);
|
||||
}
|
||||
|
||||
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
|
||||
|
||||
|
@ -821,7 +849,16 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
|||
if (pNew->conflict == TRN_CONFLICT_ARBGROUP) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) {
|
||||
if (pNew->arbGroupId == pTrans->arbGroupId) conflict = true;
|
||||
void *pIter = taosHashIterate(pNew->arbGroupIds, NULL);
|
||||
while (pIter != NULL) {
|
||||
int32_t groupId = *(int32_t *)pIter;
|
||||
if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) {
|
||||
taosHashCancelIterate(pNew->arbGroupIds, pIter);
|
||||
conflict = true;
|
||||
break;
|
||||
}
|
||||
pIter = taosHashIterate(pNew->arbGroupIds, pIter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue