diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 74361ec319..0a3173b3cb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1224,6 +1224,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); } else { + if (streamTaskShouldPause(&pTask->status)) { + atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE); + } tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); } @@ -1298,22 +1301,22 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); // no lock needs to secure the access of the version - if (pReq->igUntreated) { // discard all the data when the stream task is suspended. + if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) { // discard all the data when the stream task is suspended. pTask->chkInfo.currentVer = sversion; walReaderSeekVer(pTask->exec.pWalReader, sversion); - tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, - pTask->id.idStr, pTask->chkInfo.currentVer, sversion); + tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId, + pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } else { // from the previous paused version and go on - tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, - pTask->id.idStr, pTask->chkInfo.currentVer, sversion); + tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId, + pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { tqStartStreamTasks(pTq); } else { streamSchedExec(pTask); } + streamMetaReleaseTask(pTq->pStreamMeta, pTask); } return 0;