diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 25f90961a3..e90cdb8473 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -621,7 +621,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); -int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq); +int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq); int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamTaskReleaseState(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 84d2da4f97..a3d28832fa 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1409,18 +1409,17 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { int32_t taskId = req.dstTaskId; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - - if (pTask) { - SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessRetrieveReq(pTask, &req, &rsp); - - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tDeleteStreamRetrieveReq(&req); - return 0; - } else { + if (pTask == NULL) { tDeleteStreamRetrieveReq(&req); return -1; } + + SRpcMsg rsp = {.info = pMsg->info, .code = 0}; + streamProcessRetrieveReq(pTask, &req, &rsp); + + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + tDeleteStreamRetrieveReq(&req); + return 0; } int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { @@ -1579,7 +1578,7 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) { goto FAIL; } - streamProcessCheckpointReq(pMeta, pTask, &req); + streamProcessCheckpointReq(pTask, &req); streamMetaReleaseTask(pMeta, pTask); return code; @@ -1593,7 +1592,6 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) { // if this task is an source task, send source rsp to mnode int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t code = 0; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 5ba8beb467..fead7253f8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -210,7 +210,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, return code; } -int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq) { +int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq) { int32_t code; int64_t checkpointId = pReq->checkpointId; int32_t childId = pReq->childId;