|
|
|
@ -59,20 +59,23 @@ static int32_t streamTaskInitStatus(SStreamTask* pTask);
|
|
|
|
|
static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
|
|
|
|
|
static int32_t initStateTransferTable();
|
|
|
|
|
static void doInitStateTransferTable(void);
|
|
|
|
|
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask);
|
|
|
|
|
|
|
|
|
|
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
|
|
|
|
|
__state_trans_fn fn, __state_trans_succ_fn succFn,
|
|
|
|
|
SAttachedEventInfo* pEventInfo, bool autoInvoke);
|
|
|
|
|
SFutureHandleEventInfo* pEventInfo);
|
|
|
|
|
|
|
|
|
|
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; }
|
|
|
|
|
|
|
|
|
|
static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
|
|
|
|
|
static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) {
|
|
|
|
|
char* p = streamTaskGetStatus(pTask)->name;
|
|
|
|
|
|
|
|
|
|
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
|
|
|
|
|
GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);
|
|
|
|
|
taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo);
|
|
|
|
|
|
|
|
|
|
SArray* pList = pTask->status.pSM->pWaitingEventList;
|
|
|
|
|
taosArrayPush(pList, pEvtInfo);
|
|
|
|
|
|
|
|
|
|
stDebug("s-task:%s add into waiting list, total waiting events:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pList));
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -85,18 +88,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
|
|
|
|
|
stDebug("s-task:%s start to do checkpoint", pTask->id.idStr);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask) {
|
|
|
|
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
|
|
|
|
streamTaskSendCheckpointSourceRsp(pTask);
|
|
|
|
|
}
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
|
|
|
|
|
if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
|
|
|
|
stError("s-task:%s no related fill-history task, since it may have been dropped already", pTask->id.idStr);
|
|
|
|
@ -170,9 +161,11 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
|
|
|
|
|
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
|
|
|
|
pEventName, el, pSM->prev.state.name, pSM->current.name);
|
|
|
|
|
|
|
|
|
|
SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
|
|
|
|
ASSERT(taosArrayGetSize(pSM->pWaitingEventList) == 1);
|
|
|
|
|
|
|
|
|
|
// OK, let's handle the attached event, since the task has reached the required status now
|
|
|
|
|
SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
|
|
|
|
|
|
|
|
|
// OK, let's handle the waiting event, since the task has reached the required status now
|
|
|
|
|
if (pSM->current.state == pEvtInfo->status) {
|
|
|
|
|
stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
|
|
|
|
|
GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
|
|
|
|
@ -189,7 +182,7 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
|
|
|
|
|
|
|
|
|
|
code = pNextTrans->pAction(pSM->pTask);
|
|
|
|
|
if (pNextTrans->autoInvokeEndFn) {
|
|
|
|
|
return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event);
|
|
|
|
|
return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event, pEvtInfo->callBackFn, pEvtInfo->pParam);
|
|
|
|
|
} else {
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
@ -203,30 +196,61 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void streamTaskRestoreStatus(SStreamTask* pTask) {
|
|
|
|
|
static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent event) {
|
|
|
|
|
SStreamTaskSM* pSM = pTask->status.pSM;
|
|
|
|
|
|
|
|
|
|
bool removed = false;
|
|
|
|
|
taosThreadMutexLock(&pTask->lock);
|
|
|
|
|
|
|
|
|
|
ASSERT(pSM->pActiveTrans == NULL);
|
|
|
|
|
ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT);
|
|
|
|
|
int32_t num = taosArrayGetSize(pSM->pWaitingEventList);
|
|
|
|
|
for (int32_t i = 0; i < num; ++i) {
|
|
|
|
|
SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i);
|
|
|
|
|
if (pInfo->event == event) {
|
|
|
|
|
taosArrayRemove(pSM->pWaitingEventList, i);
|
|
|
|
|
stDebug("s-task:%s pause event in waiting list not be handled yet, remove it from waiting list, remaining:%d",
|
|
|
|
|
pTask->id.idStr, pInfo->event);
|
|
|
|
|
removed = true;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SStreamTaskState state = pSM->current;
|
|
|
|
|
pSM->current = pSM->prev.state;
|
|
|
|
|
|
|
|
|
|
pSM->prev.state = state;
|
|
|
|
|
pSM->prev.evt = 0;
|
|
|
|
|
|
|
|
|
|
pSM->startTs = taosGetTimestampMs();
|
|
|
|
|
|
|
|
|
|
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
|
|
|
|
stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
|
|
|
|
|
doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
|
|
|
|
|
} else {
|
|
|
|
|
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
|
|
|
|
|
if (!removed) {
|
|
|
|
|
stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, StreamTaskEventList[event].name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t streamTaskRestoreStatus(SStreamTask* pTask) {
|
|
|
|
|
SStreamTaskSM* pSM = pTask->status.pSM;
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
|
|
|
|
|
taosThreadMutexLock(&pTask->lock);
|
|
|
|
|
|
|
|
|
|
if (pSM->current.state == TASK_STATUS__PAUSE && pSM->pActiveTrans == NULL) {
|
|
|
|
|
SStreamTaskState state = pSM->current;
|
|
|
|
|
pSM->current = pSM->prev.state;
|
|
|
|
|
|
|
|
|
|
pSM->prev.state = state;
|
|
|
|
|
pSM->prev.evt = 0;
|
|
|
|
|
|
|
|
|
|
pSM->startTs = taosGetTimestampMs();
|
|
|
|
|
|
|
|
|
|
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
|
|
|
|
stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr,
|
|
|
|
|
pSM->prev.state.name, pSM->current.name);
|
|
|
|
|
doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
|
|
|
|
|
} else {
|
|
|
|
|
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
removeEventInWaitingList(pTask, TASK_EVENT_PAUSE);
|
|
|
|
|
code = -1; // failed to restore the status
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
|
|
|
|
@ -242,7 +266,7 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pSM->pTask = pTask;
|
|
|
|
|
pSM->pWaitingEventList = taosArrayInit(4, sizeof(SAttachedEventInfo));
|
|
|
|
|
pSM->pWaitingEventList = taosArrayInit(4, sizeof(SFutureHandleEventInfo));
|
|
|
|
|
if (pSM->pWaitingEventList == NULL) {
|
|
|
|
|
taosMemoryFree(pSM);
|
|
|
|
|
|
|
|
|
@ -273,7 +297,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
|
|
|
|
const char* id = pTask->id.idStr;
|
|
|
|
|
|
|
|
|
|
if (pTrans->attachEvent.event != 0) {
|
|
|
|
|
attachEvent(pTask, &pTrans->attachEvent);
|
|
|
|
|
attachWaitedEvent(pTask, &pTrans->attachEvent);
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
@ -303,7 +327,32 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
|
|
|
|
// todo handle error code;
|
|
|
|
|
|
|
|
|
|
if (pTrans->autoInvokeEndFn) {
|
|
|
|
|
streamTaskOnHandleEventSuccess(pSM, event);
|
|
|
|
|
streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans, __state_trans_user_fn callbackFn, void* param) {
|
|
|
|
|
SStreamTask* pTask = pSM->pTask;
|
|
|
|
|
if (pTrans->attachEvent.event != 0) {
|
|
|
|
|
SFutureHandleEventInfo info = pTrans->attachEvent;
|
|
|
|
|
info.pParam = param;
|
|
|
|
|
info.callBackFn = callbackFn;
|
|
|
|
|
|
|
|
|
|
attachWaitedEvent(pTask, &info);
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
} else { // override current active trans
|
|
|
|
|
pSM->pActiveTrans = pTrans;
|
|
|
|
|
pSM->startTs = taosGetTimestampMs();
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
|
|
|
|
|
int32_t code = pTrans->pAction(pTask);
|
|
|
|
|
// todo handle error code;
|
|
|
|
|
|
|
|
|
|
if (pTrans->autoInvokeEndFn) {
|
|
|
|
|
streamTaskOnHandleEventSuccess(pSM, event, callbackFn, param);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -349,6 +398,45 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) {
|
|
|
|
|
int32_t code = TSDB_CODE_SUCCESS;
|
|
|
|
|
SStreamTask* pTask = pSM->pTask;
|
|
|
|
|
STaskStateTrans* pTrans = NULL;
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
|
taosThreadMutexLock(&pTask->lock);
|
|
|
|
|
|
|
|
|
|
if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
|
|
|
|
|
EStreamTaskEvent evt = pSM->pActiveTrans->event;
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
|
|
|
|
|
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
|
|
|
|
|
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
|
|
|
|
|
taosMsleep(100);
|
|
|
|
|
} else {
|
|
|
|
|
// no active event trans exists, handle this event directly
|
|
|
|
|
pTrans = streamTaskFindTransform(pSM->current.state, event);
|
|
|
|
|
if (pTrans == NULL) {
|
|
|
|
|
stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name);
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pSM->pActiveTrans != NULL) {
|
|
|
|
|
// currently in some state transfer procedure, not auto invoke transfer, quit from this procedure
|
|
|
|
|
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
|
|
|
|
|
pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
|
|
|
|
|
pSM->pActiveTrans->next.name, GET_EVT_NAME(event));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
code = doHandleEventAsync(pSM, event, pTrans, callbackFn, param);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void keepPrevInfo(SStreamTaskSM* pSM) {
|
|
|
|
|
STaskStateTrans* pTrans = pSM->pActiveTrans;
|
|
|
|
|
|
|
|
|
@ -356,8 +444,9 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
|
|
|
|
|
pSM->prev.evt = pTrans->event;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
|
|
|
|
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) {
|
|
|
|
|
SStreamTask* pTask = pSM->pTask;
|
|
|
|
|
const char* id = pTask->id.idStr;
|
|
|
|
|
|
|
|
|
|
// do update the task status
|
|
|
|
|
taosThreadMutexLock(&pTask->lock);
|
|
|
|
@ -369,16 +458,16 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
|
|
|
|
s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY);
|
|
|
|
|
|
|
|
|
|
// the pSM->prev.evt may be 0, so print string is not appropriate.
|
|
|
|
|
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr,
|
|
|
|
|
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
|
|
|
|
|
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", id, GET_EVT_NAME(event),
|
|
|
|
|
pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
|
|
|
|
|
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pTrans->event != event) {
|
|
|
|
|
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
|
|
|
|
|
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event));
|
|
|
|
|
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", id, GET_EVT_NAME(event),
|
|
|
|
|
pSM->current.name, GET_EVT_NAME(pTrans->event));
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
|
|
|
|
}
|
|
|
|
@ -388,16 +477,31 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
|
|
|
|
pSM->current = pTrans->next;
|
|
|
|
|
pSM->pActiveTrans = NULL;
|
|
|
|
|
|
|
|
|
|
// todo remove it
|
|
|
|
|
// on success callback, add into lock if necessary, or maybe we should add an option for this?
|
|
|
|
|
pTrans->pSuccAction(pTask);
|
|
|
|
|
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
|
|
|
|
|
// todo: add parameter to control lock
|
|
|
|
|
// after handling the callback function assigned by invoker, go on handling the waiting tasks
|
|
|
|
|
if (callbackFn != NULL) {
|
|
|
|
|
stDebug("s-task:%s start to handle user-specified callback fn for event:%s", id, GET_EVT_NAME(pTrans->event));
|
|
|
|
|
callbackFn(pSM->pTask, param);
|
|
|
|
|
|
|
|
|
|
stDebug("s-task:%s handle user-specified callback fn for event:%s completed", id, GET_EVT_NAME(pTrans->event));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosThreadMutexLock(&pTask->lock);
|
|
|
|
|
|
|
|
|
|
// tasks in waiting list
|
|
|
|
|
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
|
|
|
|
doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask);
|
|
|
|
|
} else {
|
|
|
|
|
taosThreadMutexUnlock(&pTask->lock);
|
|
|
|
|
|
|
|
|
|
int64_t el = (taosGetTimestampMs() - pSM->startTs);
|
|
|
|
|
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
|
|
|
|
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", id,
|
|
|
|
|
GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -453,7 +557,7 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
|
|
|
|
|
__state_trans_succ_fn succFn, SAttachedEventInfo* pEventInfo, bool autoInvoke) {
|
|
|
|
|
__state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo) {
|
|
|
|
|
STaskStateTrans trans = {0};
|
|
|
|
|
trans.state = StreamTaskStatusList[current];
|
|
|
|
|
trans.next = StreamTaskStatusList[next];
|
|
|
|
@ -468,7 +572,7 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr
|
|
|
|
|
|
|
|
|
|
trans.pAction = (fn != NULL) ? fn : dummyFn;
|
|
|
|
|
trans.pSuccAction = (succFn != NULL) ? succFn : dummyFn;
|
|
|
|
|
trans.autoInvokeEndFn = autoInvoke;
|
|
|
|
|
trans.autoInvokeEndFn = (fn == NULL);
|
|
|
|
|
return trans;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -482,93 +586,93 @@ void doInitStateTransferTable(void) {
|
|
|
|
|
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
|
|
|
|
|
|
|
|
|
|
// initialization event handle
|
|
|
|
|
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false);
|
|
|
|
|
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
|
|
|
|
|
// scan-history related event
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
|
|
|
|
|
// halt stream task, from other task status
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
|
|
|
|
|
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
|
|
|
|
|
SFutureHandleEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
|
|
|
|
|
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
|
|
|
|
|
// checkpoint related event
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
|
|
|
|
|
// pause & resume related event handle
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
|
|
|
|
|
info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
|
|
|
|
|
info = (SFutureHandleEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
|
|
|
|
|
// resume is completed by restore status of state-machine
|
|
|
|
|
|
|
|
|
|
// stop related event
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
|
|
|
|
|
// dropping related event
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true);
|
|
|
|
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
|
|
|
|
|
taosArrayPush(streamTaskSMTrans, &trans);
|
|
|
|
|
}
|
|
|
|
|
//clang-format on
|