fix(stream): add event check when handling the end of event.
This commit is contained in:
parent
f708ddb792
commit
19c5882cbe
|
@ -746,7 +746,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
|
|||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM);
|
||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
|
||||
void streamTaskRestoreStatus(SStreamTask* pTask);
|
||||
|
||||
int32_t streamTaskStop(SStreamTask* pTask);
|
||||
|
|
|
@ -163,7 +163,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
|
|||
}
|
||||
} else {
|
||||
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
||||
streamTaskOnHandleEventSuccess(pTask->status.pSM);
|
||||
streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -313,7 +313,7 @@ int32_t onScanhistoryTaskReady(SStreamTask* pTask) {
|
|||
|
||||
// todo: refactor this function.
|
||||
static void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
||||
streamTaskOnHandleEventSuccess(pTask->status.pSM);
|
||||
streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT);
|
||||
|
||||
#if 0
|
||||
const char* id = pTask->id.idStr;
|
||||
|
|
|
@ -214,7 +214,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
|
|||
// todo handle error code;
|
||||
|
||||
if (pTrans->autoInvokeEndFn) {
|
||||
streamTaskOnHandleEventSuccess(pSM);
|
||||
streamTaskOnHandleEventSuccess(pSM, event);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -262,7 +262,7 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
|
|||
pSM->prev.evt = pTrans->event;
|
||||
}
|
||||
|
||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
|
||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event) {
|
||||
SStreamTask* pTask = pSM->pTask;
|
||||
|
||||
// do update the task status
|
||||
|
@ -280,6 +280,13 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
|
|||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (pTrans->event != event) {
|
||||
stWarn("s-task:%s handle event:%s failed, current status:%s", pTask->id.idStr, StreamTaskEventList[event].name,
|
||||
pSM->current.name);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
keepPrevInfo(pSM);
|
||||
|
||||
pSM->current = pTrans->next;
|
||||
|
@ -309,7 +316,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
|
|||
|
||||
int32_t code = pNextTrans->pAction(pSM->pTask);
|
||||
if (pNextTrans->autoInvokeEndFn) {
|
||||
return streamTaskOnHandleEventSuccess(pSM);
|
||||
return streamTaskOnHandleEventSuccess(pSM, event);
|
||||
} else {
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue