From 0ae6cd7a8915663f6ce5ac4fa6c207a9f853f836 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 26 Jun 2023 17:27:13 +0800 Subject: [PATCH 1/3] fix:memory leak --- source/dnode/mnode/impl/src/mndConsumer.c | 6 +++--- source/dnode/mnode/impl/src/mndTopic.c | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 3eb33bf4a9..ce6bbf999d 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -455,7 +455,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { mError("consumer:0x%" PRIx64 " group:%s not consistent with data in sdb, saved cgroup:%s", consumerId, req.cgroup, pConsumer->cgroup); terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; - return -1; + goto FAIL; } atomic_store_32(&pConsumer->hbStatus, 0); @@ -480,7 +480,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { if (status != MQ_CONSUMER_STATUS_READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; - return -1; + goto FAIL; } int32_t serverEpoch = atomic_load_32(&pConsumer->epoch); @@ -562,7 +562,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { void *buf = rpcMallocCont(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + goto FAIL; } SMqRspHead* pHead = buf; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 01241c9339..c9afbaf524 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -699,6 +699,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){ mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + mndReleaseConsumer(pMnode, pConsumer); continue; } From 4b95ea9cad39e72f5eae6c38dd082b25cfbf143a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 27 Jun 2023 17:11:20 +0800 Subject: [PATCH 2/3] fix:clear consumer if normal close --- source/dnode/mnode/impl/src/mndConsumer.c | 30 ++++++---------------- source/dnode/mnode/impl/src/mndSubscribe.c | 2 +- 2 files changed, 9 insertions(+), 23 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index ce6bbf999d..a7c2e1e2c6 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -220,10 +220,10 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mndConsumerStatusName(pConsumer->status)); - if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { - mndReleaseConsumer(pMnode, pConsumer); - return -1; - } +// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST) { +// mndReleaseConsumer(pMnode, pConsumer); +// return -1; +// } SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); // pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; @@ -316,22 +316,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { hbStatus); if (status == MQ_CONSUMER_STATUS_READY) { - if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { -// SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); -// if (pLostMsg == NULL) { -// mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d", -// pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg)); -// continue; -// } -// -// pLostMsg->consumerId = pConsumer->consumerId; -// SRpcMsg rpcMsg = { -// .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)}; -// -// mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId, -// MND_CONSUMER_LOST_HB_CNT); -// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - + if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); + } else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { taosRLockLatch(&pConsumer->lock); int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics); for (int32_t i = 0; i < topicNum; i++) { @@ -344,8 +331,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { taosRUnLockLatch(&pConsumer->lock); } } else if (status == MQ_CONSUMER_STATUS_LOST) { - // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers. - if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD || taosArrayGetSize(pConsumer->assignedTopics) == 0) { // clear consumer if lost a day or unsubscribe/close + if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day mndDropConsumerFromSdb(pMnode, pConsumer->consumerId); } } else { // MQ_CONSUMER_STATUS_REBALANCE diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 4c6a60d8f7..ce3a4ea048 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -175,7 +175,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubsc SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); if (pVgObj == NULL) { taosMemoryFree(buf); - terrno = TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST; return -1; } From 7e38ef7091b67eab109ef653f4414e1e33b06c76 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 27 Jun 2023 17:44:51 +0800 Subject: [PATCH 3/3] fix:add limit for topic and group num --- include/common/tglobal.h | 2 ++ include/util/taoserror.h | 2 ++ source/common/src/tglobal.c | 5 +++++ source/dnode/mnode/impl/src/mndConsumer.c | 12 +++++++++++- source/dnode/mnode/impl/src/mndTopic.c | 5 +++++ source/util/src/terror.c | 4 +++- 6 files changed, 28 insertions(+), 2 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index b08916f891..5dff313a9d 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -162,6 +162,8 @@ extern char tsSmlTagName[]; // extern bool tsSmlDataFormat; // extern int32_t tsSmlBatchSize; +extern int32_t tmqMaxTopicNum; + // wal extern int64_t tsWalFsyncDataSizeLimit; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b1c8ada6a4..6147ad6820 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -768,6 +768,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001) #define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002) #define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003) +#define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004) +#define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index dd64538f3d..0f20d6fd2c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -103,6 +103,8 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table // bool tsSmlDataFormat = false; // int32_t tsSmlBatchSize = 10000; +// tmq +int32_t tmqMaxTopicNum = 20; // query int32_t tsQueryPolicy = 1; int32_t tsQueryRspPolicy = 0; @@ -777,6 +779,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); // tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; + tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; // tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32; tsMaxInsertBatchRows = cfgGetItem(pCfg, "maxInsertBatchRows")->i32; @@ -1196,6 +1199,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype); } else if (strcasecmp("smlChildTableName", name) == 0) { tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN); + } else if (strcasecmp("tmqMaxTopicNum", name) == 0) { + tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; } else if (strcasecmp("smlTagName", name) == 0) { tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); // } else if (strcasecmp("smlDataFormat", name) == 0) { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index a7c2e1e2c6..1aed0ba366 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -26,6 +26,7 @@ #define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_RESERVE_SIZE 64 +#define MND_MAX_GROUP_PER_TOPIC 100 #define MND_CONSUMER_LOST_HB_CNT 6 #define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200 @@ -635,6 +636,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { char *cgroup = subscribe.cgroup; SMqConsumerObj *pExistedConsumer = NULL; SMqConsumerObj *pConsumerNew = NULL; + STrans *pTrans = NULL; int32_t code = -1; SArray *pTopicList = subscribe.topicNames; @@ -642,9 +644,17 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem); int32_t newTopicNum = taosArrayGetSize(pTopicList); + for(int i = 0; i < newTopicNum; i++){ + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, (const char*)cgroup, (const char*)taosArrayGetP(pTopicList, i)); + if(pSub != NULL && taosHashGetSize(pSub->consumerHash) > MND_MAX_GROUP_PER_TOPIC){ + terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE; + code = terrno; + goto _over; + } + } // check topic existence - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { goto _over; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index c9afbaf524..551b08d2be 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -585,6 +585,11 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { SMqTopicObj *pTopic = NULL; SDbObj *pDb = NULL; SCMCreateTopicReq createTopicReq = {0}; + if (sdbGetSize(pMnode->pSdb, SDB_TOPIC) >= tmqMaxTopicNum){ + terrno = TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE; + mError("topic num out of range"); + return code; + } if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e4a00c1fc9..e6ffec85ec 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -629,7 +629,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") -TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")