fix: trans conflict during update dnode info
This commit is contained in:
parent
f77223d2a4
commit
ce54abd94d
|
@ -1763,6 +1763,14 @@ int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
|||
int32_t tDeserializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
|
||||
void tFreeSStatusReq(SStatusReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
char machineId[TSDB_MACHINE_ID_LEN + 1];
|
||||
} SDnodeInfoReq;
|
||||
|
||||
int32_t tSerializeSDnodeInfoReq(void* buf, int32_t bufLen, SDnodeInfoReq* pReq);
|
||||
int32_t tDeserializeSDnodeInfoReq(void* buf, int32_t bufLen, SDnodeInfoReq* pReq);
|
||||
|
||||
typedef enum {
|
||||
MONITOR_TYPE_COUNTER = 0,
|
||||
MONITOR_TYPE_SLOW_LOG = 1,
|
||||
|
|
|
@ -251,7 +251,7 @@
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8
|
||||
|
|
|
@ -1457,6 +1457,39 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
|||
|
||||
void tFreeSStatusReq(SStatusReq *pReq) { taosArrayDestroy(pReq->pVloads); }
|
||||
|
||||
int32_t tSerializeSDnodeInfoReq(void *buf, int32_t bufLen, SDnodeInfoReq *pReq) {
|
||||
int32_t code = 0, lino = 0;
|
||||
int32_t tlen = 0;
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
TAOS_CHECK_EXIT(tStartEncode(&encoder));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->dnodeId));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->machineId));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
tlen = encoder.pos;
|
||||
_exit:
|
||||
tEncoderClear(&encoder);
|
||||
return code < 0 ? code : tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSDnodeInfoReq(void *buf, int32_t bufLen, SDnodeInfoReq *pReq) {
|
||||
int32_t code = 0, lino = 0;
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
TAOS_CHECK_EXIT(tStartDecode(&decoder));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->dnodeId));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->machineId));
|
||||
|
||||
_exit:
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
|
|
@ -173,6 +173,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_UPDATE_DNODE_INFO, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -86,6 +86,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq);
|
|||
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
|
||||
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
|
||||
|
||||
|
@ -126,6 +127,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
|
||||
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndRetrieveConfigs);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONFIGS, mndCancelGetNextConfig);
|
||||
|
@ -601,37 +603,77 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
|
||||
int32_t code = 0;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
|
||||
int32_t code = 0, lino = 0;
|
||||
SDnodeInfoReq infoReq = {0};
|
||||
int32_t contLen = 0;
|
||||
void *pReq = NULL;
|
||||
|
||||
infoReq.dnodeId = pDnode->id;
|
||||
tstrncpy(infoReq.machineId, pDnode->machineId, TSDB_MACHINE_ID_LEN + 1);
|
||||
|
||||
if ((contLen = tSerializeSDnodeInfoReq(NULL, 0, &infoReq)) <= 0) {
|
||||
TAOS_RETURN(contLen);
|
||||
}
|
||||
pReq = rpcMallocCont(contLen);
|
||||
if (pReq == NULL) {
|
||||
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
(void)tSerializeSDnodeInfoReq(pReq, contLen, &infoReq);
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_UPDATE_DNODE_INFO, .pCont = pReq, .contLen = contLen};
|
||||
TAOS_CHECK_EXIT(tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg));
|
||||
_exit:
|
||||
if (code < 0) {
|
||||
mError("dnode:%d, failed to update dnode info since %s", pDnode->id, tstrerror(code));
|
||||
}
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq) {
|
||||
int32_t code = 0, lino = 0;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SDnodeInfoReq infoReq = {0};
|
||||
SDnodeObj *pDnode = NULL;
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
TAOS_CHECK_EXIT(tDeserializeSDnodeInfoReq(pReq->pCont, pReq->contLen, &infoReq));
|
||||
|
||||
pDnode = mndAcquireDnode(pMnode, infoReq.dnodeId);
|
||||
if (pDnode == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "update-dnode-obj");
|
||||
if (pTrans == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _exit;
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
|
||||
pDnode->updateTime = taosGetTimestampMs();
|
||||
|
||||
SSdbRaw *pCommitRaw = mndDnodeActionEncode(pDnode);
|
||||
if (pCommitRaw == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _exit;
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
|
||||
sdbFreeRaw(pCommitRaw);
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, tstrerror(code));
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
TAOS_CHECK_EXIT(code);
|
||||
}
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
|
||||
goto _exit;
|
||||
TAOS_CHECK_EXIT(code);
|
||||
}
|
||||
|
||||
_exit:
|
||||
mndReleaseDnode(pMnode, pDnode);
|
||||
if (code != 0) {
|
||||
mError("dnode:%d, failed to update dnode info at line %d since %s", infoReq.dnodeId, lino, tstrerror(code));
|
||||
}
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||
|
|
|
@ -89,7 +89,7 @@ void grantRestore(EGrantType grant, uint64_t value) {}
|
|||
int64_t grantRemain(EGrantType grant) { return 0; }
|
||||
int32_t tGetMachineId(char **result) {
|
||||
*result = NULL;
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
return 0;
|
||||
}
|
||||
int32_t dmProcessGrantReq(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
|
||||
int32_t dmProcessGrantNotify(void *pInfo, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
|
||||
|
|
Loading…
Reference in New Issue