fix(stream): free task state when stopping stream tasks.

This commit is contained in:
Haojun Liao 2024-07-08 18:20:35 +08:00
parent 50a2ef08bd
commit bdced636b3
3 changed files with 27 additions and 16 deletions

View File

@ -529,10 +529,11 @@ typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5); SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5);
void tFreeStreamTask(SStreamTask* pTask);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask);
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status);
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId);

View File

@ -268,13 +268,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
} }
streamTaskCleanupCheckInfo(&pTask->taskCheckInfo); streamTaskCleanupCheckInfo(&pTask->taskCheckInfo);
streamFreeTaskState(pTask, status1);
if (pTask->pState) {
stDebug("s-task:0x%x start to free task state", taskId);
streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING);
taskDbRemoveRef(pTask->pBackend);
pTask->pBackend = NULL;
}
if (pTask->pNameMap) { if (pTask->pNameMap) {
tSimpleHashCleanup(pTask->pNameMap); tSimpleHashCleanup(pTask->pNameMap);
@ -311,6 +305,16 @@ void tFreeStreamTask(SStreamTask* pTask) {
stDebug("s-task:0x%x free task completed", taskId); stDebug("s-task:0x%x free task completed", taskId);
} }
void streamFreeTaskState(SStreamTask* pTask, ETaskStatus status) {
if (pTask->pState != NULL) {
stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
taskDbRemoveRef(pTask->pBackend);
pTask->pBackend = NULL;
pTask->pState = NULL;
}
}
static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) { static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) {
SCheckpointInfo* pChkInfo = &pTask->chkInfo; SCheckpointInfo* pChkInfo = &pTask->chkInfo;
SDataRange* pRange = &pTask->dataRange; SDataRange* pRange = &pTask->dataRange;

View File

@ -79,6 +79,12 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv
return 0; return 0;
} }
static int32_t stopTaskSuccFn(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
streamFreeTaskState(pTask, pSM->current.state);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskInitStatus(SStreamTask* pTask) { int32_t streamTaskInitStatus(SStreamTask* pTask) {
pTask->execInfo.checkTs = taosGetTimestampMs(); pTask->execInfo.checkTs = taosGetTimestampMs();
stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr,
@ -634,21 +640,21 @@ void doInitStateTransferTable(void) {
// resume is completed by restore status of state-machine // resume is completed by restore status of state-machine
// stop related event // stop related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
taosArrayPush(streamTaskSMTrans, &trans); taosArrayPush(streamTaskSMTrans, &trans);
// dropping related event // dropping related event