From a46b7b3a414dacc3417934c92f81d00a7965d414 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Jul 2024 19:19:58 +0800 Subject: [PATCH] fix(stream): adjust the time to free task backend. --- source/libs/stream/src/streamCheckpoint.c | 8 ++++++++ source/libs/stream/src/streamMeta.c | 1 - source/libs/stream/src/streamTaskSm.c | 22 ++++++++-------------- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 87c2af5207..d1ea72370d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -1163,7 +1163,10 @@ int32_t deleteCheckpointFile(const char* id, const char* name) { int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { const char* id = pTask->id.idStr; + taosThreadMutexLock(&pTask->lock); + ETaskStatus p = streamTaskGetStatus(pTask)->state; + if (pTask->status.sendConsensusChkptId == true) { stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); taosThreadMutexUnlock(&pTask->lock); @@ -1174,6 +1177,11 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) { taosThreadMutexUnlock(&pTask->lock); + if (pTask->pBackend != NULL) { + streamFreeTaskState(pTask, p); + pTask->pBackend = NULL; + } + ASSERT(pTask->pBackend == NULL); pTask->status.requireConsensusChkptId = true; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d0b1f6ca93..d2c957422b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1265,7 +1265,6 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { } // negotiate the consensus checkpoint id for current task - ASSERT(pTask->pBackend == NULL); code = streamTaskSendRestoreChkptMsg(pTask); // this task may has no checkpoint, but others tasks may generate checkpoint already? diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index f2bd99cdaf..85d3e0068a 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -79,12 +79,6 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv return 0; } -static int32_t stopTaskSuccFn(SStreamTask* pTask) { - SStreamTaskSM* pSM = pTask->status.pSM; - streamFreeTaskState(pTask, pSM->current.state); - return TSDB_CODE_SUCCESS; -} - int32_t streamTaskInitStatus(SStreamTask* pTask) { pTask->execInfo.checkTs = taosGetTimestampMs(); stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, @@ -640,21 +634,21 @@ void doInitStateTransferTable(void) { // resume is completed by restore status of state-machine // stop related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); taosArrayPush(streamTaskSMTrans, &trans); // dropping related event