refactor: do some internal refactor.
This commit is contained in:
parent
ce4153b6fc
commit
647f9f47ef
|
@ -250,7 +250,6 @@
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_REQ_CONSEN_CHKPT, "stream-req-consen-chkpt", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
|
||||||
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
|
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
|
||||||
|
|
|
@ -232,7 +232,6 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct SCheckpointConsensusEntry {
|
typedef struct SCheckpointConsensusEntry {
|
||||||
SRestoreCheckpointInfo req;
|
SRestoreCheckpointInfo req;
|
||||||
SRpcHandleInfo rspInfo;
|
|
||||||
int64_t ts;
|
int64_t ts;
|
||||||
} SCheckpointConsensusEntry;
|
} SCheckpointConsensusEntry;
|
||||||
|
|
||||||
|
|
|
@ -243,7 +243,6 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
|
|
|
@ -97,7 +97,6 @@ SArray *smGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
_OVER:
|
_OVER:
|
||||||
|
|
|
@ -972,7 +972,6 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -121,7 +121,6 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CONSEN_CHKPT, mndProcessConsensusCheckpointId);
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
|
||||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
|
||||||
|
|
|
@ -103,7 +103,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
|
||||||
} else {
|
} else {
|
||||||
if (replica != pVgroup->replica) {
|
if (replica != pVgroup->replica) {
|
||||||
mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
|
mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
|
||||||
pVgroup->vgId);
|
pVgroup->vgId, pVgroup->replica, replica);
|
||||||
*allReady = false;
|
*allReady = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,8 +128,6 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg);
|
return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg);
|
||||||
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
|
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
|
||||||
return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg);
|
return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg);
|
||||||
case TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP:
|
|
||||||
return tqStreamProcessConsensusChkptRsp2(pSnode->pMeta, pMsg);
|
|
||||||
default:
|
default:
|
||||||
sndError("invalid snode msg:%d", pMsg->msgType);
|
sndError("invalid snode msg:%d", pMsg->msgType);
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
|
@ -861,8 +861,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
||||||
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
|
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
|
case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
|
||||||
return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskChkptReportRsp(pVnode->pTq, pMsg);
|
||||||
case TDMT_MND_STREAM_REQ_CONSEN_CHKPT_RSP:
|
|
||||||
return tqProcessTaskConsensusChkptRsp(pVnode->pTq, pMsg);
|
|
||||||
default:
|
default:
|
||||||
vError("unknown msg type:%d in stream queue", pMsg->msgType);
|
vError("unknown msg type:%d in stream queue", pMsg->msgType);
|
||||||
return TSDB_CODE_APP_ERROR;
|
return TSDB_CODE_APP_ERROR;
|
||||||
|
|
Loading…
Reference in New Issue