From 307ddc632fd1b5a4c1ee93c1c1f0abdd550829c3 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 21 May 2024 17:10:57 +0800 Subject: [PATCH] enh: batch update arbgroup in trans --- include/common/tmsg.h | 26 ++- include/common/tmsgdef.h | 2 +- source/common/src/tmsg.c | 78 ++++--- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/inc/mndTrans.h | 2 +- source/dnode/mnode/impl/src/mndArbGroup.c | 265 ++++++++++++++++------ source/dnode/mnode/impl/src/mndTrans.c | 45 +++- 7 files changed, 300 insertions(+), 120 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3ed6b40d4d..be2f443140 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2358,20 +2358,24 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp* pRsp); typedef struct { int32_t dnodeId; char* token; -} SMArbUpdateGroupReqMember; +} SMArbUpdateGroupMember; typedef struct { - int32_t vgId; - int64_t dbUid; - SMArbUpdateGroupReqMember members[2]; - int8_t isSync; - SMArbUpdateGroupReqMember assignedLeader; - int64_t version; -} SMArbUpdateGroupReq; + int32_t vgId; + int64_t dbUid; + SMArbUpdateGroupMember members[2]; + int8_t isSync; + SMArbUpdateGroupMember assignedLeader; + int64_t version; +} SMArbUpdateGroup; -int32_t tSerializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq); -int32_t tDeserializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq); -void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq* pReq); +typedef struct { + SArray* updateArray; // SMArbUpdateGroup +} SMArbUpdateGroupBatchReq; + +int32_t tSerializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq); +int32_t tDeserializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq); +void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq* pReq); typedef struct { char queryStrId[TSDB_QUERY_ID_LEN]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a5a3bd5ee0..fdfbdcc3f8 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -388,7 +388,7 @@ TD_NEW_MSG_SEG(TDMT_MND_ARB_MSG) //9 << 8 TD_DEF_MSG_TYPE(TDMT_MND_ARB_HEARTBEAT_TIMER, "mnd-arb-hb-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_CHECK_SYNC_TIMER, "mnd-arb-check-sync-tmr", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_MAX_MSG, "mnd-arb-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_ARB_MSG) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b7d1417451..ef37a41fcf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6437,21 +6437,28 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp *pRsp) { taosMemoryFreeClear(pRsp->memberToken); } -int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) { +int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; - if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; - for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { - if (tEncodeI32(&encoder, pReq->members[i].dnodeId) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->members[i].token) < 0) return -1; + + int32_t sz = taosArrayGetSize(pReq->updateArray); + if (tEncodeI32(&encoder, sz) < 0) return -1; + + for (int32_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i); + if (tEncodeI32(&encoder, pGroup->vgId) < 0) return -1; + if (tEncodeI64(&encoder, pGroup->dbUid) < 0) return -1; + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { + if (tEncodeI32(&encoder, pGroup->members[i].dnodeId) < 0) return -1; + if (tEncodeCStr(&encoder, pGroup->members[i].token) < 0) return -1; + } + if (tEncodeI8(&encoder, pGroup->isSync) < 0) return -1; + if (tEncodeI32(&encoder, pGroup->assignedLeader.dnodeId) < 0) return -1; + if (tEncodeCStr(&encoder, pGroup->assignedLeader.token) < 0) return -1; + if (tEncodeI64(&encoder, pGroup->version) < 0) return -1; } - if (tEncodeI8(&encoder, pReq->isSync) < 0) return -1; - if (tEncodeI32(&encoder, pReq->assignedLeader.dnodeId) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->assignedLeader.token) < 0) return -1; - if (tEncodeI64(&encoder, pReq->version) < 0) return -1; tEndEncode(&encoder); @@ -6460,23 +6467,34 @@ int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGrou return tlen; } -int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) { +int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; - if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; - for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { - if (tDecodeI32(&decoder, &pReq->members[i].dnodeId) < 0) return -1; - pReq->members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); - if (tDecodeCStrTo(&decoder, pReq->members[i].token) < 0) return -1; + int32_t sz = 0; + if (tDecodeI32(&decoder, &sz) < 0) return -1; + + SArray *updateArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup)); + if (!updateArray) return -1; + + for (int32_t i = 0; i < sz; i++) { + SMArbUpdateGroup group = {0}; + if (tDecodeI32(&decoder, &group.vgId) < 0) return -1; + if (tDecodeI64(&decoder, &group.dbUid) < 0) return -1; + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { + if (tDecodeI32(&decoder, &group.members[i].dnodeId) < 0) return -1; + group.members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); + if (tDecodeCStrTo(&decoder, group.members[i].token) < 0) return -1; + } + if (tDecodeI8(&decoder, &group.isSync) < 0) return -1; + if (tDecodeI32(&decoder, &group.assignedLeader.dnodeId) < 0) return -1; + group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); + if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1; + if (tDecodeI64(&decoder, &group.version) < 0) return -1; + taosArrayPush(updateArray, &group); } - if (tDecodeI8(&decoder, &pReq->isSync) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->assignedLeader.dnodeId) < 0) return -1; - pReq->assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); - if (tDecodeCStrTo(&decoder, pReq->assignedLeader.token) < 0) return -1; - if (tDecodeI64(&decoder, &pReq->version) < 0) return -1; + pReq->updateArray = updateArray; tEndDecode(&decoder); @@ -6484,14 +6502,20 @@ int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGr return 0; } -void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq *pReq) { - if (NULL == pReq) { +void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq *pReq) { + if (NULL == pReq || NULL == pReq->updateArray) { return; } - for (int i = 0; i < 2; i++) { - taosMemoryFreeClear(pReq->members[i].token); + + int32_t sz = taosArrayGetSize(pReq->updateArray); + for (int32_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i); + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { + taosMemoryFreeClear(pGroup->members[i].token); + } + taosMemoryFreeClear(pGroup->assignedLeader.token); } - taosMemoryFreeClear(pReq->assignedLeader.token); + taosArrayDestroy(pReq->updateArray); } // int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 68c55e235f..0c40622d08 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -180,7 +180,7 @@ typedef struct { tmsg_t originRpcType; char dbname[TSDB_TABLE_FNAME_LEN]; char stbname[TSDB_TABLE_FNAME_LEN]; - int32_t arbGroupId; + SHashObj* arbGroupIds; int32_t startFunc; int32_t stopFunc; int32_t paramLen; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 8c9ca87fb1..8008eb76e7 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -78,7 +78,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname); -void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId); +void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId); void mndTransSetSerial(STrans *pTrans); void mndTransSetParallel(STrans *pTrans); void mndTransSetChangeless(STrans *pTrans); diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index d0a86bdde7..50338fe889 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -39,10 +39,11 @@ static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup); static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew); static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup); +static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray); static int32_t mndProcessArbHbTimer(SRpcMsg *pReq); static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq); -static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq); +static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq); static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp); static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp); static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp); @@ -68,7 +69,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer); mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer); - mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP, mndProcessArbUpdateGroupReq); + mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq); mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp); mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp); mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp); @@ -81,9 +82,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -void mndCleanupArbGroup(SMnode *pMnode) { - taosHashCleanup(arbUpdateHash); -} +void mndCleanupArbGroup(SMnode *pMnode) { taosHashCleanup(arbUpdateHash); } SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) { SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId); @@ -541,6 +540,8 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { return -1; } + SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup)); + while (1) { pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup); if (pIter == NULL) break; @@ -612,40 +613,27 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { SArbGroup newGroup = {0}; mndArbGroupDupObj(&arbGroupDup, &newGroup); mndArbGroupSetAssignedLeader(&newGroup, candidateIndex); - if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) { - mError("vgId:%d, arb failed to pullup set assigned leader to dnodeId:%d, since %s", vgId, pMember->info.dnodeId, - terrstr()); - sdbRelease(pSdb, pArbGroup); - return -1; - } - - mInfo("vgId:%d, arb pull up set assigned leader to dnodeId:%d", vgId, pMember->info.dnodeId); + taosArrayPush(pUpdateArray, &newGroup); sdbRelease(pSdb, pArbGroup); } + (void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray); + + taosArrayDestroy(pUpdateArray); return 0; } -static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) { - SMArbUpdateGroupReq req = {0}; - req.vgId = pNewGroup->vgId; - req.dbUid = pNewGroup->dbUid; - for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { - req.members[i].dnodeId = pNewGroup->members[i].info.dnodeId; - req.members[i].token = pNewGroup->members[i].state.token; - } - req.isSync = pNewGroup->isSync; - req.assignedLeader.dnodeId = pNewGroup->assignedLeader.dnodeId; - req.assignedLeader.token = pNewGroup->assignedLeader.token; - req.version = pNewGroup->version; +static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) { + SMArbUpdateGroupBatchReq req = {0}; + req.updateArray = updateArray; - int32_t contLen = tSerializeSMArbUpdateGroupReq(NULL, 0, &req); + int32_t contLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &req); if (contLen <= 0) return NULL; SMsgHead *pHead = rpcMallocCont(contLen); if (pHead == NULL) return NULL; - if (tSerializeSMArbUpdateGroupReq(pHead, contLen, &req) <= 0) { + if (tSerializeSMArbUpdateGroupBatchReq(pHead, contLen, &req) <= 0) { rpcFreeCont(pHead); return NULL; } @@ -653,60 +641,172 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) return pHead; } +static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) { + outGroup->vgId = pGroup->vgId; + outGroup->dbUid = pGroup->dbUid; + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { + outGroup->members[i].dnodeId = pGroup->members[i].info.dnodeId; + outGroup->members[i].token = pGroup->members[i].state.token; // just copy the pointer + } + outGroup->isSync = pGroup->isSync; + outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId; + outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer + outGroup->version = pGroup->version; +} + static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) { if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) { mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId); return 0; } - int32_t contLen = 0; - void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup); - if (!pHead) { - mError("vgId:%d, failed to build arb-update-group request", pNewGroup->vgId); - return -1; - } - SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true}; + int32_t ret = -1; - int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - if (ret == 0) { - taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0); + SMArbUpdateGroup newGroup = {0}; + mndInitArbUpdateGroup(pNewGroup, &newGroup); + + SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup)); + taosArrayPush(pArray, &newGroup); + + int32_t contLen = 0; + void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray); + if (!pHead) { + mError("failed to build arb-update-group request"); + goto _OVER; } + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true}; + ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + if (ret != 0) goto _OVER; + + taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0); + +_OVER: + taosArrayDestroy(pArray); return ret; } -static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) { - int ret = 0; +static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray) { + int32_t ret = -1; - SMArbUpdateGroupReq req = {0}; - tDeserializeSMArbUpdateGroupReq(pReq->pCont, pReq->contLen, &req); + size_t sz = taosArrayGetSize(newGroupArray); + SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup)); + for (size_t i = 0; i < sz; i++) { + SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i); + if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) { + mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId); + continue; + } - SArbGroup newGroup = {0}; - newGroup.vgId = req.vgId; - newGroup.dbUid = req.dbUid; - for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { - newGroup.members[i].info.dnodeId = req.members[i].dnodeId; - memcpy(newGroup.members[i].state.token, req.members[i].token, TSDB_ARB_TOKEN_SIZE); + SMArbUpdateGroup newGroup = {0}; + mndInitArbUpdateGroup(pNewGroup, &newGroup); + + taosArrayPush(pArray, &newGroup); + taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0); } - newGroup.isSync = req.isSync; - newGroup.assignedLeader.dnodeId = req.assignedLeader.dnodeId; - memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE); - newGroup.version = req.version; - - SMnode *pMnode = pReq->info.node; - SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId); - if (!pOldGroup) { - mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId); - return 0; - } - sdbRelease(pMnode->pSdb, pOldGroup); - - if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) { - mError("vgId:%d, arb failed to update arbgroup, since %s", newGroup.vgId, terrstr()); - ret = -1; + if (taosArrayGetSize(pArray) == 0) { + ret = 0; + goto _OVER; } - tFreeSMArbUpdateGroupReq(&req); + int32_t contLen = 0; + void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray); + if (!pHead) { + mError("failed to build arb-update-group request"); + goto _OVER; + } + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true}; + ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + +_OVER: + taosArrayDestroy(pArray); + + if (ret != 0) { + for (size_t i = 0; i < sz; i++) { + SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i); + taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)); + } + } + + return ret; +} + +static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) { + int ret = -1; + size_t sz = 0; + + SMArbUpdateGroupBatchReq req = {0}; + if (tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req) != 0) { + mError("arb failed to decode arb-update-group request"); + return -1; + } + + SMnode *pMnode = pReq->info.node; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup"); + if (pTrans == NULL) { + mError("failed to update arbgroup in create trans, since %s", terrstr()); + goto _OVER; + } + + sz = taosArrayGetSize(req.updateArray); + for (size_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i); + SArbGroup newGroup = {0}; + newGroup.vgId = pUpdateGroup->vgId; + newGroup.dbUid = pUpdateGroup->dbUid; + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { + newGroup.members[i].info.dnodeId = pUpdateGroup->members[i].dnodeId; + memcpy(newGroup.members[i].state.token, pUpdateGroup->members[i].token, TSDB_ARB_TOKEN_SIZE); + } + + newGroup.isSync = pUpdateGroup->isSync; + newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId; + memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); + newGroup.version = pUpdateGroup->version; + + SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId); + if (!pOldGroup) { + mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId); + taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t)); + continue; + } + + mndTransAddArbGroupId(pTrans, newGroup.vgId); + + if (mndSetCreateArbGroupCommitLogs(pTrans, &newGroup) != 0) { + mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id, + terrstr()); + goto _OVER; + } + + mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]", + pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token, + newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync, + newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token); + + sdbRelease(pMnode->pSdb, pOldGroup); + } + + if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + + ret = 0; + +_OVER: + if (ret != 0) { + // failed to update arbgroup + for (size_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i); + taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)); + } + } + + mndTransDrop(pTrans); + tFreeSMArbUpdateGroupBatchReq(&req); return ret; } @@ -739,7 +839,7 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) { pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId, pNew->assignedLeader.token); - mndTransSetArbGroupId(pTrans, pNew->vgId); + mndTransAddArbGroupId(pTrans, pNew->vgId); if (mndTransCheckConflict(pMnode, pTrans) != 0) { ret = -1; goto _OVER; @@ -816,10 +916,10 @@ _OVER: } static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) { - int ret = 0; int64_t nowMs = taosGetTimestampMs(); + size_t size = taosArrayGetSize(memberArray); + SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup)); - size_t size = taosArrayGetSize(memberArray); for (size_t i = 0; i < size; i++) { SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i); @@ -832,17 +932,16 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup); if (updateToken) { - ret = mndPullupArbUpdateGroup(pMnode, &newGroup); - if (ret != 0) { - mInfo("failed to pullup update arb token, vgId:%d, since %s", pRspMember->vgId, terrstr()); - } + taosArrayPush(pUpdateArray, &newGroup); } sdbRelease(pMnode->pSdb, pGroup); - if (ret != 0) break; } - return ret; + (void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray); + + taosArrayDestroy(pUpdateArray); + return 0; } bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token, @@ -900,6 +999,11 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token } static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) { + if (pRsp->contLen == 0) { + mDebug("arb hb-rsp contLen is 0"); + return 0; + } + int32_t ret = -1; SMnode *pMnode = pRsp->info.node; @@ -914,6 +1018,7 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) { SVArbHeartBeatRsp arbHbRsp = {0}; if (tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp) != 0) { + mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code)); terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -934,6 +1039,11 @@ _OVER: } static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) { + if (pRsp->contLen == 0) { + mDebug("arb check-sync-rsp contLen is 0"); + return 0; + } + int32_t ret = -1; SMnode *pMnode = pRsp->info.node; @@ -948,7 +1058,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) { SVArbCheckSyncRsp syncRsp = {0}; if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) { - mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code)); + mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code)); if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) { terrno = TSDB_CODE_SUCCESS; return 0; @@ -1008,6 +1118,11 @@ _OVER: } static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) { + if (pRsp->contLen == 0) { + mDebug("arb set-assigned-rsp contLen is 0"); + return 0; + } + int32_t ret = -1; SMnode *pMnode = pRsp->info.node; @@ -1022,8 +1137,8 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) { SVArbSetAssignedLeaderRsp setAssignedRsp = {0}; if (tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp) != 0) { + mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code)); terrno = TSDB_CODE_INVALID_MSG; - mInfo("arb set assigned failed, des failed since:%s", tstrerror(pRsp->code)); return -1; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 84940e01d4..ecc163985c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -26,7 +26,7 @@ #define TRANS_VER1_NUMBER 1 #define TRANS_VER2_NUMBER 2 #define TRANS_ARRAY_SIZE 8 -#define TRANS_RESERVE_SIZE 48 +#define TRANS_RESERVE_SIZE 44 static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); @@ -196,10 +196,21 @@ SSdbRaw *mndTransEncode(STrans *pTrans) { } SDB_SET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER) + + int32_t arbGroupNum = taosHashGetSize(pTrans->arbGroupIds); + SDB_SET_INT32(pRaw, dataPos, arbGroupNum, _OVER) + void *pIter = NULL; + pIter = taosHashIterate(pTrans->arbGroupIds, NULL); + while (pIter) { + int32_t arbGroupId = *(int32_t *)pIter; + SDB_SET_INT32(pRaw, dataPos, arbGroupId, _OVER) + pIter = taosHashIterate(pTrans->arbGroupIds, pIter); + } + SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) - terrno = 0; + terrno = 0; _OVER: if (terrno != 0) { @@ -279,6 +290,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) { int32_t undoActionNum = 0; int32_t commitActionNum = 0; int32_t dataPos = 0; + int32_t arbgroupIdNum = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -350,6 +362,16 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) { } SDB_GET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER); + + pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); + + SDB_GET_INT32(pRaw, dataPos, &arbgroupIdNum, _OVER) + for (int32_t i = 0; i < arbgroupIdNum; ++i) { + int32_t arbGroupId = 0; + SDB_GET_INT32(pRaw, dataPos, &arbGroupId, _OVER) + taosHashPut(pTrans->arbGroupIds, &arbGroupId, sizeof(int32_t), NULL, 0); + } + SDB_GET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER) terrno = 0; @@ -462,6 +484,9 @@ void mndTransDropData(STrans *pTrans) { mndTransDropActions(pTrans->commitActions); pTrans->commitActions = NULL; } + if (pTrans->arbGroupIds != NULL) { + taosHashCleanup(pTrans->arbGroupIds); + } if (pTrans->pRpcArray != NULL) { taosArrayDestroy(pTrans->pRpcArray); pTrans->pRpcArray = NULL; @@ -581,6 +606,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); + pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64(); taosInitRWLatch(&pTrans->lockRpcArray); @@ -733,7 +759,9 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname) } } -void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId) { pTrans->arbGroupId = groupId; } +void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId) { + taosHashPut(pTrans->arbGroupIds, &groupId, sizeof(int32_t), NULL, 0); +} void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; } @@ -821,7 +849,16 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { if (pNew->conflict == TRN_CONFLICT_ARBGROUP) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) { - if (pNew->arbGroupId == pTrans->arbGroupId) conflict = true; + void *pIter = taosHashIterate(pNew->arbGroupIds, NULL); + while (pIter != NULL) { + int32_t groupId = *(int32_t *)pIter; + if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) { + taosHashCancelIterate(pNew->arbGroupIds, pIter); + conflict = true; + break; + } + pIter = taosHashIterate(pNew->arbGroupIds, pIter); + } } }