other: merge 3.0

This commit is contained in:
Haojun Liao 2023-09-04 13:08:19 +08:00
commit 21dab8ceab
21 changed files with 262 additions and 53 deletions

View File

@ -2,6 +2,7 @@
ExternalProject_Add(mxml
GIT_REPOSITORY https://github.com/michaelrsweet/mxml.git
GIT_TAG v2.12
SOURCE_DIR "${TD_CONTRIB_DIR}/mxml"
#BINARY_DIR ""
BUILD_IN_SOURCE TRUE

View File

@ -11,7 +11,11 @@ taosBenchmark (曾用名 taosdemo ) 是一个用于测试 TDengine 产品性能
## 安装
- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark
taosBenchmark 有两种安装方式:
- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark, 详情请参考[ TDengine 安装](../../operation/pkg-install)。
- 单独编译 taos-tools 并安装, 详情请参考 [taos-tools](https://github.com/taosdata/taos-tools) 仓库。
## 运行

View File

@ -13,7 +13,12 @@ taosKeeper 是 TDengine 3.0 版本监控指标的导出工具,通过简单的
## 安装
- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper
taosKeeper 有两种安装方式:
taosKeeper 安装方式:
- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper, 详情请参考[ TDengine 安装](../../operation/pkg-install)。
- 单独编译 taosKeeper 并安装,详情请参考 [taosKeeper](https://github.com/taosdata/taoskeeper) 仓库。
## 配置和运行方式

View File

@ -0,0 +1,80 @@
---
title: 集群运维
description: TDengine 提供了多种集群运维手段以使集群运行更健康更高效
---
为了使集群运行更健康更高效TDengine 企业版提供了一些运维手段来帮助系统管理员更好地运维集群。
## 数据重整
TDengine 面向多种写入场景在有些写入场景下TDengine 的存储会导致数据存储的放大或数据文件的空洞等。这一方面影响数据的存储效率另一方面也会影响查询效率。为了解决上述问题TDengine 企业版提供了对数据的重整功能,即 DATA COMPACT 功能,将存储的数据文件重新整理,删除文件空洞和无效数据,提高数据的组织度,从而提高存储和查询的效率。
**语法**
```sql
COMPACT DATABASE db_name [start with 'XXXX'] [end with 'YYYY']
```
**效果**
- 扫描并压缩指定的 DB 中所有 VGROUP 中 VNODE 的所有数据文件
- COMPCAT 会删除被删除数据以及被删除的表的数据
- COMPACT 会合并多个 STT 文件
- 可通过 start with 关键字指定 COMPACT 数据的起始时间
- 可通过 end with 关键字指定 COMPACT 数据的终止时间
**补充说明**
- COMPACT 为异步,执行 COMPACT 命令后不会等 COMPACT 结束就会返回。如果上一个 COMPACT 没有完成则再发起一个 COMPACT 任务,则会等上一个任务完成后再返回。
- COMPACT 可能阻塞写入,但不阻塞查询
- COMPACT 的进度不可观测
## 集群负载再平衡
当多副本集群中的一个或多个节点因为升级或其它原因而重启后,有可能出现集群中各个 dnode 负载不均衡的现象,极端情况下会出现所有 vgroup 的 leader 都位于同一个 dnode 的情况。为了解决这个问题,可以使用下面的命令
```sql
balance vgroup leader;
```
**功能**
让所有的 vgroup 的 leade r在各自的replica节点上均匀分布。这个命令会让 vgroup 强制重新选举,通过重新选举,在选举的过程中,变换 vgroup 的leader通过这个方式最终让leader均匀分布。
**注意**
Raft选举本身带有随机性所以通过选举的重新分布产生的均匀分布也是带有一定的概率不会完全的均匀。**该命令的副作用是影响查询和写入**在vgroup重新选举时从开始选举到选举出新的 leader 这段时间,这 个vgroup 无法写入和查询。选举过程一般在秒级完成。所有的vgroup会依次逐个重新选举。
## 恢复数据节点
在多节点三副本的集群环境中,如果某个 dnode 的磁盘损坏,该 dnode 会自动退出,但集群中其它的 dnode 仍然能够继续提供写入和查询服务。
在更换了损坏的磁盘后,如果想要让曾经主动退出的 dnode 重新加入集群提供服务,可以通过 `restore dnode` 命令来恢复该数据节点上的部分或全部逻辑节点,该功能依赖多副本中的其它副本进行数据复制,所以只在集群中 dnode 数量大于等于 3 且副本数为 3 的情况下能够工作。
```sql
restore dnode <dnode_id># 恢复dnode上的mnode所有vnode和qnode
restore mnode on dnode <dnode_id># 恢复dnode上的mnode
restore vnode on dnode <dnode_id> # 恢复dnode上的所有vnode
restore qnode on dnode <dnode_id># 恢复dnode上的qnode
```
**限制**
- 该功能是基于已有的复制功能的恢复,不是灾难恢复或者备份恢复,所以对于要恢复的 mnode 和 vnode来说使用该命令的前提是还存在该 mnode 或 vnode 的其它两个副本仍然能够正常工作。
- 该命令不能修复数据目录中的个别文件的损坏或者丢失。例如,如果某个 mnode 或者 vnode 中的个别文件或数据损坏,无法单独恢复损坏的某个文件或者某块数据。此时,可以选择将该 mnode/vnode 的数据全部清空再进行恢复。
## 虚拟组分裂 Scale Out
当一个 vgroup 因为子表数过多而导致 CPU 或 Disk 资源使用量负载过高时,增加 dnode 节点后,可通过 `split vgroup` 命令把该 vgroup 分裂为两个虚拟组。分裂完成后,新产生的两个 vgroup 承担原来由一个 vgroup 提供的读写服务。这也是 TDengine 为企业版用户提供的 scale out 集群的能力。
```sql
split vgroup <vgroup_id>
```
**注意**
- 单副本库虚拟组,在分裂完成后,历史时序数据总磁盘空间使用量,可能会翻倍。所以,在执行该操作之前,通过增加 dnode 节点方式,确保集群中有足够的 CPU 和磁盘资源,避免资源不足现象发生。
- 该命令为 DB 级事务执行过程当前DB的其它管理事务将会被拒绝。集群中其它DB不受影响。
- 分裂任务执行过程中,可持续提供读写服务;期间,可能存在可感知的短暂的读写业务中断。
- 在分裂过程中,不支持流和订阅。分裂结束后,历史 WAL 会清空。
- 分裂过程中,可支持节点宕机重启容错;但不支持节点磁盘故障容错。

View File

@ -0,0 +1,56 @@
---
title: 多级存储
---
## 多级存储
说明:多级存储功能仅企业版支持。
在默认配置下TDengine 会将所有数据保存在 /var/lib/taos 目录下,而且每个 vnode 的数据文件保存在该目录下的不同目录。为扩大存储空间,尽量减少文件读取的瓶颈,提高数据吞吐率 TDengine 可通过配置系统参数 dataDir 让多个挂载的硬盘被系统同时使用。
除此之外TDengine 也提供了数据分级存储的功能,将不同时间段的数据存储在挂载的不同介质上的目录里,从而实现不同“热度”的数据存储在不同的存储介质上,充分利用存储,节约成本。比如,最新采集的数据需要经常访问,对硬盘的读取性能要求高,那么用户可以配置将这些数据存储在 SSD 盘上。超过一定期限的数据,查询需求量没有那么高,那么可以存储在相对便宜的 HDD 盘上。
多级存储支持 3 级,每级最多可配置 16 个挂载点。
TDengine 多级存储配置方式如下(在配置文件/etc/taos/taos.cfg 中):
```
dataDir [path] <level> <primary>
```
- path: 挂载点的文件夹路径
- level: 介质存储等级,取值为 012。
0 级存储最新的数据1 级存储次新的数据2 级存储最老的数据,省略默认为 0。
各级存储之间的数据流向0 级存储 -> 1 级存储 -> 2 级存储。
同一存储等级可挂载多个硬盘,同一存储等级上的数据文件分布在该存储等级的所有硬盘上。
需要说明的是,数据在不同级别的存储介质上的移动,是由系统自动完成的,用户无需干预。
- primary: 是否为主挂载点0或 1省略默认为 1。
在配置中只允许一个主挂载点的存在level=0primary=1例如采用如下的配置方式
```
dataDir /mnt/data1 0 1
dataDir /mnt/data2 0 0
dataDir /mnt/data3 1 0
dataDir /mnt/data4 1 0
dataDir /mnt/data5 2 0
dataDir /mnt/data6 2 0
```
:::note
1. 多级存储不允许跨级配置,合法的配置方案有:仅 0 级,仅 0 级+ 1 级,以及 0 级+ 1 级+ 2 级。而不允许只配置 level=0 和 level=2而不配置 level=1。
2. 禁止手动移除使用中的挂载盘,挂载盘目前不支持非本地的网络盘。
3. 多级存储目前不支持删除已经挂载的硬盘的功能。
:::
## 0 级负载均衡
在多级存储中,有且只有一个主挂载点,主挂载点承担了系统中最重要的元数据在座,同时各个 vnode 的主目录均存在于当前 dnode 主挂载点上,从而导致该 dnode 的写入性能受限于单个磁盘的 IO 吞吐能力。
从 TDengine 3.1.0.0 开始,如果一个 dnode 配置了多个 0 级挂载点,我们将该 dnode 上所有 vnode 的主目录均衡分布在所有的 0 级挂载点上,由这些 0 级挂载点共同承担写入负荷。在网络 I/O 及其它处理资源不成为瓶颈的情况下,通过优化集群配置,测试结果证明整个系统的写入能力和 0 级挂载点的数量呈现线性关系,即随着 0 级挂载点数量的增加,整个系统的写入能力也成倍增加。
## 同级挂载点选择策略
一般情况下,当 TDengine 要从同级挂载点中选择一个用于生成新的数据文件时,采用 round robin 策略进行选择。但现实中有可能每个磁盘的容量不相同,或者容量相同但写入的数据量不相同,这就导致会出现每个磁盘上的可用空间不均衡,在实际进行选择时有可能会选择到一个剩余空间已经很小的磁盘。为了解决这个问题,从 3.1.1.0 开始引入了一个新的配置 `minDiskFreeSize`,当某块磁盘上的可用空间小于等于这个阈值时,该磁盘将不再被选择用于生成新的数据文件。该配置项的单位为字节,其值应该大于 2GB即会跳过可用空间小于 2GB 的挂载点。

View File

@ -23,7 +23,7 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送
1. Linux 操作系统
2. 已安装 Java 8 和 Maven
3. 已安装 Git、curl、vi
4. 已安装并启动 TDengine。
4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](../../operation/pkg-install)
## 安装 Kafka

View File

@ -248,7 +248,7 @@ typedef struct SStreamChildEpInfo {
typedef struct SStreamTaskKey {
int64_t streamId;
int32_t taskId;
int64_t taskId;
} SStreamTaskKey;
typedef struct SStreamTaskId {

View File

@ -876,13 +876,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
char detail[2000] = {0};
sprintf(detail,
"checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64
", "
"fillHistory:%d, igExists:%d, "
"igExpired:%d, igUpdate:%d, lastTs:%" PRId64
", "
"maxDelay:%" PRId64
", numOfTags:%d, sourceDB:%s, "
"targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64
", maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate,
createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,

View File

@ -1760,3 +1760,44 @@ _end:
return rsp.code;
}
int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process stop req, failed to acquire task:0x%x, it may have been dropped already", vgId,
pReq->taskId);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive stop msg from mnode", pTask->id.idStr);
streamTaskStop(pTask);
SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
pMeta->vgId, pTask->historyTaskId.taskId);
streamMetaReleaseTask(pMeta, pTask);
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
streamTaskStop(pHistoryTask);
streamMetaReleaseTask(pMeta, pHistoryTask);
}
streamMetaReleaseTask(pMeta, pTask);
tmsgSendRsp(&rsp);
return 0;
}

View File

@ -386,6 +386,16 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
taosThreadMutexLock(&pTask->lock);
pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
if (pTask->status.taskStatus != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
if (pItem != NULL) {
noDataInWal = false;
code = streamTaskPutDataIntoInputQ(pTask, pItem);

View File

@ -425,6 +425,20 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (code) goto _exit;
}
if (pWriter->pStreamTaskWriter) {
code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback);
if (code) goto _exit;
}
if (pWriter->pStreamStateWriter) {
code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback);
if (code) goto _exit;
code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, 0);
pWriter->pStreamStateWriter = NULL;
if (code) goto _exit;
}
if (pWriter->pRsmaSnapWriter) {
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
if (code) goto _exit;

View File

@ -3754,6 +3754,10 @@ static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelec
pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
int32_t code = TSDB_CODE_SUCCESS;
if (pSelect->pPartitionByList) {
code = removeConstantValueFromList(&pSelect->pPartitionByList);
}
if (TSDB_CODE_SUCCESS == code && pSelect->pPartitionByList) {
int8_t typeType = getTableTypeFromTableNode(pSelect->pFromTable);
SNode* pPar = nodesListGetNode(pSelect->pPartitionByList, 0);

View File

@ -312,7 +312,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t remain = atomic_sub_fetch_32(&pMeta->chkptNotReadyTasks, 1);
ASSERT(remain >= 0);
if (remain == 0) { // all tasks are ready
if (remain == 0) { // all tasks are in TASK_STATUS__CK_READY state
qDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
pMeta->totalTasks = 0;

View File

@ -2157,7 +2157,6 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
if (rpcDebugFlag & DEBUG_DEBUG) {
STraceId* trace = &pMsg->msg.info.traceId;
char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf);
tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf,
pCtx->retryStep, pCtx->retryNextInterval);
@ -2388,7 +2387,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (rpcDebugFlag & DEBUG_TRACE) {
char tbuf[512] = {0};
EPSET_TO_STR(&pCtx->epSet, tbuf);
tGTrace("%s conn %p extract epset from msg, use:%s", CONN_GET_INST_LABEL(pConn), pConn, tbuf);
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
}
}
if (rpcDebugFlag & DEBUG_TRACE) {