From 647f9f47ef6602509d5330391965cf9b532cd275 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 8 Jul 2024 10:32:23 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/common/tmsgdef.h | 1 - include/libs/stream/streamMsg.h | 1 - source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 - source/dnode/mgmt/mgmt_snode/src/smHandle.c | 1 - source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 - source/dnode/mnode/impl/src/mndStream.c | 1 - source/dnode/mnode/impl/src/mndStreamUtil.c | 2 +- source/dnode/snode/src/snode.c | 2 -- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 -- 9 files changed, 1 insertion(+), 11 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 7621615278..3515df3127 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -250,7 +250,6 @@ TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_STREAM_REQ_CONSEN_CHKPT, "stream-req-consen-chkpt", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) diff --git a/include/libs/stream/streamMsg.h b/include/libs/stream/streamMsg.h index bdb8ff7f8e..b253054dfe 100644 --- a/include/libs/stream/streamMsg.h +++ b/include/libs/stream/streamMsg.h @@ -232,7 +232,6 @@ typedef struct { typedef struct SCheckpointConsensusEntry { SRestoreCheckpointInfo req; - SRpcHandleInfo rspInfo; int64_t ts; } SCheckpointConsensusEntry; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index f3763ef0c5..9b987b3237 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -243,7 +243,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 3d0587a11b..5c2f54fd10 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -97,7 +97,6 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7d35fd71b7..fbe1925e3f 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -972,7 +972,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cd99714395..4faa8bdb58 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -121,7 +121,6 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport); - mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, mndProcessConsensusCheckpointId); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index c4797957c2..430bfcc3a2 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -103,7 +103,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { } else { if (replica != pVgroup->replica) { mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations", - pVgroup->vgId); + pVgroup->vgId, pVgroup->replica, replica); *allReady = false; break; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 60e57e8a2f..cfa24b2430 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -128,8 +128,6 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg); - case TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP: - return tqStreamProcessConsensusChkptRsp2(pSnode->pMeta, pMsg); default: sndError("invalid snode msg:%d", pMsg->msgType); ASSERT(0); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 04839c3357..9dcf7e53c7 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -861,8 +861,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); case TDMT_MND_STREAM_CHKPT_REPORT_RSP: return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg); - case TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP: - return tqProcessTaskConsensusChkptRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR;