From 2806fe1c563b13a554147b9c6b21f219a1892fde Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Aug 2023 10:58:24 +0800 Subject: [PATCH] fix(stream): ignore the related stream task destory msg in transfer state. --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/src/streamDispatch.c | 6 ++--- source/libs/stream/src/streamExec.c | 32 ++++++++++++++----------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9dfde0fed7..4b666ec54a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1527,7 +1527,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; + return TSDB_CODE_SUCCESS; } else { tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId); return TSDB_CODE_INVALID_MSG; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 94e005b790..06861454d1 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -758,10 +758,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ASSERT(pTask->info.fillHistory == 1); code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - return code; + if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens +// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); } + return TSDB_CODE_SUCCESS; } pTask->msgInfo.retryCount = 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3b954793de..a3ff752bc5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -292,9 +292,20 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - // todo: destroy the fill-history task here - qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, - pTask->streamTaskId.taskId); + qError( + "s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed, destroy the related " + "fill-history task", + pTask->id.idStr, pTask->streamTaskId.taskId); + + // 1. free it and remove fill-history task from disk meta-store + streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); + + // 2. save to disk + taosWLockLatch(&pMeta->lock); + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + taosWUnLockLatch(&pMeta->lock); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, @@ -334,9 +345,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); } - // todo check the output queue for fill-history task, and wait for it complete - - // 1. expand the query time window for stream task of WAL scanner pTimeWindow->skey = INT64_MIN; qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); @@ -390,15 +398,10 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SOURCE) { streamTaskFillHistoryFinished(pTask); + } + + if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle this - return code; - } - } else if (level == TASK_LEVEL__AGG) { // do transfer task operator states. - code = streamDoTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle this - return code; - } } return code; @@ -522,6 +525,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); } + // agg task should dispatch trans-state msg to sink task, to flush all data to sink task. if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { pBlock->srcVgId = pTask->pMeta->vgId; code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);