diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 1e59dd1805..23d21b24a3 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -203,6 +203,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 884365447b..d0ecd598c3 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -88,8 +88,6 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheckReq); - /*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/ - mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); @@ -868,7 +866,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)}; -// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9299dbc9e5..d8441a1a64 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -680,7 +680,6 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0}; tmsgSendRsp(&rsp); - return 0; } @@ -1663,6 +1662,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } +// todo refactor. int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { STQ* pTq = pVnode->pTq; SMsgHead* msgStr = pMsg->pCont; @@ -1830,26 +1830,25 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; + SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; SStreamTaskNodeUpdateMsg req = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) { - code = TSDB_CODE_MSG_DECODE_ERROR; - tDecoderClear(&decoder); - tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(code)); - return code; + rsp.code = TSDB_CODE_MSG_DECODE_ERROR; + tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code)); + goto _end; } - tDecoderClear(&decoder); SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active - return TSDB_CODE_SUCCESS; + rsp.code = TSDB_CODE_SUCCESS; + goto _end; } tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); @@ -1858,27 +1857,26 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pHistoryTask = NULL; if (pTask->historyTaskId.taskId != 0) { pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); - if (pHistoryTask == NULL) { + if (pHistoryTask != NULL) { + tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr); + streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList); + + streamTaskRestart(pHistoryTask, NULL); + streamMetaReleaseTask(pMeta, pHistoryTask); + } else { tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped", pMeta->vgId, pTask->historyTaskId.taskId); - streamMetaReleaseTask(pMeta, pTask); - - // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active - return TSDB_CODE_SUCCESS; } - - tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr); - streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList); - } - - if (pHistoryTask != NULL) { - streamTaskRestart(pHistoryTask, NULL); - streamMetaReleaseTask(pMeta, pHistoryTask); } streamTaskRestart(pTask, NULL); streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_SUCCESS; + int32_t code = rsp.code; + +_end: + tDecoderClear(&decoder); + tmsgSendRsp(&rsp); + return code; }