subscribe key used stack mem
This commit is contained in:
parent
dfb7ed0658
commit
4d9cc5997a
|
@ -746,7 +746,7 @@ typedef struct {
|
|||
int32_t fsyncPeriod;
|
||||
uint32_t hashBegin;
|
||||
uint32_t hashEnd;
|
||||
int8_t hashMethod;
|
||||
int8_t hashMethod;
|
||||
int8_t walLevel;
|
||||
int8_t precision;
|
||||
int8_t compression;
|
||||
|
@ -757,7 +757,7 @@ typedef struct {
|
|||
int8_t selfIndex;
|
||||
int8_t streamMode;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
|
||||
|
||||
} SCreateVnodeReq, SAlterVnodeReq;
|
||||
|
||||
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
|
||||
|
@ -1388,7 +1388,7 @@ typedef struct {
|
|||
typedef struct SMqCMGetSubEpReq {
|
||||
int64_t consumerId;
|
||||
int32_t epoch;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
} SMqCMGetSubEpReq;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
|
||||
|
@ -1688,7 +1688,7 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
int64_t consumerId;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
|
@ -1751,7 +1751,7 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
int64_t consumerId;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
} SMqSetCVgRsp;
|
||||
|
||||
typedef struct {
|
||||
|
@ -1759,14 +1759,14 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
int64_t consumerId;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
} SMqMVRebRsp;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int64_t offset;
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
} SMqOffset;
|
||||
|
||||
typedef struct {
|
||||
|
@ -2080,7 +2080,7 @@ typedef struct {
|
|||
int64_t consumerId;
|
||||
int64_t blockingTime;
|
||||
int32_t epoch;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
|
||||
int64_t currentOffset;
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
|
@ -2099,7 +2099,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
SArray* topics; // SArray<SMqSubTopicEp>
|
||||
} SMqCMGetSubEpRsp;
|
||||
|
||||
|
|
|
@ -195,6 +195,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_NODE_NAME_LEN 64
|
||||
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
|
||||
#define TSDB_TOPIC_NAME_LEN 193 // it is a null-terminated string
|
||||
#define TSDB_CGROUP_LEN 193 // it is a null-terminated string
|
||||
#define TSDB_DB_NAME_LEN 65
|
||||
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
|
||||
|
@ -210,9 +211,8 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_TYPE_STR_MAX_LEN 32
|
||||
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||
#define TSDB_CONSUMER_GROUP_LEN 192
|
||||
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||
#define TSDB_PARTITION_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||
#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2)
|
||||
#define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20)
|
||||
#define TSDB_COL_NAME_LEN 65
|
||||
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
|
||||
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
|
||||
|
@ -428,6 +428,8 @@ typedef struct {
|
|||
int32_t primary;
|
||||
} SDiskCfg;
|
||||
|
||||
#define TMQ_SEPARATOR ':'
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -46,7 +46,7 @@ struct tmq_topic_vgroup_list_t {
|
|||
|
||||
struct tmq_conf_t {
|
||||
char clientId[256];
|
||||
char groupId[256];
|
||||
char groupId[TSDB_CGROUP_LEN];
|
||||
int8_t auto_commit;
|
||||
int8_t resetOffset;
|
||||
tmq_commit_cb* commit_cb;
|
||||
|
@ -56,7 +56,7 @@ struct tmq_conf_t {
|
|||
|
||||
struct tmq_t {
|
||||
// conf
|
||||
char groupId[256];
|
||||
char groupId[TSDB_CGROUP_LEN];
|
||||
char clientId[256];
|
||||
int8_t autoCommit;
|
||||
int8_t inWaiting;
|
||||
|
|
|
@ -594,7 +594,7 @@ typedef struct {
|
|||
int64_t consumerId;
|
||||
int64_t connId;
|
||||
SRWLatch lock;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
SArray* currentTopics; // SArray<char*>
|
||||
SArray* recentRemovedTopics; // SArray<char*>
|
||||
int32_t epoch;
|
||||
|
|
|
@ -43,7 +43,12 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
return -1;
|
||||
}
|
||||
|
||||
SArray* inner = taosArrayGet(pDag->pSubplans, 0);
|
||||
SArray* inner = taosArrayGet(pDag->pSubplans, 0);
|
||||
|
||||
int32_t opNum = taosArrayGetSize(inner);
|
||||
if (opNum != 1) {
|
||||
return -1;
|
||||
}
|
||||
SSubplan* plan = taosArrayGetP(inner, 0);
|
||||
|
||||
void* pIter = NULL;
|
||||
|
|
|
@ -40,7 +40,7 @@ enum {
|
|||
MQ_SUBSCRIBE_STATUS__DELETED,
|
||||
};
|
||||
|
||||
static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName);
|
||||
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
||||
|
||||
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
|
||||
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
|
||||
|
@ -88,15 +88,9 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
char *key = mndMakeSubscribeKey(cgroup, pTopic->name);
|
||||
if (key == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tDeleteSMqSubscribeObj(pSub);
|
||||
free(pSub);
|
||||
return NULL;
|
||||
}
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
mndMakeSubscribeKey(key, cgroup, pTopic->name);
|
||||
strcpy(pSub->key, key);
|
||||
free(key);
|
||||
|
||||
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
|
||||
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
|
||||
|
@ -335,15 +329,14 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
|
||||
static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) {
|
||||
int32_t i = 0;
|
||||
while (key[i] != ':') {
|
||||
while (key[i] != TMQ_SEPARATOR) {
|
||||
i++;
|
||||
}
|
||||
key[i] = 0;
|
||||
*cgroup = strdup(key);
|
||||
key[i] = ':';
|
||||
*topic = strdup(&key[i + 1]);
|
||||
memcpy(topic, key, i - 1);
|
||||
topic[i] = 0;
|
||||
strcpy(cgroup, &key[i + 1]);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -379,8 +372,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
// get all topics of that topic
|
||||
int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
|
||||
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
|
||||
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||
taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
|
||||
}
|
||||
|
@ -396,8 +390,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
int32_t sz = taosArrayGetSize(rebSubs);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
char *topic = taosArrayGetP(rebSubs, i);
|
||||
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
|
||||
char *topic = taosArrayGetP(rebSubs, i);
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
|
||||
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||
if (status == MQ_CONSUMER_STATUS__INIT) {
|
||||
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
|
||||
|
@ -530,9 +525,9 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
|||
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
|
||||
|
||||
if (pConsumerEp->oldConsumerId == -1) {
|
||||
char *topic;
|
||||
char *cgroup;
|
||||
mndSplitSubscribeKey(pSub->key, &topic, &cgroup);
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
mndSplitSubscribeKey(pSub->key, topic, cgroup);
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
||||
|
||||
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %ld", pConsumerEp->vgId, topic,
|
||||
|
@ -540,8 +535,6 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
free(topic);
|
||||
free(cgroup);
|
||||
} else {
|
||||
mInfo("mq rebalance: assign vgroup %d, from consumer %ld to consumer %ld", pConsumerEp->vgId,
|
||||
pConsumerEp->oldConsumerId, pConsumerEp->consumerId);
|
||||
|
@ -769,6 +762,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
|
@ -814,6 +808,7 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM
|
|||
/*qDestroyQueryDag(pDag);*/
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
|
||||
const SMqConsumerEp *pConsumerEp) {
|
||||
|
@ -959,23 +954,19 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
|
|||
return 0;
|
||||
}
|
||||
|
||||
static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) {
|
||||
char *key = malloc(TSDB_SHOW_SUBQUERY_LEN);
|
||||
if (key == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
static int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName) {
|
||||
int32_t tlen = strlen(cgroup);
|
||||
memcpy(key, cgroup, tlen);
|
||||
key[tlen] = ':';
|
||||
key[tlen] = TMQ_SEPARATOR;
|
||||
strcpy(key + tlen + 1, topicName);
|
||||
return key;
|
||||
return 0;
|
||||
}
|
||||
|
||||
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
char *key = mndMakeSubscribeKey(cgroup, topicName);
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
mndMakeSubscribeKey(key, cgroup, topicName);
|
||||
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
|
||||
free(key);
|
||||
if (pSub == NULL) {
|
||||
terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
|
||||
}
|
||||
|
|
|
@ -267,7 +267,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill")
|
||||
|
||||
// mnode-topic
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with aggregation is unsupported")
|
||||
|
||||
// dnode
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")
|
||||
|
|
Loading…
Reference in New Issue