From ac137b4b33502fba06715e5b0827b219a3a4c9ad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Apr 2023 09:46:58 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 27 ++++++++++++--------------- source/libs/stream/src/stream.c | 4 ++++ 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9e0a2826c5..aade34e965 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -335,17 +335,17 @@ struct SStreamTask { // meta typedef struct SStreamMeta { - char* path; - TDB* db; - TTB* pTaskDb; - TTB* pCheckpointDb; - SHashObj* pTasks; - void* ahandle; - TXN* txn; - FTaskExpand* expandFunc; - int32_t vgId; - SRWLatch lock; - int8_t walScan; + char* path; + TDB* db; + TTB* pTaskDb; + TTB* pCheckpointDb; + SHashObj* pTasks; + void* ahandle; + TXN* txn; + FTaskExpand* expandFunc; + int32_t vgId; + SRWLatch lock; + int8_t walScan; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -358,10 +358,6 @@ void tFreeStreamTask(SStreamTask* pTask); int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem); bool tInputQueueIsFull(const SStreamTask* pTask); -static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); -} - typedef struct { SMsgHead head; int64_t streamId; @@ -537,6 +533,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); // int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); +void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 3b9306c2cf..0f000f1f50 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -352,4 +352,8 @@ void* streamQueueNextItem(SStreamQueue* queue) { } return streamQueueCurItem(queue); } +} + +void streamTaskInputFail(SStreamTask* pTask) { + atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); } \ No newline at end of file