From feb09c8361406ff09771fd5ad0593a26c4ce2fae Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 26 Sep 2023 01:09:55 +0800 Subject: [PATCH] fix(stream): fix error in pause scan-history data. --- include/libs/stream/tstream.h | 1 - source/dnode/vnode/src/tq/tq.c | 17 ++++++-- source/libs/stream/src/streamRecover.c | 56 +++++++++++++------------- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ce4d7fa4b6..9420c5235d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -698,7 +698,6 @@ int32_t streamSetStatusUnint(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); void streamTaskPause(SStreamTask* pTask, SStreamMeta* pMeta); void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta); -void streamTaskHalt(SStreamTask* pTask); void streamTaskResumeFromHalt(SStreamTask* pTask); void streamTaskDisablePause(SStreamTask* pTask); void streamTaskEnablePause(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 610db1b72f..7f0842736e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1135,8 +1135,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } if (status == TASK_STATUS__HALT) { - // return; - // do nothing +// tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, +// pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); +// latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); +// +// taosThreadMutexUnlock(&pStreamTask->lock); +// break; } if (pStreamTask->status.taskStatus == TASK_STATUS__CK) { @@ -1152,7 +1156,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { qDebug("s-task:%s upgrade status to %s from %s", pStreamTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), streamGetTaskStatusStr(TASK_STATUS__PAUSE)); } else { - qDebug("s-task:%s halt task", pStreamTask->id.idStr); + qDebug("s-task:%s halt task, prev status:%s", pStreamTask->id.idStr, streamGetTaskStatusStr(status)); } pStreamTask->status.keepTaskStatus = status; @@ -1174,6 +1178,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->execInfo.step2Start = taosGetTimestampMs(); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); streamTaskPutTranstateIntoInputQ(pTask); + + if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { + pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; + qDebug("s-task:%s prev status is %s, update the kept status to be:%s when after step 2", id, + streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus)); + } + streamTryExec(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 2a277f3fca..d910c7d785 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -1001,34 +1001,34 @@ void streamTaskEnablePause(SStreamTask* pTask) { } // fix: this function should be removed, it may cause deadlock. -void streamTaskHalt(SStreamTask* pTask) { - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { - return; - } - - if (status == TASK_STATUS__HALT) { - return; - } - - // wait for checkpoint completed - while(pTask->status.taskStatus == TASK_STATUS__CK) { - qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr, - streamGetTaskStatusStr(TASK_STATUS__CK)); - taosMsleep(1000); - } - - // upgrade to halt status - if (status == TASK_STATUS__PAUSE) { - stDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), - streamGetTaskStatusStr(TASK_STATUS__PAUSE)); - } else { - stDebug("s-task:%s halt task", pTask->id.idStr); - } - - pTask->status.keepTaskStatus = status; - pTask->status.taskStatus = TASK_STATUS__HALT; -} +//void streamTaskHalt(SStreamTask* pTask) { +// int8_t status = pTask->status.taskStatus; +// if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { +// return; +// } +// +// if (status == TASK_STATUS__HALT) { +// return; +// } +// +// // wait for checkpoint completed +// while(pTask->status.taskStatus == TASK_STATUS__CK) { +// qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr, +// streamGetTaskStatusStr(TASK_STATUS__CK)); +// taosMsleep(1000); +// } +// +// // upgrade to halt status +// if (status == TASK_STATUS__PAUSE) { +// stDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), +// streamGetTaskStatusStr(TASK_STATUS__PAUSE)); +// } else { +// stDebug("s-task:%s halt task", pTask->id.idStr); +// } +// +// pTask->status.keepTaskStatus = status; +// pTask->status.taskStatus = TASK_STATUS__HALT; +//} void streamTaskResumeFromHalt(SStreamTask* pTask) { const char* id = pTask->id.idStr;