refactor(stream): add state machine to manage the state of stream tasks.
This commit is contained in:
parent 596eb073b7
commit 6f3c0d17cc
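Note on the pattern introduced by this commit: task status is no longer written directly to pTask->status.taskStatus; callers now raise events through streamTaskHandleEvent() and a per-task SStreamTaskSM resolves them against a transition table built in initStateTransferTable(). The snippet below is a minimal illustrative sketch of that table-driven approach, written against the ETaskStatus / EStreamTaskEvent / SStreamTask types appearing in this diff; the names STransSketch and handleEventSketch are hypothetical helpers for illustration only, not part of the commit.

// Illustrative sketch only (assumes ETaskStatus, EStreamTaskEvent, SStreamTask from this diff).
typedef struct {
  ETaskStatus      state;                        // state the task must currently be in
  EStreamTaskEvent event;                        // event that triggers the transition
  ETaskStatus      next;                         // state after the transition
  int32_t        (*action)(SStreamTask* pTask);  // optional side effect, may be NULL
} STransSketch;

// Scan the table for the (current state, event) pair; run the action and move to
// the next state on a match, otherwise reject the event.
static int32_t handleEventSketch(const STransSketch* pTable, int32_t num, ETaskStatus* pCur,
                                 EStreamTaskEvent event, SStreamTask* pTask) {
  for (int32_t i = 0; i < num; ++i) {
    if (pTable[i].state == *pCur && pTable[i].event == event) {
      int32_t code = (pTable[i].action != NULL) ? pTable[i].action(pTask) : 0;
      *pCur = pTable[i].next;
      return code;
    }
  }
  return -1;  // no transition defined for this state/event pair
}

In the real code the table entries are created by createStateTransform() and additionally carry a success callback and an optional SAttachedEventInfo, which parks an event until the task reaches the required status (see attachEvent() and streamTaskOnHandleEventSuccess() below).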
@@ -75,6 +75,7 @@ typedef enum ETaskStatus {
   TASK_STATUS__HALT, // pause, but not be manipulated by user command
   TASK_STATUS__PAUSE, // pause
   TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore
+  TASK_STATUS_STREAM_SCAN_HISTORY,
 } ETaskStatus;
 
 enum {
@@ -129,17 +130,18 @@ enum {
 
 typedef enum EStreamTaskEvent {
   TASK_EVENT_INIT = 0x1,
-  TASK_EVENT_INIT_SCAN_HISTORY = 0x2,
-  TASK_EVENT_SCANHIST_COMPLETED = 0x3,
-  TASK_EVENT_START = 0x4,
+  TASK_EVENT_INIT_SCANHIST = 0x2,
+  TASK_EVENT_INIT_STREAM_SCANHIST = 0x3,
+  TASK_EVENT_SCANHIST_DONE = 0x4,
   TASK_EVENT_STOP = 0x5,
   TASK_EVENT_GEN_CHECKPOINT = 0x6,
-  TASK_EVENT_PAUSE = 0x7,
-  TASK_EVENT_RESUME = 0x8,
-  TASK_EVENT_HALT = 0x9,
-  TASK_EVENT_TRANS_STATE = 0xA,
-  TASK_EVENT_SCAN_TSDB = 0xB,
-  TASK_EVENT_SCAN_WAL = 0xC,
+  TASK_EVENT_CHECKPOINT_DONE = 0x7,
+  TASK_EVENT_PAUSE = 0x8,
+  TASK_EVENT_RESUME = 0x9,
+  TASK_EVENT_HALT = 0xA,
+  TASK_EVENT_TRANS_STATE = 0xB,
+  TASK_EVENT_SCAN_TSDB = 0xC,
+  TASK_EVENT_SCAN_WAL = 0xD,
 } EStreamTaskEvent;
 
 typedef struct {
@@ -1029,7 +1029,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
 
   bool restored = pTq->pVnode->restored;
   if (p != NULL && restored && p->info.fillHistory == 0) {
-    EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCAN_HISTORY;
+    EStreamTaskEvent event = (p->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
     streamTaskHandleEvent(p->status.pSM, event);
   } else if (!restored) {
     tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
@@ -1259,7 +1259,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
     }
 #endif
 
-    streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_COMPLETED);
+    streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
     tqScanWalAsync(pTq, false);
   }
   streamMetaReleaseTask(pMeta, pStreamTask);
@@ -1683,7 +1683,6 @@ FAIL:
   return -1;
 }
 
-// todo error code cannot be return, since this is invoked by an mnode-launched transaction.
 int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
   int32_t vgId = TD_VID(pTq->pVnode);
   SStreamMeta* pMeta = pTq->pStreamMeta;
@@ -1694,7 +1693,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
   // disable auto rsp to mnode
   pRsp->info.handle = NULL;
 
-  // todo: add counter to make sure other tasks would not be trapped in checkpoint state
   SStreamCheckpointSourceReq req = {0};
   if (!vnodeIsRoleLeader(pTq->pVnode)) {
     tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId);
@@ -1725,6 +1723,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
   }
   tDecoderClear(&decoder);
 
+  // todo handle failure to reset from checkpoint procedure
   SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
   if (pTask == NULL) {
     tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
@@ -1735,6 +1734,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
     return TSDB_CODE_SUCCESS;
   }
 
+  // todo handle failure to reset from checkpoint procedure
   // downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
   if (pTask->status.downstreamReady != 1) {
     pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
@@ -1750,8 +1750,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
     return TSDB_CODE_SUCCESS;
   }
 
+  // todo save the checkpoint failed info
   taosThreadMutexLock(&pTask->lock);
-  if (pTask->status.taskStatus == TASK_STATUS__HALT) {
+  if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__HALT) {
     qError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
            pTask->id.idStr, req.checkpointId);
     taosThreadMutexUnlock(&pTask->lock);
@@ -99,7 +99,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) {
       continue;
     }
 
-    EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCAN_HISTORY;
+    EStreamTaskEvent event = (pTask->hTaskInfo.id.taskId == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
     streamTaskHandleEvent(pTask->status.pSM, event);
     streamMetaReleaseTask(pMeta, pTask);
   }
@@ -31,14 +31,19 @@ typedef struct SStreamTaskState {
 typedef int32_t (*__state_trans_fn)(SStreamTask*);
 typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);
 
-typedef struct STaskStateTrans {
-  bool autoInvokeEndFn;
-  SStreamTaskState state;
+typedef struct SAttachedEventInfo {
+  ETaskStatus status;
   EStreamTaskEvent event;
-  SStreamTaskState next;
-  __state_trans_fn preAction;
-  __state_trans_fn pAction;
+} SAttachedEventInfo;
+
+typedef struct STaskStateTrans {
+  bool autoInvokeEndFn;
+  SStreamTaskState state;
+  EStreamTaskEvent event;
+  SStreamTaskState next;
+  __state_trans_fn pAction;
   __state_trans_succ_fn pSuccAction;
+  SAttachedEventInfo attachEvent;
 } STaskStateTrans;
 
 struct SStreamTaskSM {
@@ -48,6 +53,8 @@ struct SStreamTaskSM {
   int64_t startTs;
   SStreamTaskState current;
   SStreamTaskState prev;
+  // register the next handled event, if current state is not allowed to handle this event
+  SArray* eventList;
 };
 
 typedef struct SStreamEventInfo {
@@ -57,7 +64,7 @@ typedef struct SStreamEventInfo {
 } SStreamEventInfo;
 
 SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
+void* streamDestroyStateMachine(SStreamTaskSM* pSM);
 #ifdef __cplusplus
 }
 #endif
@@ -69,7 +69,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
     return;
   }
 
-  if (pTask->status.taskStatus == TASK_STATUS__CK) {
+  if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
     stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, nextTrigger);
   } else {
     if (status == TASK_TRIGGER_STATUS__ACTIVE) {
@@ -267,8 +267,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
   }
 
   // disable the data from upstream tasks
-  int8_t st = pTask->status.taskStatus;
-  if (st == TASK_STATUS__HALT) {
+  if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__HALT) {
     status = TASK_INPUT_STATUS__BLOCKED;
   }
 
@@ -135,12 +135,13 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
 int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
   ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
 
+  // todo this status may not be set here.
   // 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
-  pTask->status.taskStatus = TASK_STATUS__CK;
+  streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
 
   pTask->checkpointingId = pReq->checkpointId;
   pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
   pTask->chkInfo.startTs = taosGetTimestampMs();
 
   pTask->execInfo.checkpoint += 1;
 
   // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
@@ -171,11 +172,11 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
   const char* id = pTask->id.idStr;
   int32_t code = TSDB_CODE_SUCCESS;
 
-  // set the task status
-  pTask->checkpointingId = checkpointId;
-
   // set task status
-  pTask->status.taskStatus = TASK_STATUS__CK;
+  if (streamTaskGetStatus(pTask, NULL) != TASK_STATUS__CK) {
+    pTask->checkpointingId = checkpointId;
+    streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
+  }
 
   { // todo: remove this when the pipeline checkpoint generating is used.
     SStreamMeta* pMeta = pTask->pMeta;
@@ -195,6 +196,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
     stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
     continueDispatchCheckpointBlock(pBlock, pTask);
   } else { // only one task exists, no need to dispatch downstream info
+    atomic_add_fetch_32(&pTask->checkpointNotReadyTasks, 1);
     streamProcessCheckpointReadyMsg(pTask);
     streamFreeQitem((SStreamQueueItem*)pBlock);
   }
@@ -288,12 +290,14 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
       continue;
     }
 
-    int8_t prev = p->status.taskStatus;
     ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
 
     p->chkInfo.checkpointId = p->checkpointingId;
     streamTaskClearCheckInfo(p);
-    streamSetStatusNormal(p);
+
+    char* str = NULL;
+    streamTaskGetStatus(p, &str);
+    streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
 
     // save the task
     streamMetaSaveTask(pMeta, p);
@@ -302,13 +306,13 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
         "vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, "
         "checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s",
         pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer,
-        streamGetTaskStatusStr(prev));
+        str);
   }
 
   if (streamMetaCommit(pMeta) < 0) {
     taosWUnLockLatch(&pMeta->lock);
     stError("vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s", pMeta->vgId,
             checkpointId, terrstr());
     return -1;
   } else {
     taosWUnLockLatch(&pMeta->lock);
@@ -1011,7 +1011,7 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo,
   taosThreadMutexUnlock(&pTask->lock);
 
   int32_t num = taosArrayGetSize(pTask->pRspMsgList);
-  stDebug("s-task:%s add scan history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
+  stDebug("s-task:%s add scan-history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
           num);
   return TSDB_CODE_SUCCESS;
 }
@@ -1027,7 +1027,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
     SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i);
     tmsgSendRsp(&pInfo->msg);
 
-    stDebug("s-task:%s level:%d notify upstream:0x%x continuing scan data in WAL", id, level, pInfo->taskId);
+    stDebug("s-task:%s level:%d notify upstream:0x%x continuing handle data in WAL", id, level, pInfo->taskId);
   }
 
   taosArrayClear(pTask->pRspMsgList);
@@ -324,7 +324,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
   } else {
     ASSERT(status == TASK_STATUS__NORMAL);
     streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
-    // pStreamTask->status.taskStatus = TASK_STATUS__HALT;
     stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
   }
 
@@ -375,7 +374,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
 
   // 7. pause allowed.
   streamTaskEnablePause(pStreamTask);
-  if (taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) {
+  if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputInfo.queue->pQueue)) {
     SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
 
     SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
@@ -492,7 +491,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
     code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
     if (code == 0) {
       streamDispatchStreamBlock(pTask);
-    } else {
+    } else { // todo put into queue failed, retry
       streamFreeQitem((SStreamQueueItem*)pBlock);
     }
   } else { // level == TASK_LEVEL__SINK
@@ -892,7 +892,7 @@ void metaHbToMnode(void* param, void* tmrId) {
 
     STaskStatusEntry entry = {
         .id = *pId,
-        .status = (*pTask)->status.taskStatus,
+        .status = streamTaskGetStatus(*pTask, NULL),
         .nodeId = pMeta->vgId,
         .stage = pMeta->stage,
        .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)),
@@ -272,8 +272,12 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
     ASSERT(pTask->hTaskInfo.id.taskId == 0);
   }
 
-  stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64,
-          id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader));
+  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
+    stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64,
+            id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader));
+  } else {
+    stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p, pTask->status.schedStatus);
+  }
 
   streamTaskEnablePause(pTask);
   return TSDB_CODE_SUCCESS;
@@ -520,11 +524,13 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
   int32_t taskLevel = pTask->info.taskLevel;
   ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
 
-  char* p = NULL;
+  const char* id = pTask->id.idStr;
+  char* p = NULL;
   ETaskStatus status = streamTaskGetStatus(pTask, &p);
+
   if (status != TASK_STATUS__SCAN_HISTORY) {
     stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly",
-            pTask->id.idStr, p, pReq->upstreamTaskId);
+            id, p, pReq->upstreamTaskId);
 
     void* pBuf = NULL;
     int32_t len = 0;
@@ -534,8 +540,8 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
     initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len);
 
     tmsgSendRsp(&msg);
-    stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", pTask->id.idStr,
-            pTask->info.taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId);
+    stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id,
+            taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId);
     return 0;
   }
 
@@ -547,13 +553,15 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
 
   if (left == 0) {
     int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList);
-    stDebug(
-        "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
-        "rsp to all upstream tasks",
-        pTask->id.idStr, numOfTasks);
-
-    if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
+    if (taskLevel == TASK_LEVEL__AGG) {
+      stDebug(
+          "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data processing "
+          "and send rsp to all upstream tasks",
+          id, numOfTasks);
       streamAggUpstreamScanHistoryFinish(pTask);
+    } else {
+      stDebug("s-task:%s all %d upstream task(s) finish scan-history data, and rsp to all upstream tasks", id,
+              numOfTasks);
     }
 
     // all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data
@@ -564,14 +572,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
     if (taskLevel == TASK_LEVEL__AGG) {
       /*int32_t code = */streamTaskScanHistoryDataComplete(pTask);
     } else { // for sink task, set normal
-      if (pTask->status.taskStatus != TASK_STATUS__PAUSE && pTask->status.taskStatus != TASK_STATUS__STOP &&
-          pTask->status.taskStatus != TASK_STATUS__DROPPING) {
-        streamSetStatusNormal(pTask);
-      }
+      streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
     }
   } else {
     stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
-            pTask->id.idStr, pReq->upstreamTaskId, pReq->childId, left);
+            id, pReq->upstreamTaskId, pReq->childId, left);
   }
 
   return 0;
@@ -582,7 +587,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
   SStreamMeta* pMeta = pTask->pMeta;
 
   // execute in the scan history complete call back msg, ready to process data from inputQ
-  streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_COMPLETED);
+  streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
   streamTaskSetSchedStatusInactive(pTask);
 
   taosWLockLatch(&pMeta->lock);
@@ -619,7 +624,7 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
   }
 
   // check if downstream tasks have been ready
-  streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCAN_HISTORY);
+  streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST);
 }
 
 static void tryLaunchHistoryTask(void* param, void* tmrId) {
@@ -903,7 +908,7 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory
 void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
   SDataRange* pRange = &pTask->dataRange;
 
-  if (pTask->hTaskInfo.id.taskId == 0) {
+  if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
     if (pTask->info.fillHistory == 1) {
       stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64,
               pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
@@ -949,7 +954,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
 // normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
 void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
   int64_t st = taosGetTimestampMs();
-
+#if 0
   int8_t status = pTask->status.taskStatus;
   if (status == TASK_STATUS__DROPPING) {
     stDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
@@ -1007,6 +1012,13 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
   stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
   taosWUnLockLatch(&pMeta->lock);
 
+#endif
+
+  streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
+
+  int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
+  stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
+
   // in case of fill-history task, stop the tsdb file scan operation.
   if (pTask->info.fillHistory == 1) {
     void* pExecutor = pTask->exec.pExecutor;
@@ -1019,6 +1031,21 @@ void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta) {
 }
 
 void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
+  char* p = NULL;
+  ETaskStatus status = streamTaskGetStatus(pTask, &p);
+
+  if (status == TASK_STATUS__PAUSE) {
+    streamTaskRestoreStatus(pTask);
+
+    streamTaskGetStatus(pTask, &p);
+    int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
+    stInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr,
+           p, num);
+  } else {
+    stDebug("s-task:%s status:%s not in pause status, no need to resume", pTask->id.idStr, p);
+  }
+
+#if 0
   int8_t status = pTask->status.taskStatus;
   if (status == TASK_STATUS__PAUSE) {
     pTask->status.taskStatus = pTask->status.keepTaskStatus;
@@ -1031,6 +1058,8 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
   } else {
     stError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
   }
+#endif
+
 }
 
 // todo fix race condition
@@ -304,11 +304,9 @@ static void freeUpstreamItem(void* p) {
 
 void tFreeStreamTask(SStreamTask* pTask) {
   int32_t taskId = pTask->id.taskId;
-  char* p = NULL;
-  streamTaskGetStatus(pTask, &p);
 
   STaskExecStatisInfo* pStatis = &pTask->execInfo;
-  stDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState, p);
+  stDebug("start to free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState);
 
   stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
           ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
@@ -394,6 +392,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
     pTask->pRspMsgList = NULL;
   }
 
+  pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM);
+
   streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
 
   pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
@@ -14,39 +14,63 @@
  */
 
 #include "streamInt.h"
+#include "streamsm.h"
 #include "tmisce.h"
 #include "tstream.h"
 #include "ttimer.h"
 #include "wal.h"
-#include "streamsm.h"
 
-SStreamTaskState StreamTaskStatusList[8] = {
+SStreamTaskState StreamTaskStatusList[9] = {
     {.state = TASK_STATUS__NORMAL, .name = "normal"},
-    {.state = TASK_STATUS__DROPPING, .name = "dropping"},
+    {.state = TASK_STATUS__DROPPING, .name = "dropped"},
     {.state = TASK_STATUS__UNINIT, .name = "uninit"},
     {.state = TASK_STATUS__STOP, .name = "stop"},
     {.state = TASK_STATUS__SCAN_HISTORY, .name = "scan-history"},
    {.state = TASK_STATUS__HALT, .name = "halt"},
     {.state = TASK_STATUS__PAUSE, .name = "paused"},
     {.state = TASK_STATUS__CK, .name = "checkpoint"},
+    {.state = TASK_STATUS_STREAM_SCAN_HISTORY, .name = "stream-scan-history"},
 };
 
-SStreamEventInfo StreamTaskEventList[8] = {
+SStreamEventInfo StreamTaskEventList[10] = {
    {}, // dummy event, place holder
     {.event = TASK_EVENT_INIT, .name = "initialize"},
-    {.event = TASK_EVENT_INIT_SCAN_HISTORY, .name = "scan-history-initialize"},
-    {.event = TASK_EVENT_SCANHIST_COMPLETED, .name = "scan-history-completed"},
+    {.event = TASK_EVENT_INIT_SCANHIST, .name = "scan-history-initialize"},
+    {.event = TASK_EVENT_SCANHIST_DONE, .name = "scan-history-completed"},
+    {.event = TASK_EVENT_STOP, .name = "stopping"},
+    {.event = TASK_EVENT_GEN_CHECKPOINT, .name = "checkpoint"},
+    {.event = TASK_EVENT_CHECKPOINT_DONE, .name = "checkpoint-done"},
+    {.event = TASK_EVENT_PAUSE, .name = "pausing"},
+    {.event = TASK_EVENT_RESUME, .name = "resuming"},
+    {.event = TASK_EVENT_HALT, .name = "halting"},
 };
 
-static int32_t initStateTransferTable(SStreamTaskSM* pSM);
-static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
-                                            __state_trans_fn preFn, __state_trans_fn fn, __state_trans_succ_fn succFn,
-                                            bool autoInvoke);
 static int32_t streamTaskInitStatus(SStreamTask* pTask);
 static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
+static int32_t initStateTransferTable(SStreamTaskSM* pSM);
+
+static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
+                                            __state_trans_fn fn, __state_trans_succ_fn succFn,
+                                            SAttachedEventInfo* pEventInfo, bool autoInvoke);
 
 static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; }
 
+static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
+  char* p = NULL;
+  streamTaskGetStatus(pTask, &p);
+
+  stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
+          StreamTaskEventList[pEvtInfo->event].name, StreamTaskStatusList[pEvtInfo->status].name);
+
+  SStreamTaskSM* pSM = pTask->status.pSM;
+  if (pSM->eventList == NULL) {
+
+  }
+
+  taosArrayPush(pSM->eventList, pEvtInfo);
+  return 0;
+}
+
 int32_t streamTaskInitStatus(SStreamTask* pTask) {
   pTask->execInfo.init = taosGetTimestampMs();
 
@@ -57,49 +81,36 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
 }
 
 int32_t streamTaskSetReadyForWal(SStreamTask* pTask) {
-  stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr);
+  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
+    stDebug("s-task:%s ready for extract data from wal", pTask->id.idStr);
+  }
+
   streamSetStatusNormal(pTask); // todo remove it
   return TSDB_CODE_SUCCESS;
 }
 
-static int32_t streamTaskDoPause(SStreamTask* pTask) {
-  stDebug("s-task:%s start to pause tasks", pTask->id.idStr);
-  return 0;
-}
-
-static int32_t streamTaskDoResume(SStreamTask* pTask) {
-  stDebug("s-task:%s start to resume tasks", pTask->id.idStr);
-  return 0;
-}
-
 static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
   stDebug("s-task:%s start to do checkpoint", pTask->id.idStr);
   return 0;
 }
 
-int32_t streamTaskWaitBeforeHalt(SStreamTask* pTask) {
-  char* p = NULL;
-  while (streamTaskGetStatus(pTask, &p) != TASK_STATUS__NORMAL) {
-    stDebug("related stream task:%s(status:%s) not ready for halt, wait for 100ms and retry", pTask->id.idStr, p);
-    taosMsleep(100);
-  }
-  return TSDB_CODE_SUCCESS;
-}
-
 int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
   ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
 
-  pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
-  if (pTask->hTaskInfo.haltVer == -1) {
-    pTask->hTaskInfo.haltVer = pTask->dataRange.range.maxVer + 1;
+  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
+    pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
+    if (pTask->hTaskInfo.haltVer == -1) {
+      pTask->hTaskInfo.haltVer = pTask->dataRange.range.maxVer + 1;
+    }
   }
 
   return TSDB_CODE_SUCCESS;
 }
 
 // todo optimize the perf of find the trans objs by using hash table
 static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, const EStreamTaskEvent event) {
   int32_t numOfTrans = taosArrayGetSize(pState->pTransList);
-  for(int32_t i = 0; i < numOfTrans; ++i) {
+  for (int32_t i = 0; i < numOfTrans; ++i) {
     STaskStateTrans* pTrans = taosArrayGet(pState->pTransList, i);
     if (pTrans->state.state == pState->current.state && pTrans->event == event) {
       return pTrans;
@@ -115,6 +126,8 @@ void streamTaskRestoreStatus(SStreamTask* pTask) {
   taosThreadMutexLock(&pTask->lock);
   ASSERT(pSM->pActiveTrans == NULL);
 
+  ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT);
+
   SStreamTaskState state = pSM->current;
   pSM->current = pSM->prev;
   pSM->prev = state;
@@ -125,46 +138,89 @@ void streamTaskRestoreStatus(SStreamTask* pTask) {
 }
 
 SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
+  const char* id = pTask->id.idStr;
+
   SStreamTaskSM* pSM = taosMemoryCalloc(1, sizeof(SStreamTaskSM));
   if (pSM == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
+    stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM),
+            tstrerror(terrno));
     return NULL;
   }
 
   pSM->pTask = pTask;
+  pSM->eventList = taosArrayInit(4, sizeof(SAttachedEventInfo));
+  if (pSM->eventList == NULL) {
+    taosMemoryFree(pSM);
+
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    stError("s-task:%s failed to create task stateMachine, size:%d, code:%s", id, (int32_t)sizeof(SStreamTaskSM),
+            tstrerror(terrno));
+    return NULL;
+  }
+
   // set the initial state for the state-machine of stream task
   pSM->current = StreamTaskStatusList[TASK_STATUS__UNINIT];
 
   pSM->startTs = taosGetTimestampMs();
   int32_t code = initStateTransferTable(pSM);
   if (code != TSDB_CODE_SUCCESS) {
+    taosArrayDestroy(pSM->eventList);
     taosMemoryFree(pSM);
     return NULL;
   }
   return pSM;
 }
 
+void* streamDestroyStateMachine(SStreamTaskSM* pSM) {
+  if (pSM == NULL) {
+    return NULL;
+  }
+
+  taosArrayDestroy(pSM->eventList);
+  taosArrayDestroy(pSM->pTransList);
+  taosMemoryFree(pSM);
+  return NULL;
+}
+
 int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
+  taosThreadMutexLock(&pSM->pTask->lock);
+
   STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event);
   stDebug("s-task:%s start to handle event:%s, state:%s", pSM->pTask->id.idStr, StreamTaskEventList[event].name,
           pSM->current.name);
 
-  int32_t code = pTrans->preAction(pSM->pTask);
-  taosThreadMutexLock(&pSM->pTask->lock);
-  ASSERT(pSM->pActiveTrans == NULL);
-  pSM->pActiveTrans = pTrans;
-  pSM->startTs = taosGetTimestampMs();
-  taosThreadMutexUnlock(&pSM->pTask->lock);
+  if (pTrans->attachEvent.event != 0) {
+    attachEvent(pSM->pTask, &pTrans->attachEvent);
+    taosThreadMutexUnlock(&pSM->pTask->lock);
+
+    while (1) {
+      // wait for the task to be here
+      ETaskStatus s = streamTaskGetStatus(pSM->pTask, NULL);
+      if (s == pTrans->attachEvent.status) {
+        return TSDB_CODE_SUCCESS;
+      } else {// this event has been handled already
+        stDebug("s-task:%s not handle event:%s yet, wait for 100ms and recheck", pSM->pTask->id.idStr,
+                StreamTaskEventList[event].name);
+        taosMsleep(100);
+      }
+    }
 
-  code = pTrans->pAction(pSM->pTask);
-  // todo handle error code;
-  if (pTrans->autoInvokeEndFn) {
-    streamTaskOnHandleEventSuccess(pSM);
+  } else {
+    ASSERT(pSM->pActiveTrans == NULL);
+    pSM->pActiveTrans = pTrans;
+    pSM->startTs = taosGetTimestampMs();
+    taosThreadMutexUnlock(&pSM->pTask->lock);
+
+    int32_t code = pTrans->pAction(pSM->pTask);
+    // todo handle error code;
+
+    if (pTrans->autoInvokeEndFn) {
+      streamTaskOnHandleEventSuccess(pSM);
+    }
   }
 
-  return code;
+  return TSDB_CODE_SUCCESS;
 }
 
 int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
@@ -181,11 +237,42 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
 
   // on success callback, add into lock if necessary, or maybe we should add an option for this?
   pTrans->pSuccAction(pTask);
-  taosThreadMutexUnlock(&pTask->lock);
 
-  int64_t el = (taosGetTimestampMs() - pSM->startTs);
-  stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
-          StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name);
+  if (taosArrayGetSize(pSM->eventList) > 0) {
+    int64_t el = (taosGetTimestampMs() - pSM->startTs);
+    stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
+            StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name);
+
+    SAttachedEventInfo* pEvtInfo = taosArrayPop(pSM->eventList);
+
+    // OK, let's handle the attached event, since the task has reached the required status now
+    if (pSM->current.state == pEvtInfo->status) {
+      stDebug("s-task:%s handle the attached event:%s, state:%s", pTask->id.idStr,
+              StreamTaskEventList[pEvtInfo->event].name, pSM->current.name);
+
+      STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM, pEvtInfo->event);
+      ASSERT(pSM->pActiveTrans == NULL);
+      pSM->pActiveTrans = pNextTrans;
+      pSM->startTs = taosGetTimestampMs();
+
+      taosThreadMutexUnlock(&pTask->lock);
+
+      int32_t code = pNextTrans->pAction(pSM->pTask);
+
+      if (pTrans->autoInvokeEndFn) {
+        return streamTaskOnHandleEventSuccess(pSM);
+      } else {
+        return code;
+      }
+    }
+  } else {
+    taosThreadMutexUnlock(&pTask->lock);
+
+    int64_t el = (taosGetTimestampMs() - pSM->startTs);
+    stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
+            StreamTaskEventList[pTrans->event].name, el, current.name, pSM->current.name);
+  }
 
   return TSDB_CODE_SUCCESS;
 }
@@ -197,17 +284,19 @@ ETaskStatus streamTaskGetStatus(SStreamTask* pTask, char** pStr) {
   return s.state;
 }
 
-STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
-                                     __state_trans_fn preFn, __state_trans_fn fn, __state_trans_succ_fn succFn,
-                                     bool autoInvoke) {
+STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
+                                     __state_trans_succ_fn succFn, SAttachedEventInfo* pEventInfo, bool autoInvoke) {
   STaskStateTrans trans = {0};
   trans.state = StreamTaskStatusList[current];
   trans.next = StreamTaskStatusList[next];
   trans.event = event;
 
-  trans.preAction = (preFn != NULL)? preFn:dummyFn;
-  trans.pAction = (fn != NULL)? fn : dummyFn;
-  trans.pSuccAction = (succFn != NULL)? succFn:dummyFn;
+  if (pEventInfo != NULL) {
+    trans.attachEvent = *pEventInfo;
+  }
+
+  trans.pAction = (fn != NULL) ? fn : dummyFn;
+  trans.pSuccAction = (succFn != NULL) ? succFn : dummyFn;
   trans.autoInvokeEndFn = autoInvoke;
   return trans;
 }
@@ -221,49 +310,83 @@ int32_t initStateTransferTable(SStreamTaskSM* pSM) {
   }
 
   // initialization event handle
-  STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT, NULL,
-                                               streamTaskInitStatus, onNormalTaskReady, false);
+  STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__NORMAL, TASK_EVENT_INIT,
+                                               streamTaskInitStatus, onNormalTaskReady, false, false);
   taosArrayPush(pSM->pTransList, &trans);
 
-  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCAN_HISTORY, NULL,
-                               streamTaskInitStatus, onScanhistoryTaskReady, false);
+  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST,
+                               streamTaskInitStatus, onScanhistoryTaskReady, false, false);
   taosArrayPush(pSM->pTransList, &trans);
 
-  trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_COMPLETED, NULL,
-                               streamTaskSetReadyForWal, NULL, true);
+  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS_STREAM_SCAN_HISTORY, TASK_EVENT_INIT_STREAM_SCANHIST,
+                               streamTaskInitStatus, onScanhistoryTaskReady, false, false);
   taosArrayPush(pSM->pTransList, &trans);
 
-  // pause & resume related event handle
-  trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, streamTaskDoPause, NULL,
-                               true);
+  trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_DONE,
+                               streamTaskSetReadyForWal, NULL, NULL, true);
   taosArrayPush(pSM->pTransList, &trans);
 
-  trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__NORMAL, TASK_EVENT_RESUME, NULL, streamTaskDoResume,
-                               NULL, true);
+  trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__NORMAL, TASK_EVENT_SCANHIST_DONE,
+                               streamTaskSetReadyForWal, NULL, NULL, true);
   taosArrayPush(pSM->pTransList, &trans);
 
+  // halt stream task, from other task status
+  trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
+                               streamTaskKeepCurrentVerInWal, NULL, true);
+  taosArrayPush(pSM->pTransList, &trans);
+
+  SAttachedEventInfo info = {.status = TASK_STATUS__NORMAL, .event = TASK_EVENT_HALT};
+  trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
+                               streamTaskKeepCurrentVerInWal, &info, true);
+  taosArrayPush(pSM->pTransList, &trans);
+
+  trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal,
+                               &info, true);
+  taosArrayPush(pSM->pTransList, &trans);
+
+  trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL,
+                               streamTaskKeepCurrentVerInWal, NULL, true);
+  taosArrayPush(pSM->pTransList, &trans);
+
+  // checkpoint related event
   trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL,
                                streamTaskDoCheckpoint, NULL, true);
   taosArrayPush(pSM->pTransList, &trans);
 
-  trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
+  trans =
+      createStateTransform(TASK_STATUS__CK, TASK_STATUS__NORMAL, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
   taosArrayPush(pSM->pTransList, &trans);
 
-  // halt stream task, from other task status
-  trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, NULL,
-                               streamTaskKeepCurrentVerInWal, true);
+  // pause & resume related event handle
+  trans = createStateTransform(TASK_STATUS__NORMAL, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
   taosArrayPush(pSM->pTransList, &trans);
 
-  trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__HALT, TASK_EVENT_HALT, streamTaskWaitBeforeHalt,
-                               NULL, streamTaskKeepCurrentVerInWal, true);
+  trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
   taosArrayPush(pSM->pTransList, &trans);
 
-  trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, streamTaskWaitBeforeHalt, NULL,
-                               streamTaskKeepCurrentVerInWal, true);
+  info = (SAttachedEventInfo){.status = TASK_STATUS__NORMAL, .event = TASK_EVENT_PAUSE};
+  trans = createStateTransform(TASK_STATUS_STREAM_SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
   taosArrayPush(pSM->pTransList, &trans);
 
-  trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, NULL,
-                               streamTaskKeepCurrentVerInWal, true);
+  trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
   taosArrayPush(pSM->pTransList, &trans);
 
+  trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
+  taosArrayPush(pSM->pTransList, &trans);
+
+  trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
+  taosArrayPush(pSM->pTransList, &trans);
+
+  trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
+  taosArrayPush(pSM->pTransList, &trans);
+
+  trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
+  taosArrayPush(pSM->pTransList, &trans);
+
+  trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
+  taosArrayPush(pSM->pTransList, &trans);
+
+  // resume is completed by restore status of state-machine
+
   return 0;
 }