diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b7d16924ef..c6ab61529a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -279,6 +279,7 @@ typedef struct SCheckpointInfo { typedef struct SStreamStatus { int8_t taskStatus; int8_t schedStatus; + int8_t keepTaskStatus; } SStreamStatus; struct SStreamTask { @@ -539,6 +540,7 @@ int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); bool streamTaskShouldStop(const SStreamStatus* pStatus); +bool streamTaskShouldPause(const SStreamStatus* pStatus); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fd01e3ffae..b6602588da 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1221,6 +1221,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); streamMetaReleaseTask(pTq->pStreamMeta, pTask); } @@ -1231,7 +1232,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { - streamSetStatusNormal(pTask); + atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); // no lock needs to secure the access of the version if (pReq->igUntreated) { // discard all the data when the stream task is suspended. diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 0eb9a92f87..92c37a8b5e 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -106,7 +106,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE || - status == TASK_STATUS__WAIT_DOWNSTREAM || status == TASK_STATUS__PAUSE) { + status == TASK_STATUS__WAIT_DOWNSTREAM || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status); streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 22c8a61a7b..cbb4b33cf3 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -52,7 +52,7 @@ void streamCleanUp() { void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { streamMetaReleaseTask(NULL, pTask); return; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 7ea093973a..1122cd8ff0 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -22,6 +22,11 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus) { return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); } +bool streamTaskShouldPause(const SStreamStatus* pStatus) { + int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus); + return (status == TASK_STATUS__PAUSE); +} + static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -141,7 +146,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t batchCnt = 0; while (1) { - if (streamTaskShouldStop(&pTask->status)) { + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { taosArrayDestroy(pRes); return 0; }