refactor: do some internal refactor.
This commit is contained in:
parent
60977d56bd
commit
f14c3b93c1
|
@ -160,6 +160,45 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, SStreamTask* pTask) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int64_t el = (taosGetTimestampMs() - pSM->startTs);
|
||||||
|
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
||||||
|
pEventName, el, pSM->prev.state.name, pSM->current.name);
|
||||||
|
|
||||||
|
SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
||||||
|
|
||||||
|
// OK, let's handle the attached event, since the task has reached the required status now
|
||||||
|
if (pSM->current.state == pEvtInfo->status) {
|
||||||
|
stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
|
||||||
|
GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
|
||||||
|
|
||||||
|
// remove it
|
||||||
|
taosArrayPop(pSM->pWaitingEventList);
|
||||||
|
|
||||||
|
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event);
|
||||||
|
ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);
|
||||||
|
|
||||||
|
pSM->pActiveTrans = pNextTrans;
|
||||||
|
pSM->startTs = taosGetTimestampMs();
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
code = pNextTrans->pAction(pSM->pTask);
|
||||||
|
if (pNextTrans->autoInvokeEndFn) {
|
||||||
|
return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event);
|
||||||
|
} else {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr,
|
||||||
|
pSM->current.name, GET_EVT_NAME(pEvtInfo->event),
|
||||||
|
StreamTaskStatusList[pEvtInfo->status].name);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
void streamTaskRestoreStatus(SStreamTask* pTask) {
|
void streamTaskRestoreStatus(SStreamTask* pTask) {
|
||||||
SStreamTaskSM* pSM = pTask->status.pSM;
|
SStreamTaskSM* pSM = pTask->status.pSM;
|
||||||
|
|
||||||
|
@ -175,7 +214,13 @@ void streamTaskRestoreStatus(SStreamTask* pTask) {
|
||||||
pSM->prev.evt = 0;
|
pSM->prev.evt = 0;
|
||||||
|
|
||||||
pSM->startTs = taosGetTimestampMs();
|
pSM->startTs = taosGetTimestampMs();
|
||||||
|
|
||||||
|
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
||||||
|
stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
|
||||||
|
doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
|
||||||
|
} else {
|
||||||
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
|
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
|
||||||
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
}
|
}
|
||||||
|
@ -343,39 +388,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
||||||
pTrans->pSuccAction(pTask);
|
pTrans->pSuccAction(pTask);
|
||||||
|
|
||||||
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
|
||||||
int64_t el = (taosGetTimestampMs() - pSM->startTs);
|
doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask);
|
||||||
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
|
|
||||||
GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
|
|
||||||
|
|
||||||
SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
|
|
||||||
|
|
||||||
// OK, let's handle the attached event, since the task has reached the required status now
|
|
||||||
if (pSM->current.state == pEvtInfo->status) {
|
|
||||||
stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
|
|
||||||
GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
|
|
||||||
|
|
||||||
// remove it
|
|
||||||
taosArrayPop(pSM->pWaitingEventList);
|
|
||||||
|
|
||||||
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event);
|
|
||||||
ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);
|
|
||||||
|
|
||||||
pSM->pActiveTrans = pNextTrans;
|
|
||||||
pSM->startTs = taosGetTimestampMs();
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
|
||||||
|
|
||||||
int32_t code = pNextTrans->pAction(pSM->pTask);
|
|
||||||
if (pNextTrans->autoInvokeEndFn) {
|
|
||||||
return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event);
|
|
||||||
} else {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
|
||||||
stDebug("s-task:%s state:%s event:%s in waiting list, req state:%s not fulfilled, put it back", pTask->id.idStr,
|
|
||||||
pSM->current.name, GET_EVT_NAME(pEvtInfo->event),
|
|
||||||
StreamTaskStatusList[pEvtInfo->status].name);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue