fix(stream): fix error in pause/resume procedure.

This commit is contained in:
Haojun Liao 2024-07-02 13:33:31 +08:00
parent 2c86e3940f
commit 4e56b6f9eb
2 changed files with 25 additions and 15 deletions

View File

@ -1031,12 +1031,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
ETaskStatus status = streamTaskGetStatus(pTask)->state;
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SINK) {
ASSERT(status != TASK_STATUS__UNINIT);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
// no lock needs to secure the access of the version
if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
@ -1058,9 +1052,9 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} else {
streamTrySchedExec(pTask);
}
} else {
} /*else {
ASSERT(status != TASK_STATUS__UNINIT);
}
}*/
streamMetaReleaseTask(pMeta, pTask);
return 0;
@ -1070,16 +1064,32 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
if (pTask == NULL) {
tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState->name);
taosThreadMutexUnlock(&pTask->lock);
int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
if (code != 0) {
return code;
}
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHistoryTask = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHistoryTask) {
code = tqProcessTaskResumeImpl(handle, pHistoryTask, sversion, pReq->igUntreated, fromVnode);
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
if (pHTask) {
taosThreadMutexLock(&pHTask->lock);
SStreamTaskState* p = streamTaskGetStatus(pHTask);
tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p->name);
taosThreadMutexUnlock(&pHTask->lock);
code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
}
return code;

View File

@ -206,15 +206,15 @@ static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent eve
SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i);
if (pInfo->event == event) {
taosArrayRemove(pSM->pWaitingEventList, i);
stDebug("s-task:%s pause event in waiting list not be handled yet, remove it from waiting list, remaining:%d",
pTask->id.idStr, pInfo->event);
stDebug("s-task:%s %s event in waiting list not be handled yet, remove it from waiting list, remaining events:%d",
pTask->id.idStr, GET_EVT_NAME(pInfo->event), num - 1);
removed = true;
break;
}
}
if (!removed) {
stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, StreamTaskEventList[event].name);
stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, GET_EVT_NAME(event));
}
return TSDB_CODE_SUCCESS;