From 2285b834b0782a3f25659bb4c7dd195d34a75f0b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Aug 2023 13:53:12 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 17 +++-------------- source/libs/stream/src/streamCheckpoint.c | 6 ++---- source/libs/stream/src/streamExec.c | 1 - source/libs/stream/src/streamQueue.c | 11 +++++++++++ source/libs/stream/src/streamRecover.c | 1 - 5 files changed, 16 insertions(+), 20 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 929a330e8a..506543e26e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -50,7 +50,6 @@ enum { TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore - TASK_STATUS__CK_READY, }; enum { @@ -190,19 +189,9 @@ void streamCleanUp(); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); - -static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { - ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); - queue->qItem = NULL; - atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS); -} - -static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { - ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); - atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); -} - -void* streamQueueNextItem(SStreamQueue* pQueue); +void streamQueueProcessSuccess(SStreamQueue* queue); +void streamQueueProcessFail(SStreamQueue* queue); +void* streamQueueNextItem(SStreamQueue* pQueue); SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 2a0940c4d0..baf319d014 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -217,7 +217,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc } if (taskLevel == TASK_LEVEL__SINK) { - pTask->status.taskStatus = TASK_STATUS__CK_READY; qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream", id, num); streamFreeQitem((SStreamQueueItem*)pBlock); @@ -231,8 +230,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // can start local checkpoint procedure pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); - // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY - // put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task + // Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // already. And then, dispatch check point msg to all downstream tasks code = continueDispatchCheckpointBlock(pBlock, pTask); } @@ -314,7 +312,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); ASSERT(remain >= 0); - if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state + if (remain == 0) { // all tasks are ready qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); pMeta->totalTasks = 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e6b112d050..ff667fa778 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -577,7 +577,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. if (type == STREAM_INPUT__CHECKPOINT) { // ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); -// pTask->status.taskStatus = TASK_STATUS__CK_READY; qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus)); streamTaskBuildCheckpoint(pTask); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 7e6a438e12..34b0a00639 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -88,6 +88,17 @@ void* streamQueueNextItem(SStreamQueue* pQueue) { } } +void streamQueueProcessSuccess(SStreamQueue* queue) { + ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); + queue->qItem = NULL; + atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS); +} + +void streamQueueProcessFail(SStreamQueue* queue) { + ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); + atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); +} + #if 0 bool streamQueueResEmpty(const SStreamQueueRes* pRes) { // diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index fda6333516..5dd6d9087b 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -65,7 +65,6 @@ const char* streamGetTaskStatusStr(int32_t status) { case TASK_STATUS__HALT: return "halt"; case TASK_STATUS__PAUSE: return "paused"; case TASK_STATUS__CK: return "check-point"; - case TASK_STATUS__CK_READY: return "check-point-ready"; case TASK_STATUS__DROPPING: return "dropping"; case TASK_STATUS__STOP: return "stop"; default:return "";