fix(stream): add event check when handling the end of event.

This commit is contained in:
Haojun Liao 2023-10-30 10:22:24 +08:00
parent 6e118c7963
commit 86dfc52a6e
3 changed files with 13 additions and 6 deletions

View File

@ -746,7 +746,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
void streamTaskRestoreStatus(SStreamTask* pTask);
int32_t streamTaskStop(SStreamTask* pTask);

View File

@ -163,7 +163,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
}
} else {
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskOnHandleEventSuccess(pTask->status.pSM);
streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT);
}
return 0;
@ -313,7 +313,7 @@ int32_t onScanhistoryTaskReady(SStreamTask* pTask) {
// todo: refactor this function.
static void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
streamTaskOnHandleEventSuccess(pTask->status.pSM);
streamTaskOnHandleEventSuccess(pTask->status.pSM, TASK_EVENT_INIT);
#if 0
const char* id = pTask->id.idStr;

View File

@ -213,7 +213,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
// todo handle error code;
if (pTrans->autoInvokeEndFn) {
streamTaskOnHandleEventSuccess(pSM);
streamTaskOnHandleEventSuccess(pSM, event);
}
}
@ -261,7 +261,7 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
pSM->prev.evt = pTrans->event;
}
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event) {
SStreamTask* pTask = pSM->pTask;
// do update the task status
@ -279,6 +279,13 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
return TSDB_CODE_INVALID_PARA;
}
if (pTrans->event != event) {
stWarn("s-task:%s handle event:%s failed, current status:%s", pTask->id.idStr, StreamTaskEventList[event].name,
pSM->current.name);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA;
}
keepPrevInfo(pSM);
pSM->current = pTrans->next;
@ -308,7 +315,7 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM) {
int32_t code = pNextTrans->pAction(pSM->pTask);
if (pNextTrans->autoInvokeEndFn) {
return streamTaskOnHandleEventSuccess(pSM);
return streamTaskOnHandleEventSuccess(pSM, event);
} else {
return code;
}