From ce54abd94d0426fe3e03983e2bce66f86f1be361 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 30 Jul 2024 16:54:39 +0800 Subject: [PATCH] fix: trans conflict during update dnode info --- include/common/tmsg.h | 8 +++ include/common/tmsgdef.h | 2 +- source/common/src/tmsg.c | 33 +++++++++++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mnode/impl/src/mndDnode.c | 66 +++++++++++++++++---- source/dnode/mnode/impl/src/mndGrant.c | 2 +- 6 files changed, 98 insertions(+), 14 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 782b9a072d..20d73e06d0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1763,6 +1763,14 @@ int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); void tFreeSStatusReq(SStatusReq* pReq); +typedef struct { + int32_t dnodeId; + char machineId[TSDB_MACHINE_ID_LEN + 1]; +} SDnodeInfoReq; + +int32_t tSerializeSDnodeInfoReq(void* buf, int32_t bufLen, SDnodeInfoReq* pReq); +int32_t tDeserializeSDnodeInfoReq(void* buf, int32_t bufLen, SDnodeInfoReq* pReq); + typedef enum { MONITOR_TYPE_COUNTER = 0, MONITOR_TYPE_SLOW_LOG = 1, diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 3515df3127..de12b45146 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -251,7 +251,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9e663d495c..e7c0232066 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1457,6 +1457,39 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { void tFreeSStatusReq(SStatusReq *pReq) { taosArrayDestroy(pReq->pVloads); } +int32_t tSerializeSDnodeInfoReq(void *buf, int32_t bufLen, SDnodeInfoReq *pReq) { + int32_t code = 0, lino = 0; + int32_t tlen = 0; + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->dnodeId)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->machineId)); + + tEndEncode(&encoder); + + tlen = encoder.pos; +_exit: + tEncoderClear(&encoder); + return code < 0 ? code : tlen; +} + +int32_t tDeserializeSDnodeInfoReq(void *buf, int32_t bufLen, SDnodeInfoReq *pReq) { + int32_t code = 0, lino = 0; + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->dnodeId)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->machineId)); + +_exit: + tEndDecode(&decoder); + tDecoderClear(&decoder); + return code; +} + int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 7605df8e7c..43c40c65c3 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -173,6 +173,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_UPDATE_DNODE_INFO, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 3e70bb20cd..fb1687d851 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -86,6 +86,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq); static int32_t mndProcessNotifyReq(SRpcMsg *pReq); static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq); static int32_t mndProcessStatisReq(SRpcMsg *pReq); +static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq); static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp); static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp); @@ -126,6 +127,7 @@ int32_t mndInitDnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq); mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp); + mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig); @@ -601,37 +603,77 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { } static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) { - int32_t code = 0; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj"); + int32_t code = 0, lino = 0; + SDnodeInfoReq infoReq = {0}; + int32_t contLen = 0; + void *pReq = NULL; + + infoReq.dnodeId = pDnode->id; + tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1); + + if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) { + TAOS_RETURN(contLen); + } + pReq = rpcMallocCont(contLen); + if (pReq == NULL) { + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + } + + (void)tSerializeSDnodeInfoReq(pReq, contLen, &infoReq); + + SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen}; + TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg)); +_exit: + if (code < 0) { + mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code)); + } + TAOS_RETURN(code); +} + +static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) { + int32_t code = 0, lino = 0; + SMnode *pMnode = pReq->info.node; + SDnodeInfoReq infoReq = {0}; + SDnodeObj *pDnode = NULL; + STrans *pTrans = NULL; + + TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq)); + + pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId); + if (pDnode == NULL) { + TAOS_CHECK_EXIT(terrno); + } + + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj"); if (pTrans == NULL) { - code = TSDB_CODE_MND_RETURN_VALUE_NULL; - if (terrno != 0) code = terrno; - goto _exit; + TAOS_CHECK_EXIT(terrno); } pDnode->updateTime = taosGetTimestampMs(); SSdbRaw *pCommitRaw = mndDnodeActionEncode(pDnode); if (pCommitRaw == NULL) { - code = TSDB_CODE_MND_RETURN_VALUE_NULL; - if (terrno != 0) code = terrno; - goto _exit; + TAOS_CHECK_EXIT(terrno); } if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) { + sdbFreeRaw(pCommitRaw); mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code)); - code = terrno; - goto _exit; + TAOS_CHECK_EXIT(code); } (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code)); - goto _exit; + TAOS_CHECK_EXIT(code); } _exit: + mndReleaseDnode(pMnode, pDnode); + if (code != 0) { + mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code)); + } mndTransDrop(pTrans); - return code; + TAOS_RETURN(code); } static int32_t mndProcessStatusReq(SRpcMsg *pReq) { diff --git a/source/dnode/mnode/impl/src/mndGrant.c b/source/dnode/mnode/impl/src/mndGrant.c index af60bc9e4b..c294e0defe 100644 --- a/source/dnode/mnode/impl/src/mndGrant.c +++ b/source/dnode/mnode/impl/src/mndGrant.c @@ -89,7 +89,7 @@ void grantRestore(EGrantType grant, uint64_t value) {} int64_t grantRemain(EGrantType grant) { return 0; } int32_t tGetMachineId(char **result) { *result = NULL; - return TSDB_CODE_APP_ERROR; + return 0; } int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }