From d7a19a91790f4956eacf0042b8194f25a70786ac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Dec 2023 14:45:18 +0800 Subject: [PATCH] fix(stream): fix invalid decode hbmsg and memory leaks caused by invalid hb msg. --- cmake/cmake.define | 2 +- include/libs/stream/tstream.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 4 ++-- source/libs/stream/src/streamMeta.c | 21 +++++++++++++++------ tests/script/tsim/stream/basic4.sim | 1 + 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 75ebbe059e..5c13181099 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.0) -set(CMAKE_VERBOSE_MAKEFILE TRUE) +set(CMAKE_VERBOSE_MAKEFILE FALSE) set(TD_BUILD_TAOSA_INTERNAL FALSE) #set output directory diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a1b96cbd99..12f7ffb1cd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -682,6 +682,7 @@ typedef struct SStreamHbMsg { int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); +void streamMetaClearHbMsg(SStreamHbMsg* pMsg); typedef struct { int64_t streamId; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index acedd8beda..0e0fbfab61 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2881,6 +2881,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { tDecoderInit(&decoder, pReq->pCont, pReq->contLen); if (tDecodeStreamHbMsg(&decoder, &req) < 0) { + streamMetaClearHbMsg(&req); tDecoderClear(&decoder); terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -2990,9 +2991,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } taosThreadMutexUnlock(&execInfo.lock); + streamMetaClearHbMsg(&req); - taosArrayDestroy(req.pTaskStatus); - taosArrayDestroy(req.pUpdateNodes); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c257cdc7b3..85ef11b841 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -784,7 +784,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, ps->status) < 0) return -1; - if (tEncodeI32(pEncoder, ps->stage) < 0) return -1; + if (tEncodeI64(pEncoder, ps->stage) < 0) return -1; if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1; @@ -861,10 +861,18 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { return false; } -static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) { - taosArrayDestroy(pMsg->pTaskStatus); - taosArrayDestroy(pMsg->pUpdateNodes); - taosArrayDestroy(pIdList); +void streamMetaClearHbMsg(SStreamHbMsg* pMsg) { + if (pMsg == NULL) { + return; + } + + if (pMsg->pUpdateNodes != NULL) { + taosArrayDestroy(pMsg->pUpdateNodes); + } + + if (pMsg->pTaskStatus != NULL) { + taosArrayDestroy(pMsg->pTaskStatus); + } } static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { @@ -1041,7 +1049,8 @@ void metaHbToMnode(void* param, void* tmrId) { } _end: - clearHbMsg(&hbMsg, pIdList); + streamMetaClearHbMsg(&hbMsg); + taosArrayDestroy(pIdList); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); } diff --git a/tests/script/tsim/stream/basic4.sim b/tests/script/tsim/stream/basic4.sim index b4e3d62545..d2bf321ad5 100644 --- a/tests/script/tsim/stream/basic4.sim +++ b/tests/script/tsim/stream/basic4.sim @@ -80,6 +80,7 @@ sql use test2; sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create stream streams2 trigger at_once ignore expired 0 ignore update 0 waterMark 200s into streamt2 as select _wstart, count(*) c1 from t1 interval(1s); +sleep 1000 sql insert into t1 values(1648791211000,1,2,3,1.0); sql insert into t1 values(1648791212001,2,2,3,1.1);