From dba58f5e38e8c09f0aa66e5b274541eac7088d60 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Dec 2023 11:54:13 +0800 Subject: [PATCH] fix(stream): fix syntax error. --- source/dnode/mnode/impl/src/mndStream.c | 3 +-- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamStart.c | 6 ++++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 965e15c08c..f67a38b765 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -148,9 +148,8 @@ void mndCleanupStream(SMnode *pMnode) { taosArrayDestroy(execInfo.pTaskList); taosHashCleanup(execInfo.pTaskMap); taosHashCleanup(execInfo.transMgmt.pDBTrans); - taosThreadMutexDestroy(&execInfo.lock); - taosHashCleanup(execInfo.transMgmt.pDBTrans); taosHashCleanup(execInfo.transMgmt.pWaitingList); + taosThreadMutexDestroy(&execInfo.lock); mDebug("mnd stream exec info cleanup"); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ae8c92d48e..c257cdc7b3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -822,7 +822,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &taskId) < 0) return -1; if (tDecodeI32(pDecoder, &entry.status) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.stage) < 0) return -1; if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.inputRate) < 0) return -1; diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 2f83a0d6fa..d97355b5a6 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -468,14 +468,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); - streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + taosGetTimestampMs(), false); // automatically set the related fill-history task to be failed. if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId); - streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false); + streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, + taosGetTimestampMs(), false); streamMetaReleaseTask(pTask->pMeta, pHTask); } } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms