From 2b85ea62aacb927bf0b196eb9e40bc7f2d0fcd05 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 14 Jul 2023 17:48:47 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 -- source/libs/stream/inc/streamInt.h | 2 ++ 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 08e4b55ffe..d1aaa67ddf 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -611,7 +611,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); -int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskReleaseState(SStreamTask* pTask); @@ -619,7 +618,6 @@ int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); -int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); #ifdef __cplusplus } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index d732ec2876..51394143ac 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -38,6 +38,7 @@ extern int32_t streamBackendCfWrapperId; void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); +int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); @@ -51,6 +52,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); +int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);