feat[TS-6115]: support deletion of topics with active consumers. (#30232)
* feat[TS-6115]: drop topic & consumer group in force * feat[TS-6115]: drop topic & consumer group in force * feat[TS-6115]: drop topic & consumer group in force
This commit is contained in:
parent
c460412407
commit
3bc6635de3
|
@ -69,10 +69,10 @@ This statement creates a subscription that includes all table data in the databa
|
|||
|
||||
## Delete Topic
|
||||
|
||||
If you no longer need to subscribe to the data, you can delete the topic. Note that only topics that are not currently subscribed can be deleted.
|
||||
If you no longer need to subscribe to the data, you can delete the topic. If the current topic is subscribed to by a consumer, it can be forcibly deleted using the FORCE syntax. After the forced deletion, the subscribed consumer will consume data with errors (FORCE syntax supported from version 3.3.6.0).
|
||||
|
||||
```sql
|
||||
DROP TOPIC [IF EXISTS] topic_name;
|
||||
DROP TOPIC [IF EXISTS] [FORCE] topic_name;
|
||||
```
|
||||
|
||||
## View Topics
|
||||
|
@ -99,10 +99,10 @@ Displays information about all consumers in the current database, including the
|
|||
|
||||
### Delete Consumer Group
|
||||
|
||||
When creating a consumer, a consumer group is assigned to the consumer. Consumers cannot be explicitly deleted, but the consumer group can be deleted with the following statement when there are no consumers in the group:
|
||||
When creating a consumer, a consumer group is assigned to the consumer. Consumers cannot be explicitly deleted, but the consumer group can be deleted. If there are consumers in the current consumer group who are consuming, the FORCE syntax can be used to force deletion. After forced deletion, subscribed consumers will consume data with errors (FORCE syntax supported from version 3.3.6.0).
|
||||
|
||||
```sql
|
||||
DROP CONSUMER GROUP [IF EXISTS] cgroup_name ON topic_name;
|
||||
DROP CONSUMER GROUP [IF EXISTS] [FORCE] cgroup_name ON topic_name;
|
||||
```
|
||||
|
||||
## Data Subscription
|
||||
|
@ -137,6 +137,7 @@ If the following 3 data entries were written, then during replay, the first entr
|
|||
|
||||
When using the data subscription's replay feature, note the following:
|
||||
|
||||
- Enable replay function by configuring the consumption parameter enable.replay to true
|
||||
- The replay function of data subscription only supports data playback for query subscriptions; supertable and database subscriptions do not support playback.
|
||||
- Replay does not support progress saving.
|
||||
- Because data playback itself requires processing time, there is a precision error of several tens of milliseconds in playback.
|
||||
|
|
|
@ -58,11 +58,11 @@ Note: Subscriptions to supertables and databases are advanced subscription modes
|
|||
|
||||
## Delete topic
|
||||
|
||||
If you no longer need to subscribe to data, you can delete the topic, but note: only TOPICS that are not currently being subscribed to can be deleted.
|
||||
If you no longer need to subscribe to the data, you can delete the topic. If the current topic is subscribed to by a consumer, it can be forcibly deleted using the FORCE syntax. After the forced deletion, the subscribed consumer will consume data with errors (FORCE syntax supported from version 3.3.6.0)
|
||||
|
||||
```sql
|
||||
/* Delete topic */
|
||||
DROP TOPIC [IF EXISTS] topic_name;
|
||||
DROP TOPIC [IF EXISTS] [FORCE] topic_name;
|
||||
```
|
||||
|
||||
At this point, if there are consumers on this subscription topic, they will receive an error.
|
||||
|
@ -81,8 +81,10 @@ Consumer groups can only be created through the TDengine client driver or APIs p
|
|||
|
||||
## Delete consumer group
|
||||
|
||||
When creating a consumer, a consumer group is assigned to the consumer. Consumers cannot be explicitly deleted, but the consumer group can be deleted. If there are consumers in the current consumer group who are consuming, the FORCE syntax can be used to force deletion. After forced deletion, subscribed consumers will consume data with errors (FORCE syntax supported from version 3.3.6.0).
|
||||
|
||||
```sql
|
||||
DROP CONSUMER GROUP [IF EXISTS] cgroup_name ON topic_name;
|
||||
DROP CONSUMER GROUP [IF EXISTS] [FORCE] cgroup_name ON topic_name;
|
||||
```
|
||||
|
||||
Deletes the consumer group `cgroup_name` on the topic `topic_name`.
|
||||
|
|
|
@ -64,10 +64,10 @@ CREATE TOPIC [IF NOT EXISTS] topic_name [with meta] AS DATABASE db_name;
|
|||
|
||||
## 删除主题
|
||||
|
||||
如果不再需要订阅数据,可以删除 topic,需要注意只有当前未在订阅中的 topic 才能被删除。
|
||||
如果不再需要订阅数据,可以删除 topic,如果当前 topic 被消费者订阅,通过 FORCE 语法可强制删除,强制删除后订阅的消费者会消费数据会出错(FORCE 语法3.3.6.0版本开始支持)。
|
||||
|
||||
```sql
|
||||
DROP TOPIC [IF EXISTS] topic_name;
|
||||
DROP TOPIC [IF EXISTS] [FORCE] topic_name;
|
||||
```
|
||||
|
||||
## 查看主题
|
||||
|
@ -94,9 +94,9 @@ SHOW CONSUMERS;
|
|||
|
||||
### 删除消费组
|
||||
|
||||
消费者创建的时候,会给消费者指定一个消费者组,消费者不能显式的删除,但是消费者组在组内没有消费者时可以通过下面语句删除:
|
||||
消费者创建的时候,会给消费者指定一个消费者组,消费者不能显式的删除,但是可以删除消费者组。如果当前消费者组里有消费者在消费,通过 FORCE 语法可强制删除,强制删除后订阅的消费者会消费数据会出错(FORCE 语法3.3.6.0版本开始支持)。
|
||||
```sql
|
||||
DROP CONSUMER GROUP [IF EXISTS] cgroup_name ON topic_name;
|
||||
DROP CONSUMER GROUP [IF EXISTS] [FORCE] cgroup_name ON topic_name;
|
||||
```
|
||||
|
||||
## 数据订阅
|
||||
|
@ -129,6 +129,7 @@ TDengine 的数据订阅功能支持回放(replay)功能,允许用户按
|
|||
```
|
||||
|
||||
使用数据订阅的回放功能时需要注意如下几项:
|
||||
- 通过配置消费参数 enable.replay 为 true 开启回放功能。
|
||||
- 数据订阅的回放功能仅查询订阅支持数据回放,超级表和库订阅不支持回放。
|
||||
- 回放不支持进度保存。
|
||||
- 因为数据回放本身需要处理时间,所以回放的精度存在几十毫秒的误差。
|
||||
|
|
|
@ -59,11 +59,11 @@ CREATE TOPIC [IF NOT EXISTS] topic_name [with meta] AS DATABASE db_name;
|
|||
|
||||
## 删除 topic
|
||||
|
||||
如果不再需要订阅数据,可以删除 topic,需要注意:只有当前未在订阅中的 TOPIC 才能被删除。
|
||||
如果不再需要订阅数据,可以删除 topic,如果当前 topic 被消费者订阅,通过 FORCE 语法可强制删除,强制删除后订阅的消费者会消费数据会出错(FORCE 语法3.3.6.0版本开始支持)。
|
||||
|
||||
```sql
|
||||
/* 删除 topic */
|
||||
DROP TOPIC [IF EXISTS] topic_name;
|
||||
DROP TOPIC [IF EXISTS] [FORCE] topic_name;
|
||||
```
|
||||
|
||||
此时如果该订阅主题上存在 consumer,则此 consumer 会收到一个错误。
|
||||
|
@ -82,8 +82,10 @@ SHOW TOPICS;
|
|||
|
||||
## 删除消费组
|
||||
|
||||
消费者创建的时候,会给消费者指定一个消费者组,消费者不能显式的删除,但是可以删除消费者组。如果当前消费者组里有消费者在消费,通过 FORCE 语法可强制删除,强制删除后订阅的消费者会消费数据会出错(FORCE 语法3.3.6.0版本开始支持)。
|
||||
|
||||
```sql
|
||||
DROP CONSUMER GROUP [IF EXISTS] cgroup_name ON topic_name;
|
||||
DROP CONSUMER GROUP [IF EXISTS] [FORCE] cgroup_name ON topic_name;
|
||||
```
|
||||
|
||||
删除主题 topic_name 上的消费组 cgroup_name。
|
||||
|
|
|
@ -3358,6 +3358,7 @@ typedef struct {
|
|||
int8_t igNotExists;
|
||||
int32_t sqlLen;
|
||||
char* sql;
|
||||
int8_t force;
|
||||
} SMDropTopicReq;
|
||||
|
||||
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||||
|
@ -3368,6 +3369,7 @@ typedef struct {
|
|||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
int8_t igNotExists;
|
||||
int8_t force;
|
||||
} SMDropCgroupReq;
|
||||
|
||||
int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
|
||||
|
|
|
@ -553,6 +553,7 @@ typedef struct SDropTopicStmt {
|
|||
ENodeType type;
|
||||
char topicName[TSDB_TOPIC_NAME_LEN];
|
||||
bool ignoreNotExists;
|
||||
bool force;
|
||||
} SDropTopicStmt;
|
||||
|
||||
typedef struct SDropCGroupStmt {
|
||||
|
@ -560,6 +561,7 @@ typedef struct SDropCGroupStmt {
|
|||
char topicName[TSDB_TOPIC_NAME_LEN];
|
||||
char cgroup[TSDB_CGROUP_LEN];
|
||||
bool ignoreNotExists;
|
||||
bool force;
|
||||
} SDropCGroupStmt;
|
||||
|
||||
typedef struct SAlterClusterStmt {
|
||||
|
|
|
@ -6676,6 +6676,8 @@ int32_t tSerializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pReq
|
|||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->name));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
|
||||
ENCODESQL();
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->force));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@ -6699,6 +6701,9 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
|
|||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->name));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
|
||||
DECODESQL();
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->force));
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
_exit:
|
||||
|
@ -6719,6 +6724,7 @@ int32_t tSerializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pR
|
|||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->topic));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->cgroup));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->igNotExists));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->force));
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@ -6741,6 +6747,9 @@ int32_t tDeserializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *
|
|||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->topic));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->cgroup));
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->igNotExists));
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->force));
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
_exit:
|
||||
|
|
|
@ -29,7 +29,7 @@ int32_t mndGetGroupNumByTopic(SMnode *pMnode, const char *topicName);
|
|||
int32_t mndAcquireSubscribeByKey(SMnode *pMnode, const char *key, SMqSubscribeObj** pSub);
|
||||
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
||||
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic, bool force);
|
||||
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
|
||||
|
||||
bool mndRebTryStart();
|
||||
|
|
|
@ -1112,12 +1112,13 @@ END:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
|
||||
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic, bool deleteConsumer) {
|
||||
if (pMnode == NULL || pTrans == NULL || cgroup == NULL || topic == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
void *pIter = NULL;
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
int code = 0;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
|
@ -1130,18 +1131,27 @@ static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgr
|
|||
continue;
|
||||
}
|
||||
|
||||
bool found = checkTopic(pConsumer->assignedTopics, topic);
|
||||
if (found){
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
topic, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = TSDB_CODE_MND_CGROUP_USED;
|
||||
goto END;
|
||||
if (deleteConsumer) {
|
||||
MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));
|
||||
MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
pConsumerNew = NULL;
|
||||
} else {
|
||||
bool found = checkTopic(pConsumer->assignedTopics, topic);
|
||||
if (found){
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
topic, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = TSDB_CODE_MND_CGROUP_USED;
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
sdbRelease(pMnode->pSdb, pConsumer);
|
||||
}
|
||||
|
||||
END:
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
sdbRelease(pMnode->pSdb, pConsumer);
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
return code;
|
||||
|
@ -1173,7 +1183,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
taosWLockLatch(&pSub->lock);
|
||||
if (taosHashGetSize(pSub->consumerHash) != 0) {
|
||||
if (!dropReq.force && taosHashGetSize(pSub->consumerHash) != 0) {
|
||||
code = TSDB_CODE_MND_CGROUP_USED;
|
||||
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code));
|
||||
goto END;
|
||||
|
@ -1185,7 +1195,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
|
|||
mndTransSetDbName(pTrans, pSub->dbName, NULL);
|
||||
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
|
||||
MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic, dropReq.force));
|
||||
MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
|
||||
MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));
|
||||
|
||||
|
@ -1409,7 +1419,7 @@ END:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName, bool force) {
|
||||
if (pMnode == NULL || pTrans == NULL || topicName == NULL) return TSDB_CODE_INVALID_PARA;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t code = 0;
|
||||
|
@ -1428,7 +1438,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
|
|||
}
|
||||
|
||||
// iter all vnode to delete handle
|
||||
if (taosHashGetSize(pSub->consumerHash) != 0) {
|
||||
if (!force && taosHashGetSize(pSub->consumerHash) != 0) {
|
||||
code = TSDB_CODE_MND_IN_REBALANCE;
|
||||
goto END;
|
||||
}
|
||||
|
|
|
@ -650,7 +650,7 @@ bool checkTopic(SArray *topics, char *topicName){
|
|||
return false;
|
||||
}
|
||||
|
||||
static int32_t mndCheckConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){
|
||||
static int32_t mndCheckConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName, bool deleteConsumer){
|
||||
if (pMnode == NULL || pTrans == NULL || topicName == NULL) {
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
@ -658,24 +658,34 @@ static int32_t mndCheckConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *top
|
|||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SMqConsumerObj *pConsumer = NULL;
|
||||
SMqConsumerObj *pConsumerNew = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
bool found = checkTopic(pConsumer->assignedTopics, topicName);
|
||||
if (found){
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
goto END;
|
||||
if (deleteConsumer) {
|
||||
MND_TMQ_RETURN_CHECK(tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup, -1, NULL, NULL, &pConsumerNew));
|
||||
MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumerNew));
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
pConsumerNew = NULL;
|
||||
} else {
|
||||
bool found = checkTopic(pConsumer->assignedTopics, topicName);
|
||||
if (found){
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
|
||||
topicName, pConsumer->consumerId, pConsumer->cgroup);
|
||||
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
}
|
||||
|
||||
END:
|
||||
tDeleteSMqConsumerObj(pConsumerNew);
|
||||
sdbRelease(pSdb, pConsumer);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return code;
|
||||
|
@ -760,8 +770,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
|
||||
MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckConsumerByTopic(pMnode, pTrans, dropReq.name));
|
||||
MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name));
|
||||
MND_TMQ_RETURN_CHECK(mndCheckConsumerByTopic(pMnode, pTrans, dropReq.name, dropReq.force));
|
||||
MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name, dropReq.force));
|
||||
|
||||
if (pTopic->ntbUid != 0) {
|
||||
MND_TMQ_RETURN_CHECK(mndDropCheckInfoByTopic(pMnode, pTrans, pTopic));
|
||||
|
|
|
@ -310,8 +310,8 @@ SNode* createCreateTopicStmtUseDb(SAstCreateContext* pCxt, bool ignoreExists, ST
|
|||
int8_t withMeta);
|
||||
SNode* createCreateTopicStmtUseTable(SAstCreateContext* pCxt, bool ignoreExists, SToken* pTopicName, SNode* pRealTable,
|
||||
int8_t withMeta, SNode* pWhere);
|
||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName);
|
||||
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName);
|
||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName, bool force);
|
||||
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName, bool force);
|
||||
SNode* createAlterClusterStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue);
|
||||
SNode* createAlterLocalStmt(SAstCreateContext* pCxt, const SToken* pConfig, const SToken* pValue);
|
||||
SNode* createDefaultExplainOptions(SAstCreateContext* pCxt);
|
||||
|
|
|
@ -778,8 +778,8 @@ cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) with_meta(D)
|
|||
cmd ::= CREATE TOPIC not_exists_opt(A) topic_name(B) with_meta(E)
|
||||
STABLE full_table_name(C) where_clause_opt(D). { pCxt->pRootNode = createCreateTopicStmtUseTable(pCxt, A, &B, C, E, D); }
|
||||
|
||||
cmd ::= DROP TOPIC exists_opt(A) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B); }
|
||||
cmd ::= DROP CONSUMER GROUP exists_opt(A) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C); }
|
||||
cmd ::= DROP TOPIC exists_opt(A) force_opt(C) topic_name(B). { pCxt->pRootNode = createDropTopicStmt(pCxt, A, &B, C); }
|
||||
cmd ::= DROP CONSUMER GROUP exists_opt(A) force_opt(D) cgroup_name(B) ON topic_name(C). { pCxt->pRootNode = createDropCGroupStmt(pCxt, A, &B, &C, D); }
|
||||
|
||||
/************************************************ desc/describe *******************************************************/
|
||||
cmd ::= DESC full_table_name(A). { pCxt->pRootNode = createDescribeStmt(pCxt, A); }
|
||||
|
|
|
@ -3814,7 +3814,7 @@ _err:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName) {
|
||||
SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pTopicName, bool force) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
CHECK_NAME(checkTopicName(pCxt, pTopicName));
|
||||
SDropTopicStmt* pStmt = NULL;
|
||||
|
@ -3822,12 +3822,13 @@ SNode* createDropTopicStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken
|
|||
CHECK_MAKE_NODE(pStmt);
|
||||
COPY_STRING_FORM_ID_TOKEN(pStmt->topicName, pTopicName);
|
||||
pStmt->ignoreNotExists = ignoreNotExists;
|
||||
pStmt->force = force;
|
||||
return (SNode*)pStmt;
|
||||
_err:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName) {
|
||||
SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pCGroupId, SToken* pTopicName, bool force) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
CHECK_NAME(checkTopicName(pCxt, pTopicName));
|
||||
CHECK_NAME(checkCGroupName(pCxt, pCGroupId));
|
||||
|
@ -3835,6 +3836,7 @@ SNode* createDropCGroupStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToke
|
|||
pCxt->errCode = nodesMakeNode(QUERY_NODE_DROP_CGROUP_STMT, (SNode**)&pStmt);
|
||||
CHECK_MAKE_NODE(pStmt);
|
||||
pStmt->ignoreNotExists = ignoreNotExists;
|
||||
pStmt->force = force;
|
||||
COPY_STRING_FORM_ID_TOKEN(pStmt->topicName, pTopicName);
|
||||
COPY_STRING_FORM_ID_TOKEN(pStmt->cgroup, pCGroupId);
|
||||
return (SNode*)pStmt;
|
||||
|
|
|
@ -11773,6 +11773,7 @@ static int32_t translateDropTopic(STranslateContext* pCxt, SDropTopicStmt* pStmt
|
|||
|
||||
snprintf(dropReq.name, sizeof(dropReq.name), "%d.%s", pCxt->pParseCxt->acctId, pStmt->topicName);
|
||||
dropReq.igNotExists = pStmt->ignoreNotExists;
|
||||
dropReq.force = pStmt->force;
|
||||
|
||||
int32_t code = buildCmdMsg(pCxt, TDMT_MND_TMQ_DROP_TOPIC, (FSerializeFunc)tSerializeSMDropTopicReq, &dropReq);
|
||||
tFreeSMDropTopicReq(&dropReq);
|
||||
|
@ -11788,6 +11789,7 @@ static int32_t translateDropCGroup(STranslateContext* pCxt, SDropCGroupStmt* pSt
|
|||
if (TSDB_CODE_SUCCESS != code) return code;
|
||||
(void)tNameGetFullDbName(&name, dropReq.topic);
|
||||
dropReq.igNotExists = pStmt->ignoreNotExists;
|
||||
dropReq.force = pStmt->force;
|
||||
tstrncpy(dropReq.cgroup, pStmt->cgroup, TSDB_CGROUP_LEN);
|
||||
|
||||
return buildCmdMsg(pCxt, TDMT_MND_TMQ_DROP_CGROUP, (FSerializeFunc)tSerializeSMDropCgroupReq, &dropReq);
|
||||
|
|
|
@ -506,6 +506,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32526.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32471.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts6115.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py
|
||||
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
|
||||
sys.path.append("./7-tmq")
|
||||
|
||||
insertJson = '''{
|
||||
"filetype": "insert",
|
||||
"cfgdir": "/etc/taos",
|
||||
"host": "localhost",
|
||||
"port": 6030,
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"connection_pool_size": 10,
|
||||
"thread_count": 10,
|
||||
"create_table_thread_count": 10,
|
||||
"result_file": "./insert-2-2-1.txt",
|
||||
"confirm_parameter_prompt": "no",
|
||||
"num_of_records_per_req": 3600,
|
||||
"prepared_rand": 3600,
|
||||
"chinese": "no",
|
||||
"escape_character": "yes",
|
||||
"continue_if_fail": "no",
|
||||
"databases": [
|
||||
{
|
||||
"dbinfo": {
|
||||
"name": "ts6115",
|
||||
"drop": "yes",
|
||||
"vgroups": 10,
|
||||
"precision": "ms",
|
||||
"buffer": 512,
|
||||
"cachemodel":"'both'",
|
||||
"stt_trigger": 1
|
||||
},
|
||||
"super_tables": [
|
||||
{
|
||||
"name": "stb",
|
||||
"child_table_exists": "no",
|
||||
"childtable_count": 10000,
|
||||
"childtable_prefix": "d_",
|
||||
"auto_create_table": "yes",
|
||||
"batch_create_tbl_num": 10,
|
||||
"data_source": "csv",
|
||||
"insert_mode": "stmt",
|
||||
"non_stop_mode": "no",
|
||||
"line_protocol": "line",
|
||||
"insert_rows": 1000,
|
||||
"childtable_limit": 0,
|
||||
"childtable_offset": 0,
|
||||
"interlace_rows": 0,
|
||||
"insert_interval": 0,
|
||||
"partial_col_num": 0,
|
||||
"timestamp_step": 1000,
|
||||
"start_timestamp": "2024-11-01 00:00:00.000",
|
||||
"sample_format": "csv",
|
||||
"sample_file": "./td_double10000_juchi.csv",
|
||||
"use_sample_ts": "no",
|
||||
"tags_file": "",
|
||||
"columns": [
|
||||
{"type": "DOUBLE", "name": "val"},
|
||||
{ "type": "INT", "name": "quality"}
|
||||
],
|
||||
"tags": [
|
||||
{"type": "INT", "name": "id", "max": 100, "min": 1}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}'''
|
||||
|
||||
class TDTestCase:
|
||||
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
|
||||
clientCfgDict = {'debugFlag': 135, 'asynclog': 0}
|
||||
updatecfgDict["clientCfg"] = clientCfgDict
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def run(self):
|
||||
|
||||
with open('ts-6115.json', 'w') as file:
|
||||
file.write(insertJson)
|
||||
|
||||
tdLog.info("start to insert data: taosBenchmark -f ts-6115.json")
|
||||
if os.system("taosBenchmark -f ts-6115.json") != 0:
|
||||
tdLog.exit("taosBenchmark -f ts-6115.json")
|
||||
|
||||
tdLog.info("test ts-6115 ......")
|
||||
|
||||
buildPath = tdCom.getBuildPath()
|
||||
cmdStr = '%s/build/bin/tmq_ts6115'%(buildPath)
|
||||
|
||||
tdLog.info(cmdStr)
|
||||
if os.system(cmdStr) != 0:
|
||||
tdLog.exit(cmdStr)
|
||||
|
||||
return
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -6,6 +6,7 @@ add_executable(tmq_ts5466 tmq_ts5466.c)
|
|||
add_executable(tmq_td32526 tmq_td32526.c)
|
||||
add_executable(tmq_td32187 tmq_td32187.c)
|
||||
add_executable(tmq_ts5776 tmq_ts5776.c)
|
||||
add_executable(tmq_ts6115 tmq_ts6115.c)
|
||||
add_executable(tmq_td32471 tmq_td32471.c)
|
||||
add_executable(tmq_td33798 tmq_td33798.c)
|
||||
add_executable(tmq_poll_test tmq_poll_test.c)
|
||||
|
@ -96,6 +97,14 @@ target_link_libraries(
|
|||
PUBLIC os
|
||||
)
|
||||
|
||||
target_link_libraries(
|
||||
tmq_ts6115
|
||||
PUBLIC ${TAOS_LIB}
|
||||
PUBLIC util
|
||||
PUBLIC common
|
||||
PUBLIC os
|
||||
)
|
||||
|
||||
target_link_libraries(
|
||||
tmq_poll_test
|
||||
PUBLIC ${TAOS_LIB}
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
#include "types.h"
|
||||
|
||||
const char* topic_name = "t1";
|
||||
bool pollStart = false;
|
||||
|
||||
int32_t create_topic() {
|
||||
printf("create topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT(pConn != NULL);
|
||||
|
||||
pRes = taos_query(pConn, "drop topic if exists t1");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create topic t1 as database ts6115");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void* consumeThreadFunc(void* param) {
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "enable.auto.commit", "false");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
|
||||
tmq_conf_set(conf, "group.id", "group_1");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "false");
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
// 创建订阅 topics 列表
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, topic_name);
|
||||
|
||||
// 启动订阅
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
|
||||
int32_t timeout = 200;
|
||||
while (1) {
|
||||
printf("start to poll\n");
|
||||
|
||||
TAOS_RES *pRes = tmq_consumer_poll(tmq, timeout);
|
||||
pollStart = true;
|
||||
if (pRes == NULL) {
|
||||
printf("pRes is NULL, reason:%s\n", taos_errstr(NULL));
|
||||
break;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
taosMsleep(200);
|
||||
}
|
||||
tmq_consumer_close(tmq);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void* dropTopicThreadFunc(void* param) {
|
||||
printf("drop topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT(pConn != NULL);
|
||||
|
||||
while(!pollStart) {
|
||||
taosSsleep(taosRand()%5);
|
||||
}
|
||||
pRes = taos_query(pConn, "drop topic force t1");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "show consumers");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
ASSERT(taos_affected_rows(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void* dropCGroupThreadFunc(void* param) {
|
||||
printf("drop topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT(pConn != NULL);
|
||||
|
||||
while(!pollStart) {
|
||||
taosSsleep(taosRand()%5);
|
||||
}
|
||||
|
||||
pRes = taos_query(pConn, "drop consumer group force group_1 on t1");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "show consumers");
|
||||
ASSERT(taos_errno(pRes) == 0);
|
||||
ASSERT(taos_affected_rows(pRes) == 0);
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
printf("test start.........\n");
|
||||
|
||||
int32_t runTimes = 1;
|
||||
TdThread thread1, thread2;
|
||||
TdThreadAttr thattr;
|
||||
taosThreadAttrInit(&thattr);
|
||||
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
for (int i = 0; i < runTimes; i++){
|
||||
printf("test drop topic times:%d\n", i);
|
||||
|
||||
pollStart = false;
|
||||
create_topic();
|
||||
taosThreadCreate(&(thread1), &thattr, dropTopicThreadFunc, NULL);
|
||||
taosThreadCreate(&(thread2), &thattr, consumeThreadFunc, NULL);
|
||||
|
||||
taosThreadJoin(thread1, NULL);
|
||||
taosThreadClear(&thread1);
|
||||
|
||||
taosThreadJoin(thread2, NULL);
|
||||
taosThreadClear(&thread2);
|
||||
}
|
||||
|
||||
create_topic();
|
||||
|
||||
for (int i = 0; i < runTimes; i++){
|
||||
printf("test drop consumer group times:%d\n", i);
|
||||
|
||||
pollStart = false;
|
||||
taosThreadCreate(&(thread1), &thattr, dropCGroupThreadFunc, NULL);
|
||||
taosThreadCreate(&(thread2), &thattr, consumeThreadFunc, NULL);
|
||||
|
||||
taosThreadJoin(thread1, NULL);
|
||||
taosThreadClear(&thread1);
|
||||
|
||||
taosThreadJoin(thread2, NULL);
|
||||
taosThreadClear(&thread2);
|
||||
}
|
||||
|
||||
printf("test end.........\n");
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue