From b0c5fa82909e70d262bba5efcdbae3d57b134edc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Aug 2023 14:10:41 +0800 Subject: [PATCH] fix(stream): reset the task status. --- source/dnode/vnode/src/tq/tq.c | 1 + source/libs/stream/inc/streamInt.h | 1 + source/libs/stream/src/streamDispatch.c | 12 +++++++++++- source/libs/stream/src/streamRecover.c | 17 +++++++++++++++++ 4 files changed, 30 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 96e1385ce2..bade1d6a93 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1888,6 +1888,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { streamTaskUpdateEpsetInfo(pTask, req.pNodeList); { taosWLockLatch(&pMeta->lock); + streamSetStatusNormal(pTask); streamMetaSaveTask(pMeta, pTask); taosWUnLockLatch(&pMeta->lock); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index c3d340c6f0..43a7232213 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -71,6 +71,7 @@ int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); +int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen); int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 98fba39fab..2e4ddf703c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -848,7 +848,7 @@ int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistory return 0; } -int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) { +int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen) { int32_t len = 0; int32_t code = 0; SEncoder encoder; @@ -879,6 +879,16 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, tEncodeCompleteHistoryDataMsg(&encoder, &msg); tEncoderClear(&encoder); + *pBuffer = pBuf; + *pLen = len; + return 0; +} + +int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) { + void* pBuf = NULL; + int32_t len = 0; + + streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len); SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet}; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 88d57b5992..c84260169f 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -423,6 +423,23 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory int32_t taskLevel = pTask->info.taskLevel; ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); + if (pTask->status.taskStatus != TASK_STATUS__SCAN_HISTORY) { + qError("s-task:%s not in scan-history status, return upstream:0x%x scan-history finish directly", pTask->id.idStr, + pReq->upstreamTaskId); + + void* pBuf = NULL; + int32_t len = 0; + SRpcMsg msg = {0}; + + streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len); + initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len); + + tmsgSendRsp(&msg); + qDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data from WAL", pTask->id.idStr, + pTask->info.taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId); + return 0; + } + // sink tasks do not send end of scan history msg to its upstream, which is agg task. streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);