From 66697ee188e108836ee5257be8038d29b408ed3b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Feb 2025 18:22:45 +0800 Subject: [PATCH 1/2] fix(stream): check status before scan history. --- source/dnode/vnode/src/tq/tq.c | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1c0f73249c..7a07904ab6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -964,7 +964,27 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // do recovery step1 const char* id = pTask->id.idStr; - char* pStatus = streamTaskGetStatus(pTask).name; + streamMutexLock(&pTask->lock); + + SStreamTaskState s = streamTaskGetStatus(pTask); + if ((s.state != TASK_STATUS__SCAN_HISTORY) || (pTask->status.downstreamReady == 1)) { + tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not ready for scan-history data, quit", id, pMeta->vgId, + s.name, pTask->status.downstreamReady); + + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } + + if (pTask->exec.pExecutor == NULL) { + tqError("s-task:%s vgId:%d executor is null, not executor scan history", id, pMeta->vgId); + + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } + + streamMutexUnlock(&pTask->lock); // avoid multi-thread exec while (1) { From bea8fe42db3f236b3d3ac8ecbe71c0fbc784daab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Feb 2025 18:28:19 +0800 Subject: [PATCH 2/2] fix(stream): check status before scan history. --- source/dnode/vnode/src/tq/tq.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7a07904ab6..119edb47bc 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -954,6 +954,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t code = TSDB_CODE_SUCCESS; SStreamTask* pTask = NULL; SStreamTask* pStreamTask = NULL; + char* pStatus = NULL; code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); if (pTask == NULL) { @@ -967,9 +968,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamMutexLock(&pTask->lock); SStreamTaskState s = streamTaskGetStatus(pTask); - if ((s.state != TASK_STATUS__SCAN_HISTORY) || (pTask->status.downstreamReady == 1)) { - tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not ready for scan-history data, quit", id, pMeta->vgId, - s.name, pTask->status.downstreamReady); + pStatus = s.name; + + if ((s.state != TASK_STATUS__SCAN_HISTORY) || (pTask->status.downstreamReady == 0)) { + tqError("s-task:%s vgId:%d status:%s downstreamReady:%d not allowed/ready for scan-history data, quit", id, + pMeta->vgId, s.name, pTask->status.downstreamReady); streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask);