fix:add limit for topic and group num
This commit is contained in:
parent
4b95ea9cad
commit
7e38ef7091
|
@ -162,6 +162,8 @@ extern char tsSmlTagName[];
|
||||||
// extern bool tsSmlDataFormat;
|
// extern bool tsSmlDataFormat;
|
||||||
// extern int32_t tsSmlBatchSize;
|
// extern int32_t tsSmlBatchSize;
|
||||||
|
|
||||||
|
extern int32_t tmqMaxTopicNum;
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
extern int64_t tsWalFsyncDataSizeLimit;
|
extern int64_t tsWalFsyncDataSizeLimit;
|
||||||
|
|
||||||
|
|
|
@ -768,6 +768,8 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
|
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
|
||||||
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
|
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
|
||||||
#define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003)
|
#define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003)
|
||||||
|
#define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004)
|
||||||
|
#define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005)
|
||||||
|
|
||||||
// stream
|
// stream
|
||||||
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
||||||
|
|
|
@ -103,6 +103,8 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
|
||||||
// bool tsSmlDataFormat = false;
|
// bool tsSmlDataFormat = false;
|
||||||
// int32_t tsSmlBatchSize = 10000;
|
// int32_t tsSmlBatchSize = 10000;
|
||||||
|
|
||||||
|
// tmq
|
||||||
|
int32_t tmqMaxTopicNum = 20;
|
||||||
// query
|
// query
|
||||||
int32_t tsQueryPolicy = 1;
|
int32_t tsQueryPolicy = 1;
|
||||||
int32_t tsQueryRspPolicy = 0;
|
int32_t tsQueryRspPolicy = 0;
|
||||||
|
@ -777,6 +779,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||||
// tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
|
// tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
|
||||||
|
|
||||||
|
tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
|
||||||
// tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
|
// tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
|
||||||
tsMaxInsertBatchRows = cfgGetItem(pCfg, "maxInsertBatchRows")->i32;
|
tsMaxInsertBatchRows = cfgGetItem(pCfg, "maxInsertBatchRows")->i32;
|
||||||
|
|
||||||
|
@ -1196,6 +1199,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
|
||||||
cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype);
|
cfgSetItem(pCfg, "secondEp", tsSecond, pSecondpItem->stype);
|
||||||
} else if (strcasecmp("smlChildTableName", name) == 0) {
|
} else if (strcasecmp("smlChildTableName", name) == 0) {
|
||||||
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
||||||
|
} else if (strcasecmp("tmqMaxTopicNum", name) == 0) {
|
||||||
|
tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
|
||||||
} else if (strcasecmp("smlTagName", name) == 0) {
|
} else if (strcasecmp("smlTagName", name) == 0) {
|
||||||
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||||
// } else if (strcasecmp("smlDataFormat", name) == 0) {
|
// } else if (strcasecmp("smlDataFormat", name) == 0) {
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
#define MND_CONSUMER_VER_NUMBER 1
|
#define MND_CONSUMER_VER_NUMBER 1
|
||||||
#define MND_CONSUMER_RESERVE_SIZE 64
|
#define MND_CONSUMER_RESERVE_SIZE 64
|
||||||
|
|
||||||
|
#define MND_MAX_GROUP_PER_TOPIC 100
|
||||||
#define MND_CONSUMER_LOST_HB_CNT 6
|
#define MND_CONSUMER_LOST_HB_CNT 6
|
||||||
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
|
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
|
||||||
|
|
||||||
|
@ -635,6 +636,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
char *cgroup = subscribe.cgroup;
|
char *cgroup = subscribe.cgroup;
|
||||||
SMqConsumerObj *pExistedConsumer = NULL;
|
SMqConsumerObj *pExistedConsumer = NULL;
|
||||||
SMqConsumerObj *pConsumerNew = NULL;
|
SMqConsumerObj *pConsumerNew = NULL;
|
||||||
|
STrans *pTrans = NULL;
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SArray *pTopicList = subscribe.topicNames;
|
SArray *pTopicList = subscribe.topicNames;
|
||||||
|
@ -642,9 +644,17 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
|
taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
|
||||||
|
|
||||||
int32_t newTopicNum = taosArrayGetSize(pTopicList);
|
int32_t newTopicNum = taosArrayGetSize(pTopicList);
|
||||||
|
for(int i = 0; i < newTopicNum; i++){
|
||||||
|
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, (const char*)cgroup, (const char*)taosArrayGetP(pTopicList, i));
|
||||||
|
if(pSub != NULL && taosHashGetSize(pSub->consumerHash) > MND_MAX_GROUP_PER_TOPIC){
|
||||||
|
terrno = TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
|
||||||
|
code = terrno;
|
||||||
|
goto _over;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// check topic existence
|
// check topic existence
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
|
|
|
@ -585,6 +585,11 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
||||||
SMqTopicObj *pTopic = NULL;
|
SMqTopicObj *pTopic = NULL;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
SCMCreateTopicReq createTopicReq = {0};
|
SCMCreateTopicReq createTopicReq = {0};
|
||||||
|
if (sdbGetSize(pMnode->pSdb, SDB_TOPIC) >= tmqMaxTopicNum){
|
||||||
|
terrno = TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE;
|
||||||
|
mError("topic num out of range");
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
|
if (tDeserializeSCMCreateTopicReq(pReq->pCont, pReq->contLen, &createTopicReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
|
|
@ -629,7 +629,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100")
|
||||||
|
|
||||||
// stream
|
// stream
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||||
|
|
Loading…
Reference in New Issue