refactor(stream): handle retrieve chkpt trigger in ctrl queue.
This commit is contained in:
parent
267f7d3b08
commit
81b718ec69
|
@ -1018,8 +1018,8 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -947,10 +947,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY:
|
||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_TRIGGER:
|
||||
return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
|
||||
return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_GET_STREAM_PROGRESS:
|
||||
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
|
||||
default:
|
||||
|
@ -983,6 +979,10 @@ int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pIn
|
|||
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
|
||||
return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_TRIGGER:
|
||||
return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
|
||||
return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
|
||||
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_CHKPT_REPORT_RSP:
|
||||
|
|
Loading…
Reference in New Issue