diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 629efa00b3..97bc58b90f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -44,8 +44,9 @@ extern "C" { #define NODE_ROLE_LEADER 0x2 #define NODE_ROLE_FOLLOWER 0x3 -typedef struct SStreamTask SStreamTask; -typedef struct SStreamQueue SStreamQueue; +typedef struct SStreamTask SStreamTask; +typedef struct SStreamQueue SStreamQueue; +typedef struct SStreamTaskSM SStreamTaskSM; #define SSTREAM_TASK_VER 2 enum { @@ -265,14 +266,15 @@ typedef struct SCheckpointInfo { } SCheckpointInfo; typedef struct SStreamStatus { - int8_t taskStatus; - int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set - int8_t schedStatus; - int8_t keepTaskStatus; - bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it - int8_t pauseAllowed; // allowed task status to be set to be paused - int32_t timerActive; // timer is active - int32_t inScanHistorySentinel; + SStreamTaskSM* pSM; + int8_t taskStatus; + int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set + int8_t schedStatus; + int8_t keepTaskStatus; + bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it + int8_t pauseAllowed; // allowed task status to be set to be paused + int32_t timerActive; // timer is active + int32_t inScanHistorySentinel; } SStreamStatus; typedef struct SDataRange { diff --git a/source/libs/stream/inc/streamsm.h b/source/libs/stream/inc/streamsm.h new file mode 100644 index 0000000000..4b87dcfe84 --- /dev/null +++ b/source/libs/stream/inc/streamsm.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_STREAMSM_H +#define TDENGINE_STREAMSM_H + +#include "tstream.h" + +#ifdef __cplusplus +extern "C" { +#endif + +// moore finite state machine for stream task +typedef struct SStreamTaskState { + ETaskStatus state; + char* name; +} SStreamTaskState; + +typedef enum EStreamTaskEvent { + TASK_EVENT_INIT = 0x1, + TASK_EVENT_START = 0x2, + TASK_EVENT_STOP = 0x3, + TASK_EVENT_GEN_CHECKPOINT = 0x4, + TASK_EVENT_PAUSE = 0x5, + TASK_EVENT_RESUME = 0x6, + TASK_EVENT_HALT = 0x7, + TASK_EVENT_TRANS_STATE = 0x8, + TASK_EVENT_SCAN_TSDB = 0x9, + TASK_EVENT_SCAN_WAL = 0x10, +} EStreamTaskEvent; + +typedef int32_t (*__state_trans_fn)(SStreamTask*); + +typedef struct STaskStateTrans { + SStreamTaskState state; + EStreamTaskEvent event; + SStreamTaskState next; + __state_trans_fn pAction; +} STaskStateTrans; + +struct SStreamTaskSM { + SStreamTaskState current; + SArray* pTransList; // SArray + int64_t stateTs; + SStreamTask* pTask; +}; + +typedef struct SStreamEventInfo { + EStreamTaskEvent event; + const char* name; + bool isTrans; +} SStreamEventInfo; + +SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_STREAMSM_H diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 57103e5a96..c9eb76b6c3 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -19,6 +19,7 @@ #include "tstream.h" #include "ttimer.h" #include "wal.h" +#include "streamsm.h" static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo); @@ -34,8 +35,11 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("s-task:0x%" PRIx64 " failed malloc new stream task, size:%d, code:%s", streamId, + (int32_t)sizeof(SStreamTask), tstrerror(terrno)); return NULL; } + pTask->ver = SSTREAM_TASK_VER; pTask->id.taskId = tGenIdPI32(); pTask->id.streamId = streamId; @@ -43,12 +47,18 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory pTask->info.fillHistory = fillHistory; pTask->info.triggerParam = triggerParam; + pTask->status.pSM = streamCreateStateMachine(pTask); + if (pTask->status.pSM == NULL) { + taosMemoryFreeClear(pTask); + return NULL; + } + char buf[128] = {0}; sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId); pTask->id.idStr = taosStrdup(buf); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; - pTask->status.taskStatus = (fillHistory || hasFillhistory)? TASK_STATUS__SCAN_HISTORY:TASK_STATUS__NORMAL; + pTask->status.taskStatus = (fillHistory || hasFillhistory) ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__NORMAL; pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c new file mode 100644 index 0000000000..b6bd53eb87 --- /dev/null +++ b/source/libs/stream/src/streamTaskSm.c @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executor.h" +#include "streamInt.h" +#include "tmisce.h" +#include "tstream.h" +#include "ttimer.h" +#include "wal.h" +#include "streamsm.h" + +SStreamTaskState StreamTaskStatusList[8] = { + {.state = TASK_STATUS__NORMAL, .name = "normal"}, + {.state = TASK_STATUS__DROPPING, .name = "dropping"}, + {.state = TASK_STATUS__UNINIT, .name = "uninit"}, + {.state = TASK_STATUS__STOP, .name = "stop"}, + {.state = TASK_STATUS__SCAN_HISTORY, .name = "scan-history"}, + {.state = TASK_STATUS__HALT, .name = "halt"}, + {.state = TASK_STATUS__PAUSE, .name = "paused"}, + {.state = TASK_STATUS__CK, .name = "checkpoint"}, +}; + +static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn); +static int32_t initStateTransferTable(SStreamTaskSM* pSM); + +static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return 0; } +static int32_t streamTaskStartCheckDownstream(SStreamTask* pTask) { + stDebug("s-task:%s start to check downstream tasks", pTask->id.idStr); + return 0; +} +static int32_t streamTaskDoPause(SStreamTask* pTask) { + stDebug("s-task:%s start to pause tasks", pTask->id.idStr); + return 0; +} +static int32_t streamTaskDoResume(SStreamTask* pTask) { + stDebug("s-task:%s start to resume tasks", pTask->id.idStr); + return 0; +} +static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) { + stDebug("s-task:%s start to do checkpoint", pTask->id.idStr); + return 0; +} + +// todo optimize the perf of find the trans objs by using hash table +static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent* pEvent) { + int32_t numOfTrans = taosArrayGetSize(pState->pTransList); + for(int32_t i = 0; i < numOfTrans; ++i) { + STaskStateTrans* pTrans = taosArrayGet(pState->pTransList, i); + if (pTrans->state.state == pState->current.state && pTrans->event == *pEvent) { + return pTrans; + } + } + + ASSERT(0); + return NULL; +} + +SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) { + SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM)); + if (pSM == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pSM->pTask = pTask; + + // set the initial state for the state-machine of stream task + pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT]; + pSM->stateTs = taosGetTimestampMs(); + int32_t code = initStateTransferTable(pSM); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pSM); + return NULL; + } + + return pSM; +} + +int32_t taskSMHandleEvent(SStreamTaskSM* pSM, const EStreamTaskEvent* pEvent) { + STaskStateTrans* pTrans = streamTaskFindTransform(pSM, pEvent); + qDebug("start to handle event:%d", *pEvent); + + pSM->current = pTrans->next; + pSM->stateTs = taosGetTimestampMs(); + qDebug("new state:%s from %s", pTrans->next.name, pSM->current.name); + + return pTrans->pAction(pSM->pTask); +} + +STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn) { + STaskStateTrans trans = {0}; + trans.state = StreamTaskStatusList[current]; + trans.next = StreamTaskStatusList[next]; + trans.event = event; + trans.pAction = (fn != NULL)? fn : dummyFn; + return trans; +} + +int32_t initStateTransferTable(SStreamTaskSM* pSM) { + if (pSM->pTransList == NULL) { + pSM->pTransList = taosArrayInit(8, sizeof(STaskStateTrans)); + if (pSM->pTransList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT, streamTaskStartCheckDownstream); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, streamTaskDoPause); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__NORMAL, TASK_EVENT_RESUME, streamTaskDoResume); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, streamTaskDoCheckpoint); + taosArrayPush(pSM->pTransList, &trans); + + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_PAUSE, NULL); + taosArrayPush(pSM->pTransList, &trans); + + return 0; +} \ No newline at end of file