fix(stream): adjust the time to free task backend.

This commit is contained in:
Haojun Liao 2024-07-15 19:19:58 +08:00
parent 14a7cebc56
commit a46b7b3a41
3 changed files with 16 additions and 15 deletions

View File

@ -1163,7 +1163,10 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
taosThreadMutexLock(&pTask->lock);
ETaskStatus p = streamTaskGetStatus(pTask)->state;
if (pTask->status.sendConsensusChkptId == true) {
stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id);
taosThreadMutexUnlock(&pTask->lock);
@ -1174,6 +1177,11 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
taosThreadMutexUnlock(&pTask->lock);
if (pTask->pBackend != NULL) {
streamFreeTaskState(pTask, p);
pTask->pBackend = NULL;
}
ASSERT(pTask->pBackend == NULL);
pTask->status.requireConsensusChkptId = true;

View File

@ -1265,7 +1265,6 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
}
// negotiate the consensus checkpoint id for current task
ASSERT(pTask->pBackend == NULL);
code = streamTaskSendRestoreChkptMsg(pTask);
// this task may has no checkpoint, but others tasks may generate checkpoint already?

View File

@ -79,12 +79,6 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv
return 0;
}
static int32_t stopTaskSuccFn(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
streamFreeTaskState(pTask, pSM->current.state);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskInitStatus(SStreamTask* pTask) {
pTask->execInfo.checkTs = taosGetTimestampMs();
stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr,
@ -640,21 +634,21 @@ void doInitStateTransferTable(void) {
// resume is completed by restore status of state-machine
// stop related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
// dropping related event