diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index fc9b88340f..84d0dd4982 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -26,6 +26,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 1488df3cb1..8c726f9f03 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -88,6 +88,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index a2f0b7aced..4213541351 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -837,6 +837,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 9a017e7074..34623c021b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -178,6 +178,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true); case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg); + case TDMT_MND_STREAM_HEARTBEAT_RSP: + return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg); default: sndError("invalid snode msg:%d", pMsg->msgType); ASSERT(0); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index fb6a86843c..b83af84086 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -242,6 +242,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6b7420b55f..a3fd19c7f8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1220,3 +1220,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg); } + +// this function is needed, do not try to remove it. +int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 3b7fe3cfdd..c8a4f2a288 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -936,4 +936,8 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); -} \ No newline at end of file +} + +int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + return TSDB_CODE_SUCCESS; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e4d7b11176..86355cdbb0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -796,6 +796,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskScanHistory(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_HEARTBEAT_RSP: + return tqProcessStreamHbRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR;