diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 866bb4dc29..e38a677c8b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -324,6 +324,7 @@ typedef struct SStreamStatus { int8_t keepTaskStatus; bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it int32_t timerActive; // timer is active + int8_t allowedAddInTimer; // allowed to add into timer int32_t inScanHistorySentinel; } SStreamStatus; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c33becf5b9..7e2bd1c5e0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1071,6 +1071,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { tqScanWal(pTq); return 0; } + int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); if(code == 0 && taskId > 0){ tqScanWalAsync(pTq, false); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c39e653596..2201f0744e 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -621,6 +621,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { + streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0, + taosGetTimestampMs(), false); continue; } @@ -637,7 +639,7 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { streamLaunchFillHistoryTask(pTask); } - streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + streamMetaUpdateTaskDownstreamStatus(pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, pTask->execInfo.start, true); streamMetaReleaseTask(pMeta, pTask); continue;