fix(stream): fix invalid decode hbmsg and memory leaks caused by invalid hb msg.

This commit is contained in:
Haojun Liao 2023-12-01 14:45:18 +08:00
parent 33ad05e2c8
commit d7a19a9179
5 changed files with 20 additions and 9 deletions

View File

@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE TRUE)
set(CMAKE_VERBOSE_MAKEFILE FALSE)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory

View File

@ -682,6 +682,7 @@ typedef struct SStreamHbMsg {
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
void streamMetaClearHbMsg(SStreamHbMsg* pMsg);
typedef struct {
int64_t streamId;

View File

@ -2881,6 +2881,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
streamMetaClearHbMsg(&req);
tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
@ -2990,9 +2991,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
taosThreadMutexUnlock(&execInfo.lock);
streamMetaClearHbMsg(&req);
taosArrayDestroy(req.pTaskStatus);
taosArrayDestroy(req.pUpdateNodes);
return TSDB_CODE_SUCCESS;
}

View File

@ -784,7 +784,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
if (tEncodeI32(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI64(pEncoder, ps->stage) < 0) return -1;
if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1;
if (tEncodeDouble(pEncoder, ps->inputRate) < 0) return -1;
@ -861,10 +861,18 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
return false;
}
static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) {
taosArrayDestroy(pMsg->pTaskStatus);
taosArrayDestroy(pMsg->pUpdateNodes);
taosArrayDestroy(pIdList);
void streamMetaClearHbMsg(SStreamHbMsg* pMsg) {
if (pMsg == NULL) {
return;
}
if (pMsg->pUpdateNodes != NULL) {
taosArrayDestroy(pMsg->pUpdateNodes);
}
if (pMsg->pTaskStatus != NULL) {
taosArrayDestroy(pMsg->pTaskStatus);
}
}
static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
@ -1041,7 +1049,8 @@ void metaHbToMnode(void* param, void* tmrId) {
}
_end:
clearHbMsg(&hbMsg, pIdList);
streamMetaClearHbMsg(&hbMsg);
taosArrayDestroy(pIdList);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
}

View File

@ -80,6 +80,7 @@ sql use test2;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create stream streams2 trigger at_once ignore expired 0 ignore update 0 waterMark 200s into streamt2 as select _wstart, count(*) c1 from t1 interval(1s);
sleep 1000
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791212001,2,2,3,1.1);