enh(stream): sm for stream tasks.

This commit is contained in:
Haojun Liao 2023-10-17 13:47:55 +08:00
parent 7b7ce10475
commit 64ed7e49a2
2 changed files with 19 additions and 5 deletions

View File

@ -55,6 +55,7 @@ struct SStreamTaskSM {
SArray* pTransList; // SArray<STaskStateTrans>
int64_t stateTs;
SStreamTask* pTask;
STaskStateTrans* pActiveTrans;
};
typedef struct SStreamEventInfo {

View File

@ -38,6 +38,7 @@ static int32_t initStateTransferTable(SStreamTaskSM* pSM);
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return 0; }
static int32_t streamTaskStartCheckDownstream(SStreamTask* pTask) {
stDebug("s-task:%s start to check downstream tasks", pTask->id.idStr);
streamTaskCheckDownstream(pTask);
return 0;
}
static int32_t streamTaskDoPause(SStreamTask* pTask) {
@ -84,21 +85,33 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
taosMemoryFree(pSM);
return NULL;
}
return pSM;
}
int32_t taskSMHandleEvent(SStreamTaskSM* pSM, const EStreamTaskEvent* pEvent) {
STaskStateTrans* pTrans = streamTaskFindTransform(pSM, pEvent);
qDebug("start to handle event:%d", *pEvent);
ASSERT(pSM->pActiveTrans == NULL);
pSM->current = pTrans->next;
stDebug("start to handle event:%d, state:%s", *pEvent, pSM->current.name);
pSM->pActiveTrans = pTrans;
pSM->stateTs = taosGetTimestampMs();
qDebug("new state:%s from %s", pTrans->next.name, pSM->current.name);
return pTrans->pAction(pSM->pTask);
}
int32_t taskSMOnHandleEventSuccess(SStreamTaskSM* pSM) {
STaskStateTrans* pTrans = pSM->pActiveTrans;
EStreamTaskEvent* pEvent = &pTrans->event;
int64_t el = (taosGetTimestampMs() - pSM->stateTs);
stDebug("handle event:%d completed, elapsd time:%" PRId64 "ms new state:%s from %s", *pEvent, el, pTrans->next.name,
pSM->current.name);
// todo: add lock
pSM->current = pTrans->next;
}
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn) {
STaskStateTrans trans = {0};
trans.state = StreamTaskStatusList[current];