diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e275c1511d..f867a82cbb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -529,10 +529,11 @@ typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5); +void tFreeStreamTask(SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); -void tFreeStreamTask(SStreamTask* pTask); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); +void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f72a5dd434..9bcad87264 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -268,13 +268,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } streamTaskCleanupCheckInfo(&pTask->taskCheckInfo); - - if (pTask->pState) { - stDebug("s-task:0x%x start to free task state", taskId); - streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING); - taskDbRemoveRef(pTask->pBackend); - pTask->pBackend = NULL; - } + streamFreeTaskState(pTask, status1); if (pTask->pNameMap) { tSimpleHashCleanup(pTask->pNameMap); @@ -311,6 +305,16 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x free task completed", taskId); } +void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status) { + if (pTask->pState != NULL) { + stDebug("s-task:0x%x start to free task state", pTask->id.taskId); + streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); + taskDbRemoveRef(pTask->pBackend); + pTask->pBackend = NULL; + pTask->pState = NULL; + } +} + static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) { SCheckpointInfo* pChkInfo = &pTask->chkInfo; SDataRange* pRange = &pTask->dataRange; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 85d3e0068a..f2bd99cdaf 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -79,6 +79,12 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv return 0; } +static int32_t stopTaskSuccFn(SStreamTask* pTask) { + SStreamTaskSM* pSM = pTask->status.pSM; + streamFreeTaskState(pTask, pSM->current.state); + return TSDB_CODE_SUCCESS; +} + int32_t streamTaskInitStatus(SStreamTask* pTask) { pTask->execInfo.checkTs = taosGetTimestampMs(); stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, @@ -634,21 +640,21 @@ void doInitStateTransferTable(void) { // resume is completed by restore status of state-machine // stop related event - trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); - trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); + trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); taosArrayPush(streamTaskSMTrans, &trans); // dropping related event