Merge branch '3.0' into fix/TD-31163-3.0

This commit is contained in:
kailixu 2024-08-20 09:04:12 +08:00
commit 3ee206b781
15 changed files with 240 additions and 29 deletions

View File

@ -0,0 +1,168 @@
---
toc_max_heading_level: 4
title: 权限管理
---
TDengine 中的权限管理分为[用户管理](../user)、数据库授权管理以及消息订阅授权管理,本节重点说明数据库授权和订阅授权。
## 数据库访问授权
系统管理员可以根据业务需要对系统中的每个用户针对每个数据库进行特定的授权,以防止业务数据被不恰当的用户读取或修改。对某个用户进行数据库访问授权的语法如下:
```sql
GRANT privileges ON priv_level TO user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.tbname
| dbname.*
| *.*
}
```
对数据库的访问权限包含读和写两种权限,它们可以被分别授予,也可以被同时授予。
说明
- priv_level 格式中 "." 之前为数据库名称, "." 之后为表名称,意思为表级别的授权控制。如果 "." 之后为 "\*" ,意为 "." 前所指定的数据库中的所有表
- "dbname.\*" 意思是名为 "dbname" 的数据库中的所有表
- "\*.\*" 意思是所有数据库名中的所有表
### 数据库权限说明
对 root 用户和普通用户的权限的说明如下表
| 用户 | 描述 | 权限说明 |
| -------- | ---------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 超级用户 | 只有 root 是超级用户 | DB 外部 所有操作权限例如user、dnode、udf、qnode等的CRUD DB 权限,包括 创建 删除 更新,例如修改 Option移动 Vgruop等 读 写 Enable/Disable 用户 |
| 普通用户 | 除 root 以外的其它用户均为普通用户 | 在可读的 DB 中,普通用户可以进行读操作 select describe show subscribe 在可写 DB 的内部,用户可以进行写操作: 创建、删除、修改 超级表 创建、删除、修改 子表 创建、删除、修改 topic 写入数据 被限制系统信息时,不可进行如下操作 show dnode、mnode、vgroups、qnode、snode 修改用户包括自身密码 show db时只能看到自己的db并且不能看到vgroups、副本、cache等信息 无论是否被限制系统信息,都可以 管理 udf 可以创建 DB 自己创建的 DB 具备所有权限 非自己创建的 DB ,参照读、写列表中的权限 |
## 消息订阅授权
任意用户都可以在自己拥有读权限的数据库上创建 topic。超级用户 root 可以在任意数据库上创建 topic。每个 topic 的订阅权限都可以被独立授权给任何用户,不管该用户是否拥有该数据库的访问权限。删除 topic 只能由 root 用户或者该 topic 的创建者进行。topic 只能由超级用户、topic的创建者或者被显式授予 subscribe 权限的用户订阅。
具体的 SQL 语法如下:
```sql
GRANT SUBSCRIBE ON topic_name TO user_name
REVOKE SUBSCRIBE ON topic_name FROM user_name
```
## 基于标签的授权(表级授权)
从 TDengine 3.0.5.0 开始,我们支持按标签授权某个超级表中部分特定的子表。具体的 SQL 语法如下。
```sql
GRANT privileges ON priv_level [WITH tag_condition] TO user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.tbname
| dbname.*
| *.*
}
REVOKE privileges ON priv_level [WITH tag_condition] FROM user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.tbname
| dbname.*
| *.*
}
```
上面 SQL 的语义为:
- 用户可以通过 dbname.tbname 来为指定的表(包括超级表和普通表)授予或回收其读写权限,不支持直接对子表授予或回收权限。
- 用户可以通过 dbname.tbname 和 WITH 子句来为符合条件的所有子表授予或回收其读写权限。使用 WITH 子句时,权限级别必须为超级表。
## 表级权限和数据库权限的关系
下表列出了在不同的数据库授权和表级授权的组合下产生的实际权限。
| | **表无授权** | **表读授权** | **表读授权有标签条件** | **表写授权** | **表写授权有标签条件** |
| ---------------- | ---------------- | ---------------------------------------- | ------------------------------------------------------------ | ---------------------------------------- | ---------------------------------------------------------- |
| **数据库无授权** | 无授权 | 对此表有读权限,对数据库下的其他表无权限 | 对此表符合标签权限的子表有读权限,对数据库下的其他表无权限 | 对此表有写权限,对数据库下的其他表无权限 | 对此表符合标签权限的子表有写权限,对数据库下的其他表无权限 |
| **数据库读授权** | 对所有表有读权限 | 对所有表有读权限 | 对此表符合标签权限的子表有读权限,对数据库下的其他表有读权限 | 对此表有写权限,对所有表有读权限 | 对此表符合标签权限的子表有写权限,所有表有读权限 |
| **数据库写授权** | 对所有表有写权限 | 对此表有读权限,对所有表有写权限 | 对此表符合标签权限的子表有读权限,对所有表有写权限 | 对所有表有写权限 | 对此表符合标签权限的子表有写权限,数据库下的其他表有写权限 |
## 查看用户授权
使用下面的命令可以显示一个用户所拥有的授权:
```sql
show user privileges
```
## 撤销授权
1. 撤销数据库访问的授权
```sql
REVOKE privileges ON priv_level FROM user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.tbname
| dbname.*
| *.*
}
```
2. 撤销数据订阅的授权
```sql
REVOKE privileges ON priv_level FROM user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
SUBSCRIBE
}
priv_level : {
topic_name
}
```

View File

@ -27,22 +27,21 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送
## 安装 Kafka
在任意目录下执行:
- 在任意目录下执行:
```shell
curl -O https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar xzf kafka_2.13-3.4.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.4.0 /opt/kafka
```
```shell
curl -O https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar xzf kafka_2.13-3.4.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.4.0 /opt/kafka
```
然后需要把 `$KAFKA_HOME/bin` 目录加入 PATH。
- 然后需要把 `$KAFKA_HOME/bin` 目录加入 PATH。
```title=".profile"
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
```
以上脚本可以追加到当前用户的 profile 文件(~/.profile 或 ~/.bash_profile
```title=".profile"
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
```
以上脚本可以追加到当前用户的 profile 文件(~/.profile 或 ~/.bash_profile
## 安装 TDengine Connector 插件
@ -325,6 +324,21 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSinkConnector
curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
```
### 性能调优
如果在从 TDengine 同步数据到 Kafka 的过程中发现性能不达预期,可以尝试使用如下参数提升 Kafka 的写入吞吐量。
1. 打开 KAFKA_HOME/config/producer.properties 配置文件。
2. 参数说明及配置建议如下:
| **参数** | **参数说明** | **设置建议** |
| --------| --------------------------------- | -------------- |
| producer.type | 此参数用于设置消息的发送方式,默认值为 `sync` 表示同步发送,`async` 表示异步发送。采用异步发送能够提升消息发送的吞吐量。 | async |
| request.required.acks | 参数用于配置生产者发送消息后需要等待的确认数量。当设置为1时表示只要领导者副本成功写入消息就会给生产者发送确认而无需等待集群中的其他副本写入成功。这种设置可以在一定程度上保证消息的可靠性同时也能保证一定的吞吐量。因为不需要等待所有副本都写入成功所以可以减少生产者的等待时间提高发送消息的效率。|1|
| max.request.size| 该参数决定了生产者在一次请求中可以发送的最大数据量。其默认值为 1048576也就是 1M。如果设置得太小可能会导致频繁的网络请求降低吞吐量。如果设置得太大可能会导致内存占用过高或者在网络状况不佳时增加请求失败的概率。建议设置为 100M。|104857600|
|batch.size| 此参数用于设定 batch 的大小,默认值为 16384即 16KB。在消息发送过程中发送到 Kafka 缓冲区中的消息会被划分成一个个的 batch。故而减小 batch 大小有助于降低消息延迟,而增大 batch 大小则有利于提升吞吐量,可根据实际的数据量大小进行合理配置。可根据实际情况进行调整,建议设置为 512K。|524288|
| buffer.memory| 此参数用于设置生产者缓冲待发送消息的内存总量。较大的缓冲区可以允许生产者积累更多的消息后批量发送,提高吞吐量,但也会增加延迟和内存使用。可根据机器资源来配置,建议配置为 1G。|1073741824|
## 配置参考
### 通用配置

View File

@ -1135,8 +1135,6 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
(void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
}
}
schedulerFreeJob(&pRequest->body.queryJob, 0);
}
taosMemoryFree(pResult);

View File

@ -254,6 +254,16 @@ static int32_t ttlMgrFindExpiredOneEntry(const void *pKey, int keyLen, const voi
return c;
}
// static int32_t ttlMgrDumpOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pDumpCtx) {
// STtlIdxKeyV1 *ttlKey = (STtlIdxKeyV1 *)pKey;
// int64_t *ttlDays = (int64_t *)pVal;
// metaInfo("ttlMgr dump, ttl: %" PRId64 ", ctime: %" PRId64 ", uid: %" PRId64, *ttlDays, ttlKey->deleteTimeMs,
// ttlKey->uid);
// TAOS_RETURN(TSDB_CODE_SUCCESS);
// }
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
SMeta *meta = pMeta;
@ -402,15 +412,20 @@ int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
int32_t code = TSDB_CODE_SUCCESS;
void *pIter = taosHashIterate(pTtlMgr->pDirtyUids, NULL);
while (pIter != NULL) {
void *pIter = NULL;
while ((pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter)) != NULL) {
STtlDirtyEntry *pEntry = (STtlDirtyEntry *)pIter;
tb_uid_t *pUid = taosHashGetKey(pIter, NULL);
STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
if (cacheEntry == NULL) {
metaError("%s, ttlMgr flush failed to get ttl cache, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, *pUid,
pEntry->type);
metaInfo("%s, ttlMgr flush failed to get ttl cache, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, *pUid,
pEntry->type);
code = taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(*pUid));
if (TSDB_CODE_SUCCESS != code) {
metaError("%s, ttlMgr flush failed to remove dirty uid since %s", pTtlMgr->logPrefix, tstrerror(code));
goto _out;
}
continue;
}
@ -422,9 +437,9 @@ int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
if (pEntry->type == ENTRY_TYPE_UPSERT) {
// delete old key & upsert new key
(void)tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); // maybe first insert, ignore error
(void)tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); // maybe first insert, ignore error
code = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKeyDirty, sizeof(ttlKeyDirty), &cacheEntry->ttlDaysDirty,
sizeof(cacheEntry->ttlDaysDirty), pTxn);
sizeof(cacheEntry->ttlDaysDirty), pTxn);
if (TSDB_CODE_SUCCESS != code) {
metaError("%s, ttlMgr flush failed to upsert since %s", pTtlMgr->logPrefix, tstrerror(code));
goto _out;
@ -449,9 +464,15 @@ int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
goto _out;
}
void *pIterTmp = pIter;
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIterTmp);
(void)taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(tb_uid_t));
metaDebug("isdel:%d", pEntry->type == ENTRY_TYPE_DELETE);
metaDebug("ttlkey:%" PRId64 ", uid:%" PRId64, ttlKey.deleteTimeMs, ttlKey.uid);
metaDebug("ttlkeyDirty:%" PRId64 ", uid:%" PRId64, ttlKeyDirty.deleteTimeMs, ttlKeyDirty.uid);
code = taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(*pUid));
if (TSDB_CODE_SUCCESS != code) {
metaError("%s, ttlMgr flush failed to remove dirty uid since %s", pTtlMgr->logPrefix, tstrerror(code));
goto _out;
}
}
taosHashClear(pTtlMgr->pDirtyUids);
@ -459,6 +480,8 @@ int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
code = TSDB_CODE_SUCCESS;
_out:
taosHashCancelIterate(pTtlMgr->pDirtyUids, pIter);
endNs = taosGetTimestampNs();
metaTrace("%s, ttl mgr flush end, time consumed: %" PRId64 " ns", pTtlMgr->logPrefix, endNs - startNs);

View File

@ -375,11 +375,11 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
}
tjsonGetNumberValue(pJson, "s3ChunkSize", pCfg->s3ChunkSize, code);
if (code < 0) {
if (code < 0 || pCfg->s3ChunkSize < TSDB_MIN_S3_CHUNK_SIZE) {
pCfg->s3ChunkSize = TSDB_DEFAULT_S3_CHUNK_SIZE;
}
tjsonGetNumberValue(pJson, "s3KeepLocal", pCfg->s3KeepLocal, code);
if (code < 0) {
if (code < 0 || pCfg->s3KeepLocal < TSDB_MIN_S3_KEEP_LOCAL) {
pCfg->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL;
}
tjsonGetNumberValue(pJson, "s3Compact", pCfg->s3Compact, code);

View File

@ -1746,6 +1746,9 @@ void destroyGrpArray(void* ppArray) {
}
void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) {
if (NULL == pTable) {
return;
}
mJoinDestroyCreatedBlks(pTable->createdBlks);
taosArrayDestroy(pTable->createdBlks);
tSimpleHashCleanup(pTable->pGrpHash);

View File

@ -2372,6 +2372,7 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t
}
gRes[gResIdx]->colInfo = taosMemoryCalloc(info->fields[FLD_TYPE_COLUMN].num, sizeof(SFilterColInfo));
if (gRes[gResIdx]->colInfo == NULL) {
filterFreeGroupCtx(gRes[gResIdx]);
FLT_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
colIdxi = 0;
@ -2384,6 +2385,7 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t
if (gRes[gResIdx]->colInfo[cidx].info == NULL) {
gRes[gResIdx]->colInfo[cidx].info = (SArray *)taosArrayInit(4, POINTER_BYTES);
if (gRes[gResIdx]->colInfo[cidx].info == NULL) {
filterFreeGroupCtx(gRes[gResIdx]);
FLT_ERR_JRET(terrno);
}
colIdx[colIdxi++] = cidx;
@ -2408,7 +2410,11 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t
continue;
}
FLT_ERR_JRET(filterMergeUnits(info, gRes[gResIdx], colIdx[l], &empty));
code = filterMergeUnits(info, gRes[gResIdx], colIdx[l], &empty);
if (TSDB_CODE_SUCCESS != code) {
filterFreeGroupCtx(gRes[gResIdx]);
SCL_ERR_JRET(code);
}
if (empty) {
break;
@ -2426,10 +2432,9 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t
gRes[gResIdx]->colNum = colIdxi;
FILTER_COPY_IDX(&gRes[gResIdx]->colIdx, colIdx, colIdxi);
++gResIdx;
*gResNum = gResIdx;
}
*gResNum = gResIdx;
if (gResIdx == 0) {
FILTER_SET_FLAG(info->status, FI_STATUS_EMPTY);
}