From 0d4892f25b01c8a8ffed6b6b00917994ae846612 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 20 Oct 2023 16:27:39 +0800 Subject: [PATCH] fix(stream): fix bugs caused by refactor sm. --- source/libs/stream/src/streamCheckpoint.c | 13 ++++++++++--- source/libs/stream/src/streamTask.c | 2 +- source/libs/stream/src/streamTaskSm.c | 15 ++++++++++++--- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7026ac7119..28b67029ce 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -276,6 +276,8 @@ void streamTaskClearCheckInfo(SStreamTask* pTask) { } int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { + int32_t vgId = pMeta->vgId; + taosWLockLatch(&pMeta->lock); for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { @@ -297,10 +299,15 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { char* str = NULL; streamTaskGetStatus(p, &str); - streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); - // save the task - streamMetaSaveTask(pMeta, p); + int32_t code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + if (code != TSDB_CODE_SUCCESS) { + stDebug("s-task:%s vgId:%d save task status failed, since handle event failed", p->id.idStr, vgId); + taosWUnLockLatch(&pMeta->lock); + return -1; + } else { // save the task + streamMetaSaveTask(pMeta, p); + } stDebug( "vgId:%d s-task:%s level:%d open upstream inputQ, commit task status after checkpoint completed, " diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 2f14f38fdc..ec9715c068 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -318,7 +318,7 @@ void tFreeStreamTask(SStreamTask* pTask) { // remove the ref by timer while (pTask->status.timerActive > 0) { stDebug("s-task:%s wait for task stop timer activities, ref:%d", pTask->id.idStr, pTask->status.timerActive); - taosMsleep(10); + taosMsleep(100); } if (pTask->schedInfo.pTimer != NULL) { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 3e148c0a35..f42e6bbb3b 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -115,7 +115,11 @@ static STaskStateTrans* streamTaskFindTransform(const SStreamTaskSM* pState, con } } - ASSERT(0); + if (event == TASK_EVENT_CHECKPOINT_DONE && pState->current.state == TASK_STATUS__STOP) { + + } else { + ASSERT(0); + } return NULL; } @@ -183,8 +187,13 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { taosThreadMutexLock(&pTask->lock); STaskStateTrans* pTrans = streamTaskFindTransform(pSM, event); - stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name, - pSM->current.name); + if (pTrans == NULL) { + stWarn("s-task:%s status:%s not allowed handle event:%s", pTask->id.idStr, pSM->current.name, StreamTaskEventList[event].name); + return -1; + } else { + stDebug("s-task:%s start to handle event:%s, state:%s", pTask->id.idStr, StreamTaskEventList[event].name, + pSM->current.name); + } if (pTrans->attachEvent.event != 0) { attachEvent(pTask, &pTrans->attachEvent);