From 4e56b6f9ebf5965b63fe49a68545b79a3be29a92 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Jul 2024 13:33:31 +0800 Subject: [PATCH] fix(stream): fix error in pause/resume procedure. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 34 ++++++++++++++-------- source/libs/stream/src/streamTaskSm.c | 6 ++-- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index dabd4ff455..1130ba8c05 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1031,12 +1031,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t ETaskStatus status = streamTaskGetStatus(pTask)->state; int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SINK) { - ASSERT(status != TASK_STATUS__UNINIT); - streamMetaReleaseTask(pMeta, pTask); - return 0; - } - if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) { // no lock needs to secure the access of the version if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { @@ -1058,9 +1052,9 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else { streamTrySchedExec(pTask); } - } else { + } /*else { ASSERT(status != TASK_STATUS__UNINIT); - } + }*/ streamMetaReleaseTask(pMeta, pTask); return 0; @@ -1070,16 +1064,32 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle; + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); - int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); + if (pTask == NULL) { + tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + + taosThreadMutexLock(&pTask->lock); + SStreamTaskState* pState = streamTaskGetStatus(pTask); + tqDebug("s-task:%s start to resume from paused, current status:%s", pTask->id.idStr, pState->name); + taosThreadMutexUnlock(&pTask->lock); + + int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); if (code != 0) { return code; } STaskId* pHTaskId = &pTask->hTaskInfo.id; - SStreamTask* pHistoryTask = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); - if (pHistoryTask) { - code = tqProcessTaskResumeImpl(handle, pHistoryTask, sversion, pReq->igUntreated, fromVnode); + SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); + if (pHTask) { + taosThreadMutexLock(&pHTask->lock); + SStreamTaskState* p = streamTaskGetStatus(pHTask); + tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p->name); + taosThreadMutexUnlock(&pHTask->lock); + + code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode); } return code; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 5e991593e6..85d3e0068a 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -206,15 +206,15 @@ static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent eve SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i); if (pInfo->event == event) { taosArrayRemove(pSM->pWaitingEventList, i); - stDebug("s-task:%s pause event in waiting list not be handled yet, remove it from waiting list, remaining:%d", - pTask->id.idStr, pInfo->event); + stDebug("s-task:%s %s event in waiting list not be handled yet, remove it from waiting list, remaining events:%d", + pTask->id.idStr, GET_EVT_NAME(pInfo->event), num - 1); removed = true; break; } } if (!removed) { - stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, StreamTaskEventList[event].name); + stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, GET_EVT_NAME(event)); } return TSDB_CODE_SUCCESS;