refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-08-31 13:53:12 +08:00
parent 3fc3dafca6
commit 2285b834b0
5 changed files with 16 additions and 20 deletions

View File

@ -50,7 +50,6 @@ enum {
TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause TASK_STATUS__PAUSE, // pause
TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
TASK_STATUS__CK_READY,
}; };
enum { enum {
@ -190,19 +189,9 @@ void streamCleanUp();
SStreamQueue* streamQueueOpen(int64_t cap); SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue);
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { void streamQueueProcessFail(SStreamQueue* queue);
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); void* streamQueueNextItem(SStreamQueue* pQueue);
queue->qItem = NULL;
atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
}
static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}
void* streamQueueNextItem(SStreamQueue* pQueue);
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);

View File

@ -217,7 +217,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
} }
if (taskLevel == TASK_LEVEL__SINK) { if (taskLevel == TASK_LEVEL__SINK) {
pTask->status.taskStatus = TASK_STATUS__CK_READY;
qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream", qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream",
id, num); id, num);
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
@ -231,8 +230,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
// can start local checkpoint procedure // can start local checkpoint procedure
pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
// if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY // Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks // already. And then, dispatch check point msg to all downstream tasks
code = continueDispatchCheckpointBlock(pBlock, pTask); code = continueDispatchCheckpointBlock(pBlock, pTask);
} }
@ -314,7 +312,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1); int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
ASSERT(remain >= 0); ASSERT(remain >= 0);
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state if (remain == 0) { // all tasks are ready
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr); qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
pMeta->totalTasks = 0; pMeta->totalTasks = 0;

View File

@ -577,7 +577,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) { if (type == STREAM_INPUT__CHECKPOINT) {
// ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); // ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
// pTask->status.taskStatus = TASK_STATUS__CK_READY;
qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr, qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus)); streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskBuildCheckpoint(pTask); streamTaskBuildCheckpoint(pTask);

View File

@ -88,6 +88,17 @@ void* streamQueueNextItem(SStreamQueue* pQueue) {
} }
} }
void streamQueueProcessSuccess(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
queue->qItem = NULL;
atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
}
void streamQueueProcessFail(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}
#if 0 #if 0
bool streamQueueResEmpty(const SStreamQueueRes* pRes) { bool streamQueueResEmpty(const SStreamQueueRes* pRes) {
// //

View File

@ -65,7 +65,6 @@ const char* streamGetTaskStatusStr(int32_t status) {
case TASK_STATUS__HALT: return "halt"; case TASK_STATUS__HALT: return "halt";
case TASK_STATUS__PAUSE: return "paused"; case TASK_STATUS__PAUSE: return "paused";
case TASK_STATUS__CK: return "check-point"; case TASK_STATUS__CK: return "check-point";
case TASK_STATUS__CK_READY: return "check-point-ready";
case TASK_STATUS__DROPPING: return "dropping"; case TASK_STATUS__DROPPING: return "dropping";
case TASK_STATUS__STOP: return "stop"; case TASK_STATUS__STOP: return "stop";
default:return ""; default:return "";