Merge branch '3.0' into fix/m23.0

This commit is contained in:
dapan1121 2023-06-29 14:03:56 +08:00 committed by GitHub
commit c8435de64a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 90 additions and 54 deletions

View File

@ -244,6 +244,8 @@ The following SQL statement creates a topic in TDengine:
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
```
- There is an upper limit to the number of topics created, controlled by the parameter tmqMaxTopicNum, with a default of 20
Multiple subscription types are supported.
#### Subscribe to a Column
@ -265,14 +267,15 @@ You can subscribe to a topic through a SELECT statement. Statements that specify
Syntax:
```sql
CREATE TOPIC topic_name AS STABLE stb_name
CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition]
```
Creating a topic in this manner differs from a `SELECT * from stbName` statement as follows:
- The table schema can be modified.
- Unstructured data is returned. The format of the data returned changes based on the supertable schema.
- A different table schema may exist for every data block to be processed.
- The 'with meta' parameter is optional. When selected, statements such as creating super tables and sub tables will be returned, mainly used for Taosx to perform super table migration
- The 'where_condition' parameter is optional and will be used to filter and subscribe to sub tables that meet the criteria. Where conditions cannot have ordinary columns, only tags or tbnames. Functions can be used in where conditions to filter tags, but cannot be aggregate functions because sub table tag values cannot be aggregated. It can also be a constant expression, such as 2>1 (subscribing to all child tables), Or false (subscribe to 0 sub tables)
- The data returned does not include tags.
### Subscribe to a Database
@ -280,10 +283,12 @@ Creating a topic in this manner differs from a `SELECT * from stbName` statement
Syntax:
```sql
CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;
CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
```
This SQL statement creates a subscription to all tables in the database. You can add the `WITH META` parameter to include schema changes in the subscription, including creating and deleting supertables; adding, deleting, and modifying columns; and creating, deleting, and modifying the tags of subtables. Consumers can determine the message type from the API. Note that this differs from Kafka.
This SQL statement creates a subscription to all tables in the database.
- The 'with meta' parameter is optional. When selected, it will return statements for creating all super tables and sub tables in the database, mainly used for Taosx database migration
## Create a Consumer
@ -295,7 +300,7 @@ You configure the following parameters when creating a consumer:
| `td.connect.user` | string | User Name | |
| `td.connect.pass` | string | Password | |
| `td.connect.port` | string | Port of the server side | |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. |
| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. Each topic can create up to 100 consumer groups. |
| `client.id` | string | Client ID | Maximum length: 192. |
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application need to handle commit by itself | Default value is true |

View File

@ -36,7 +36,7 @@ Shows information about connections to the system.
SHOW CONSUMERS;
```
Shows information about all active consumers in the system.
Shows information about all consumers in the system.
## SHOW CREATE DATABASE

View File

@ -243,6 +243,7 @@ TDengine 使用 SQL 创建一个 topic
```sql
CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;
```
- topic创建个数有上限通过参数 tmqMaxTopicNum 控制,默认 20 个
TMQ 支持多种订阅类型:
@ -265,14 +266,15 @@ CREATE TOPIC topic_name as subquery
语法:
```sql
CREATE TOPIC topic_name AS STABLE stb_name
CREATE TOPIC topic_name [with meta] AS STABLE stb_name [where_condition]
```
与 `SELECT * from stbName` 订阅的区别是:
- 不会限制用户的表结构变更。
- 返回的是非结构化的数据:返回数据的结构会随之超级表的表结构变化而变化。
- 用户对于要处理的每一个数据块都可能有不同的表结构。
- with meta 参数可选选择时将返回创建超级表子表等语句主要用于taosx做超级表迁移
- where_condition 参数可选选择时将用来过滤符合条件的子表订阅这些子表。where 条件里不能有普通列只能是tag或tbnamewhere条件里可以用函数用来过滤tag但是不能是聚合函数因为子表tag值无法做聚合。也可以是常量表达式比如 2 > 1订阅全部子表或者 false订阅0个子表
- 返回数据不包含标签。
### 数据库订阅
@ -280,11 +282,13 @@ CREATE TOPIC topic_name AS STABLE stb_name
语法:
```sql
CREATE TOPIC topic_name AS DATABASE db_name;
CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
```
通过该语句可创建一个包含数据库所有表数据的订阅
- with meta 参数可选选择时将返回创建数据库里所有超级表子表的语句主要用于taosx做数据库迁移
## 创建消费者 *consumer*
消费者需要通过一系列配置选项创建,基础配置项如下表所示:
@ -295,7 +299,7 @@ CREATE TOPIC topic_name AS DATABASE db_name;
| `td.connect.user` | string | 用户名 | |
| `td.connect.pass` | string | 密码 | |
| `td.connect.port` | integer | 服务端的端口号 | |
| `group.id` | string | 消费组 ID同一消费组共享消费进度 | **必填项**。最大长度192。 |
| `group.id` | string | 消费组 ID同一消费组共享消费进度 | <br />**必填项**。最大长度192。<br />每个topic最多可建立100个 consumer group |
| `client.id` | string | 客户端 ID | 最大长度192。 |
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | <br />`earliest`: default;从头开始订阅; <br/>`latest`: 仅从最新数据开始订阅; <br/>`none`: 没有提交的 offset 无法订阅 |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交true: 自动提交客户端应用无需commitfalse客户端应用需要自行commit | 默认值为 true |

View File

@ -36,7 +36,7 @@ SHOW CONNECTIONS;
SHOW CONSUMERS;
```
显示当前数据库下所有活跃的消费者的信息。
显示当前数据库下所有消费者的信息。
## SHOW CREATE DATABASE

View File

@ -815,7 +815,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
offRows->offset = pVg->offsetInfo.currentOffset;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows);
tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows);
}
}
// tmq->needReportOffsetRows = false;
@ -1491,7 +1491,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
SVgroupSaveInfo* pInfo = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
STqOffsetVal offsetNew = {0};
offsetNew.type = tmq->resetOffsetCfg;
SMqClientVg clientVg = {
.pollCnt = 0,

View File

@ -1186,7 +1186,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%d,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);

View File

@ -468,40 +468,51 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
}
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
// if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
if (pSub) {
taosRLockLatch(&pSub->lock);
bool init = false;
if (pOutput->pSub->offsetRows == NULL) {
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
init = true;
}
pIter = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
if (init) {
taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows);
// mDebug("pSub->offsetRows is init");
} else {
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
if (d1->vgId == d2->vgId) {
d2->rows += d1->rows;
d2->offset = d1->offset;
// mDebug("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
}
SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
bool jump = false;
for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, i);
if(pVgEp->vgId == d1->vgId){
jump = true;
mInfo("pSub->offsetRows jump, because consumer id:%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId);
break;
}
}
if(jump) continue;
bool find = false;
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
if (d1->vgId == d2->vgId) {
d2->rows += d1->rows;
d2->offset = d1->offset;
find = true;
mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
break;
}
}
if(!find){
taosArrayPush(pOutput->pSub->offsetRows, d1);
}
}
}
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
}
// }
}
// 8. generate logs
@ -760,8 +771,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
}
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMDropCgroupReq dropReq = {0};
SMnode *pMnode = pMsg->info.node;
SMDropCgroupReq dropReq = {0};
STrans *pTrans = NULL;
int32_t code = TSDB_CODE_ACTION_IN_PROGRESS;
if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
@ -780,11 +793,12 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
}
}
taosWLockLatch(&pSub->lock);
if (taosHashGetSize(pSub->consumerHash) != 0) {
terrno = TSDB_CODE_MND_CGROUP_USED;
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
return -1;
code = -1;
goto end;
}
void *pIter = NULL;
@ -802,30 +816,32 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return -1;
code = -1;
goto end;
}
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return -1;
code = -1;
goto end;
}
if (mndTransPrepare(pMnode, pTrans) < 0) {
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return -1;
code = -1;
goto end;
}
mndReleaseSubscribe(pMnode, pSub);
return TSDB_CODE_ACTION_IN_PROGRESS;
end:
taosWUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return code;
}
void mndCleanupSubscribe(SMnode *pMnode) {}

View File

@ -388,7 +388,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
tqDebug("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64 " %d", pReader->nextBlk,
numOfBlocks, pReader->msg.msgLen, pReader->msg.ver, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
@ -403,7 +403,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
if (ret != NULL) {
tqDebug("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);
tqTrace("tq reader return submit block, uid:%" PRId64 ", ver:%" PRId64, pSubmitTbData->uid, pReader->msg.ver);
SSDataBlock* pRes = NULL;
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
@ -412,11 +412,11 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
}
} else {
pReader->nextBlk += 1;
tqDebug("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
}
}
qDebug("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->msg.msgStr = NULL;
@ -604,7 +604,7 @@ static int32_t doSetVal(SColumnInfoData* pColumnInfoData, int32_t rowIndex, SCol
}
int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* id) {
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
SSDataBlock* pBlock = pReader->pResBlock;

View File

@ -1078,6 +1078,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SOperatorInfo* pOperator = pTaskInfo->pRoot;
const char* id = GET_TASKID(pTaskInfo);
if(subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG){
pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id);
if (pOperator == NULL) {
return -1;
}
SStreamScanInfo* pInfo = pOperator->info;
SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
SWalReader* pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
walReaderVerifyOffset(pWalReader, pOffset);
}
// if pOffset equal to current offset, means continue consume
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
return 0;

View File

@ -781,7 +781,7 @@
,,y,script,./test.sh -f tsim/user/basic.sim
,,y,script,./test.sh -f tsim/user/password.sim
,,y,script,./test.sh -f tsim/user/privilege_db.sim
,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
#,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
,,y,script,./test.sh -f tsim/user/privilege_topic.sim
,,y,script,./test.sh -f tsim/user/privilege_table.sim
,,y,script,./test.sh -f tsim/db/alter_option.sim

View File

@ -245,7 +245,7 @@ class TDTestCase:
tdSql.query("show consumers")
tdSql.checkRows(1)
tdSql.checkData(0, 8, "tbname:1,commit:1,interval:2000,reset:earliest")
tdSql.checkData(0, 8, "tbname:1,commit:1,interval:2000ms,reset:earliest")
time.sleep(2)
tdLog.info("start insert data")

View File

@ -94,7 +94,7 @@ class TDTestCase:
consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0
consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0
consumer_ret = "earliest" if offset_value == "" else offset_value
expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}'
expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]}ms,reset:{consumer_ret}'
if len(offset_value) == 0:
del consumer_dict["auto.offset.reset"]
consumer = Consumer(consumer_dict)