From 1367552f389441b54ddbd8be2bf3e2a40437050c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 24 Jul 2023 10:04:15 +0800 Subject: [PATCH] fix(stream): refactor the halt status to check more status. --- include/libs/stream/tstream.h | 3 ++ source/dnode/vnode/src/tq/tq.c | 22 +++++----- source/libs/stream/src/streamExec.c | 9 ++-- source/libs/stream/src/streamRecover.c | 59 ++++++++++++++++++++++++-- 4 files changed, 75 insertions(+), 18 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3306c0bb27..9d2c0a4f6a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -610,6 +610,9 @@ int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); void streamTaskPause(SStreamTask* pTask); +void streamTaskResume(SStreamTask* pTask); +void streamTaskHalt(SStreamTask* pTask); +void streamTaskResumeFromHalt(SStreamTask* pTask); void streamTaskDisablePause(SStreamTask* pTask); void streamTaskEnablePause(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ae14eb9db5..9002001404 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1156,12 +1156,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { taosMsleep(100); } - // todo fix the race condition, when pause msg is received from mnode, add lock here - // now we can stop the stream task execution - ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL); - pStreamTask->status.taskStatus = TASK_STATUS__HALT; + streamTaskHalt(pTask); - // todo disable the pause + // now we can stop the stream task execution + // todo upgrade the statu to be HALT from PAUSE or NORMAL + pStreamTask->status.taskStatus = TASK_STATUS__HALT; tqDebug("s-task:%s level:%d status is set to halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, id); @@ -1522,11 +1521,14 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, return -1; } - if (streamTaskShouldPause(&pTask->status)) { - atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); + // todo: handle the case: resume from halt to pause/ from halt to normal/ from pause to normal + streamTaskResume(pTask); + int32_t level = pTask->info.taskLevel; + int8_t status = pTask->status.taskStatus; + if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) { // no lock needs to secure the access of the version - if (igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { + if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { // discard all the data when the stream task is suspended. walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion); tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64 @@ -1537,9 +1539,9 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } - if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory) { streamStartRecoverTask(pTask, igUntreated); - } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { + } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { tqStartStreamTasks(pTq); } else { streamSchedExec(pTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c2f676e5b2..7127b1c323 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -345,7 +345,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { double el = (taosGetTimestampMs() - st) / 1000.0; if (el > 0) { - qDebug("s-task:%s wait for stream task:%s for %.2fs to handle all data in inputQ", pTask->id.idStr, + qDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", pTask->id.idStr, pStreamTask->id.idStr, el); } } @@ -377,13 +377,13 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { } else { ASSERT(status == TASK_STATUS__SCAN_HISTORY); pStreamTask->status.taskStatus = TASK_STATUS__HALT; - qDebug("s-task:%s halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr); + qDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); } // wait for the stream task to handle all in the inputQ, and to be idle waitForTaskIdle(pTask, pStreamTask); - // In case of sink tasks, no need to be halted for them. + // In case of sink tasks, no need to halt them. // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // start the task state transfer procedure. // When a task is idle with halt status, all data in inputQ are consumed. @@ -405,8 +405,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); - // reset the status of stream task - streamSetStatusNormal(pStreamTask); + streamTaskResumeFromHalt(pStreamTask); pTask->status.taskStatus = TASK_STATUS__DROPPING; qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 3be5f5b1e5..42eb27bb8f 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -774,7 +774,7 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { pRange->range.minVer = 0; pRange->range.maxVer = ver; - qDebug("s-task:%s level:%d related-fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64 + qDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64 ", verRang:%" PRId64 " - %" PRId64, pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); @@ -806,6 +806,7 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { streamTaskDoCheckDownstreamTasks(pTask); } +// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause void streamTaskPause(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; @@ -824,11 +825,17 @@ void streamTaskPause(SStreamTask* pTask) { } while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { - if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { + status = pTask->status.taskStatus; + if (status == TASK_STATUS__DROPPING) { qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); return; } + if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) { + qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str); + return; + } + qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId); taosMsleep(100); } @@ -841,6 +848,17 @@ void streamTaskPause(SStreamTask* pTask) { streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el); } +void streamTaskResume(SStreamTask* pTask) { + int8_t status = pTask->status.taskStatus; + if (status == TASK_STATUS__PAUSE) { + pTask->status.taskStatus = pTask->status.keepTaskStatus; + pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; + qDebug("s-task:%s resume from pause", pTask->id.idStr); + } else { + qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); + } +} + // todo fix race condition void streamTaskDisablePause(SStreamTask* pTask) { // pre-condition check @@ -857,4 +875,39 @@ void streamTaskDisablePause(SStreamTask* pTask) { void streamTaskEnablePause(SStreamTask* pTask) { qDebug("s-task:%s enable task pause", pTask->id.idStr); pTask->status.pauseAllowed = 1; -} \ No newline at end of file +} + +void streamTaskHalt(SStreamTask* pTask) { + int8_t status = pTask->status.taskStatus; + if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { + return; + } + + if (status == TASK_STATUS__HALT) { + return; + } + + // upgrade to halt status + if (status == TASK_STATUS__PAUSE) { + qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), + streamGetTaskStatusStr(TASK_STATUS__PAUSE)); + } else { + qDebug("s-task:%s halt task", pTask->id.idStr); + } + + pTask->status.keepTaskStatus = status; + pTask->status.taskStatus = TASK_STATUS__HALT; +} + +void streamTaskResumeFromHalt(SStreamTask* pTask) { + const char* id = pTask->id.idStr; + int8_t status = pTask->status.taskStatus; + if (status != TASK_STATUS__HALT) { + qError("s-task:%s not in halt status, status:%s", id, streamGetTaskStatusStr(status)); + return; + } + + pTask->status.taskStatus = pTask->status.keepTaskStatus; + pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; + qDebug("s-task:%s resume from halt, current status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus)); +}