Merge pull request #21442 from taosdata/fix/TD-24325
set agg pause status
This commit is contained in:
commit
ab715ffabd
|
@ -1224,6 +1224,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
pTask->id.idStr, pTask->chkInfo.version);
|
||||
streamProcessRunReq(pTask);
|
||||
} else {
|
||||
if (streamTaskShouldPause(&pTask->status)) {
|
||||
atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE);
|
||||
}
|
||||
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
|
||||
}
|
||||
|
||||
|
@ -1298,22 +1301,22 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
|||
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
|
||||
|
||||
// no lock needs to secure the access of the version
|
||||
if (pReq->igUntreated) { // discard all the data when the stream task is suspended.
|
||||
if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) { // discard all the data when the stream task is suspended.
|
||||
pTask->chkInfo.currentVer = sversion;
|
||||
walReaderSeekVer(pTask->exec.pWalReader, sversion);
|
||||
tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId,
|
||||
pTask->id.idStr, pTask->chkInfo.currentVer, sversion);
|
||||
tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
|
||||
pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
|
||||
} else { // from the previous paused version and go on
|
||||
tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId,
|
||||
pTask->id.idStr, pTask->chkInfo.currentVer, sversion);
|
||||
tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
|
||||
pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
|
||||
tqStartStreamTasks(pTq);
|
||||
} else {
|
||||
streamSchedExec(pTask);
|
||||
}
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue