refactor(stream): do some internal refactor.

This commit is contained in:
Haojun Liao 2023-10-17 11:23:04 +08:00
parent c9bdb56d79
commit 2409a17517
4 changed files with 230 additions and 11 deletions

View File

@ -46,6 +46,7 @@ extern "C" {
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue; typedef struct SStreamQueue SStreamQueue;
typedef struct SStreamTaskSM SStreamTaskSM;
#define SSTREAM_TASK_VER 2 #define SSTREAM_TASK_VER 2
enum { enum {
@ -265,6 +266,7 @@ typedef struct SCheckpointInfo {
} SCheckpointInfo; } SCheckpointInfo;
typedef struct SStreamStatus { typedef struct SStreamStatus {
SStreamTaskSM* pSM;
int8_t taskStatus; int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus; int8_t schedStatus;

View File

@ -0,0 +1,72 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_STREAMSM_H
#define TDENGINE_STREAMSM_H
#include "tstream.h"
#ifdef __cplusplus
extern "C" {
#endif
// moore finite state machine for stream task
typedef struct SStreamTaskState {
ETaskStatus state;
char* name;
} SStreamTaskState;
typedef enum EStreamTaskEvent {
TASK_EVENT_INIT = 0x1,
TASK_EVENT_START = 0x2,
TASK_EVENT_STOP = 0x3,
TASK_EVENT_GEN_CHECKPOINT = 0x4,
TASK_EVENT_PAUSE = 0x5,
TASK_EVENT_RESUME = 0x6,
TASK_EVENT_HALT = 0x7,
TASK_EVENT_TRANS_STATE = 0x8,
TASK_EVENT_SCAN_TSDB = 0x9,
TASK_EVENT_SCAN_WAL = 0x10,
} EStreamTaskEvent;
typedef int32_t (*__state_trans_fn)(SStreamTask*);
typedef struct STaskStateTrans {
SStreamTaskState state;
EStreamTaskEvent event;
SStreamTaskState next;
__state_trans_fn pAction;
} STaskStateTrans;
struct SStreamTaskSM {
SStreamTaskState current;
SArray* pTransList; // SArray<STaskStateTrans>
int64_t stateTs;
SStreamTask* pTask;
};
typedef struct SStreamEventInfo {
EStreamTaskEvent event;
const char* name;
bool isTrans;
} SStreamEventInfo;
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_STREAMSM_H

View File

@ -19,6 +19,7 @@
#include "tstream.h" #include "tstream.h"
#include "ttimer.h" #include "ttimer.h"
#include "wal.h" #include "wal.h"
#include "streamsm.h"
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
@ -34,8 +35,11 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId,
(int32_t)sizeof(SStreamTask), tstrerror(terrno));
return NULL; return NULL;
} }
pTask->ver = SSTREAM_TASK_VER; pTask->ver = SSTREAM_TASK_VER;
pTask->id.taskId = tGenIdPI32(); pTask->id.taskId = tGenIdPI32();
pTask->id.streamId = streamId; pTask->id.streamId = streamId;
@ -43,12 +47,18 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory
pTask->info.fillHistory = fillHistory; pTask->info.fillHistory = fillHistory;
pTask->info.triggerParam = triggerParam; pTask->info.triggerParam = triggerParam;
pTask->status.pSM = streamCreateStateMachine(pTask);
if (pTask->status.pSM == NULL) {
taosMemoryFreeClear(pTask);
return NULL;
}
char buf[128] = {0}; char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId); sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);
pTask->id.idStr = taosStrdup(buf); pTask->id.idStr = taosStrdup(buf);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.taskStatus = (fillHistory || hasFillhistory)? TASK_STATUS__SCAN_HISTORY:TASK_STATUS__NORMAL; pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__NORMAL;
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;

View File

@ -0,0 +1,135 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "streamInt.h"
#include "tmisce.h"
#include "tstream.h"
#include "ttimer.h"
#include "wal.h"
#include "streamsm.h"
SStreamTaskState StreamTaskStatusList[8] = {
{.state = TASK_STATUS__NORMAL, .name = "normal"},
{.state = TASK_STATUS__DROPPING, .name = "dropping"},
{.state = TASK_STATUS__UNINIT, .name = "uninit"},
{.state = TASK_STATUS__STOP, .name = "stop"},
{.state = TASK_STATUS__SCAN_HISTORY, .name = "scan-history"},
{.state = TASK_STATUS__HALT, .name = "halt"},
{.state = TASK_STATUS__PAUSE, .name = "paused"},
{.state = TASK_STATUS__CK, .name = "checkpoint"},
};
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn);
static int32_t initStateTransferTable(SStreamTaskSM* pSM);
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return 0; }
static int32_t streamTaskStartCheckDownstream(SStreamTask* pTask) {
stDebug("s-task:%s start to check downstream tasks", pTask->id.idStr);
return 0;
}
static int32_t streamTaskDoPause(SStreamTask* pTask) {
stDebug("s-task:%s start to pause tasks", pTask->id.idStr);
return 0;
}
static int32_t streamTaskDoResume(SStreamTask* pTask) {
stDebug("s-task:%s start to resume tasks", pTask->id.idStr);
return 0;
}
static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
stDebug("s-task:%s start to do checkpoint", pTask->id.idStr);
return 0;
}
// todo optimize the perf of find the trans objs by using hash table
static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent* pEvent) {
int32_t numOfTrans = taosArrayGetSize(pState->pTransList);
for(int32_t i = 0; i < numOfTrans; ++i) {
STaskStateTrans* pTrans = taosArrayGet(pState->pTransList, i);
if (pTrans->state.state == pState->current.state && pTrans->event == *pEvent) {
return pTrans;
}
}
ASSERT(0);
return NULL;
}
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM));
if (pSM == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pSM->pTask = pTask;
// set the initial state for the state-machine of stream task
pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
pSM->stateTs = taosGetTimestampMs();
int32_t code = initStateTransferTable(pSM);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pSM);
return NULL;
}
return pSM;
}
int32_t taskSMHandleEvent(SStreamTaskSM* pSM, const EStreamTaskEvent* pEvent) {
STaskStateTrans* pTrans = streamTaskFindTransform(pSM, pEvent);
qDebug("start to handle event:%d", *pEvent);
pSM->current = pTrans->next;
pSM->stateTs = taosGetTimestampMs();
qDebug("new state:%s from %s", pTrans->next.name, pSM->current.name);
return pTrans->pAction(pSM->pTask);
}
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn) {
STaskStateTrans trans = {0};
trans.state = StreamTaskStatusList[current];
trans.next = StreamTaskStatusList[next];
trans.event = event;
trans.pAction = (fn != NULL)? fn : dummyFn;
return trans;
}
int32_t initStateTransferTable(SStreamTaskSM* pSM) {
if (pSM->pTransList == NULL) {
pSM->pTransList = taosArrayInit(8, sizeof(STaskStateTrans));
if (pSM->pTransList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT, streamTaskStartCheckDownstream);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, streamTaskDoPause);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__NORMAL, TASK_EVENT_RESUME, streamTaskDoResume);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, streamTaskDoCheckpoint);
taosArrayPush(pSM->pTransList, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_PAUSE, NULL);
taosArrayPush(pSM->pTransList, &trans);
return 0;
}