From 161fd6902a9f58e5eb93ddbfa0c7dadde48ebb0b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Aug 2023 12:45:20 +0800 Subject: [PATCH] fix(stream): fix the acquire task. --- source/dnode/vnode/src/tq/tq.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b123f8cf42..ee6e7a76a8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1634,7 +1634,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId); if (pTask == NULL) { - tDeleteStreamDispatchReq(&req); +// tDeleteStreamDispatchReq(&req); return -1; } @@ -1741,7 +1741,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { tDecoderClear(&decoder); // todo handle the case when the task not in ready state, and the checkpoint msg is arrived. - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, req.taskId); @@ -1798,7 +1798,7 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { } tDecoderClear(&decoder); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.upstreamTaskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); if (pTask == NULL) { tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId); return code; @@ -1831,7 +1831,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { } tDecoderClear(&decoder); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); @@ -1844,7 +1844,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pHistoryTask = NULL; if (pTask->historyTaskId.taskId != 0) { - pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); + pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); if (pHistoryTask == NULL) { tqError( "vgId:%d failed to acquire fill-history task:0x%x when handling task update, it may have been dropped "