From 1850f3fb5227f62a34591b7e2c91efeab08be5c1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 11 Sep 2024 11:51:02 +0800 Subject: [PATCH] fix:[TD-31962]memory leak by crash_gen --- include/util/taoserror.h | 1 + source/client/src/clientEnv.c | 2 +- source/client/src/clientTmq.c | 5 ++++- source/common/src/tmsg.c | 4 ++++ source/dnode/mnode/impl/src/mndConsumer.c | 11 ++++++++++- source/libs/executor/src/scanoperator.c | 12 ++++-------- source/util/src/terror.c | 1 + 7 files changed, 25 insertions(+), 11 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4591c7fbcc..16027730f7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -962,6 +962,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013) #define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014) #define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015) +#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index fec1060042..101bd9341c 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -941,7 +941,7 @@ void taos_init_imp(void) { tscInitRes = TSDB_CODE_OUT_OF_MEMORY; return; } - taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst); +// taosHashSetFreeFp(appInfo.pInstMap, destroyAppInst); avoid heap use after free deltaToUtcInitOnce(); char logDirName[64] = {0}; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 783815d97f..0395869367 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2896,7 +2896,10 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); taosFreeQitem(pWrapper); } else { - (void)taosWriteQitem(tmq->mqueue, pWrapper); + if (taosWriteQitem(tmq->mqueue, pWrapper) != 0){ + tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); + taosFreeQitem(pWrapper); + } } } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 758a4aeec3..4aed628a92 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10346,7 +10346,11 @@ int32_t tDecodeSMCreateStbRsp(SDecoder *pDecoder, SMCreateStbRsp *pRsp) { } tEndDecode(pDecoder); + return code; + _exit: + tFreeSTableMetaRsp(pRsp->pMeta); + taosMemoryFreeClear(pRsp->pMeta); return code; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 606b93035f..403a5b89a8 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -511,6 +511,11 @@ static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerOb } } } + // no topics need to be rebalanced + if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) { + code = TSDB_CODE_TMQ_NO_NEED_REBALANCE; + } + END: return code; } @@ -581,6 +586,10 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if(ubSubscribe){ SMqConsumerObj *pConsumerTmp = NULL; MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp)); + if (taosArrayGetSize(pConsumerTmp->assignedTopics) == 0){ + mndReleaseConsumer(pMnode, pConsumerTmp); + goto END; + } mndReleaseConsumer(pMnode, pConsumerTmp); } MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames)); @@ -599,7 +608,7 @@ END: mndTransDrop(pTrans); tDeleteSMqConsumerObj(pConsumerNew); taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); - return code; + return (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code; } SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 42e7e4ac3b..717b8793f2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -668,8 +668,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int val = *pVal; } else { pCache->cacheHit += 1; - STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h); - val = *pVal; + STableCachedVal* pValTmp = taosLRUCacheValue(pCache->pTableMetaEntryCache, h); + val = *pValTmp; bool bRes = taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false); qTrace("release LRU cache, res %d", bRes); @@ -721,12 +721,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) { taosMemoryFree(data); } - if (code) { - if (freeReader) { - pHandle->api.metaReaderFn.clearReader(&mr); - } - return code; - } + QUERY_CHECK_CODE(code, lino, _end); } else { // todo opt for json tag for (int32_t i = 0; i < pBlock->info.rows; ++i) { code = colDataSetVal(pColInfoData, i, data, false); @@ -746,6 +741,7 @@ _end: sizeof(STableCachedVal), freeCachedMetaItem, NULL, TAOS_LRU_PRIORITY_LOW, NULL); if (insertRet != TAOS_LRU_STATUS_OK) { qWarn("failed to put meta into lru cache, code:%d, %s", insertRet, idStr); + freeTableCachedVal(pVal); } } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 58dde5cd23..f85c76f157 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -806,6 +806,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")