Merge remote-tracking branch 'origin/3.0' into feat/sync

Shengliang Guan 2024-08-20 13:33:53 +08:00
commit 7261f5249c
32 changed files with 370 additions and 145 deletions

View File

@ -0,0 +1,168 @@
---
toc_max_heading_level: 4
title: Permission Management
---
Permission management in TDengine comprises [user management](../user), database authorization, and message subscription authorization. This section focuses on database authorization and subscription authorization.
## Database Access Authorization
A system administrator can grant each user specific privileges on each database as business needs dictate, preventing business data from being read or modified by inappropriate users. The syntax for granting database access to a user is as follows:
```sql
GRANT privileges ON priv_level TO user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.tbname
| dbname.*
| *.*
}
```
Read and write privileges on a database can be granted separately or at the same time; a worked example follows the notes below.
Notes
- In the priv_level format, the part before the "." is the database name and the part after it is the table name, which means table-level authorization. If the part after the "." is "\*", it refers to all tables in the database named before the "."
- "dbname.\*" means all tables in the database named "dbname"
- "\*.\*" means all tables in all databases
### Database Privilege Details
The privileges of the root user and regular users are described in the following table.
| User | Description | Privileges |
| ------------ | -------------------------------------------- | ------------------------------------------------------------ |
| Super user | Only root is the super user | Outside databases, all operations, such as CRUD on users, dnodes, UDFs, qnodes, and so on. On databases: create, delete, and update (for example, modifying options or moving vgroups), read, write, and enabling/disabling users. |
| Regular user | Every user other than root is a regular user | In readable databases, regular users can perform read operations: select, describe, show, subscribe. In writable databases, they can perform write operations: create, delete, and modify super tables; create, delete, and modify child tables; create, delete, and modify topics; and write data. When system information is restricted, the following are not allowed: show dnodes, mnodes, vgroups, qnodes, or snodes; modify any user, including their own password; show db displays only their own databases and hides vgroup, replica, cache, and similar information. Whether or not system information is restricted, they can always manage UDFs and create databases. They hold all privileges on databases they created themselves; on other databases, their privileges follow the read and write grants above. |
## Message Subscription Authorization
Any user can create a topic on a database for which they hold the read privilege. The super user root can create a topic on any database. The subscription privilege on each topic can be granted independently to any user, whether or not that user has access to the underlying database. A topic can be deleted only by root or by its creator, and can be subscribed to only by the super user, the topic's creator, or a user explicitly granted the subscribe privilege.
The SQL syntax is as follows, with a concrete example after it:
```sql
GRANT SUBSCRIBE ON topic_name TO user_name
REVOKE SUBSCRIBE ON topic_name FROM user_name
```
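For example (the topic `power_topic` and user `alice` are hypothetical names):
```sql
-- Allow alice to subscribe to power_topic
GRANT SUBSCRIBE ON power_topic TO alice;
-- Withdraw the subscription privilege later
REVOKE SUBSCRIBE ON power_topic FROM alice;
```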
## Tag-Based Authorization (Table-Level Authorization)
Starting with TDengine 3.0.5.0, a subset of the child tables of a super table can be authorized by tag. The SQL syntax is as follows.
```sql
GRANT privileges ON priv_level [WITH tag_condition] TO user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.tbname
| dbname.*
| *.*
}
REVOKE privileges ON priv_level [WITH tag_condition] FROM user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.tbname
| dbname.*
| *.*
}
```
The semantics of the SQL above are:
- With dbname.tbname, read and write privileges can be granted to or revoked from the specified table (super tables and regular tables alike); granting or revoking privileges directly on a child table is not supported.
- With dbname.tbname plus a WITH clause, read and write privileges can be granted to or revoked from all child tables that satisfy the tag condition. When a WITH clause is used, the privilege level must be a super table. See the sketch below.
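A minimal sketch, assuming a super table `power.meters` with an integer tag `groupId` and a user `alice` (all hypothetical names):
```sql
-- Grant read access only to child tables of power.meters whose tag groupId equals 1
GRANT READ ON power.meters WITH groupId = 1 TO alice;
-- Revoke that access using the same tag condition
REVOKE READ ON power.meters WITH groupId = 1 FROM alice;
```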
## How Table-Level and Database-Level Privileges Interact
The following table lists the effective privileges under each combination of database-level and table-level grants; a combined example follows the table.
| | **No table grant** | **Table read grant** | **Table read grant with tag condition** | **Table write grant** | **Table write grant with tag condition** |
| ---------------- | ---------------- | ---------------------------------------- | ------------------------------------------------------------ | ---------------------------------------- | ------------------------------------------------------------ |
| **No database grant** | No privileges | Read on this table; no privileges on the other tables in the database | Read on this table's child tables that match the tag condition; no privileges on the other tables in the database | Write on this table; no privileges on the other tables in the database | Write on this table's child tables that match the tag condition; no privileges on the other tables in the database |
| **Database read grant** | Read on all tables | Read on all tables | Read on this table's child tables that match the tag condition; read on the other tables in the database | Write on this table; read on all tables | Write on this table's child tables that match the tag condition; read on all tables |
| **Database write grant** | Write on all tables | Read on this table; write on all tables | Read on this table's child tables that match the tag condition; write on all tables | Write on all tables | Write on this table's child tables that match the tag condition; write on the other tables in the database |
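For instance, per the second row above, combining a database-level read grant with a tag-conditioned write grant (hypothetical names again) gives alice read access to every table in power plus write access to the matching child tables of power.meters:
```sql
GRANT READ ON power.* TO alice;
GRANT WRITE ON power.meters WITH groupId = 1 TO alice;
```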
## Viewing User Grants
The following command displays the grants that a user holds:
```sql
show user privileges
```
## Revoking Grants
1. Revoke a database access grant
```sql
REVOKE privileges ON priv_level FROM user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
READ
| WRITE
}
priv_level : {
dbname.tbname
| dbname.*
| *.*
}
```
2. Revoke a data subscription grant (a combined example follows the syntax below)
```sql
REVOKE privileges ON priv_level FROM user_name
privileges : {
ALL
| priv_type [, priv_type] ...
}
priv_type : {
SUBSCRIBE
}
priv_level : {
topic_name
}
```
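For example, to undo the grants made in the earlier examples (hypothetical names):
```sql
REVOKE READ, WRITE ON power.meters FROM alice;
REVOKE ALL ON power.* FROM alice;
REVOKE SUBSCRIBE ON power_topic FROM alice;
```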

View File

@ -27,22 +27,21 @@ TDengine Source Connector is used to read data from TDengine in real time and send
## Install Kafka
Execute in any directory:
- Execute in any directory:
```shell
curl -O https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar xzf kafka_2.13-3.4.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.4.0 /opt/kafka
```
```shell
curl -O https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar xzf kafka_2.13-3.4.0.tgz -C /opt/
ln -s /opt/kafka_2.13-3.4.0 /opt/kafka
```
Then add the `$KAFKA_HOME/bin` directory to the PATH.
- Then add the `$KAFKA_HOME/bin` directory to the PATH.
```title=".profile"
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
```
The script above can be appended to the current user's profile file (~/.profile or ~/.bash_profile).
```title=".profile"
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
```
The script above can be appended to the current user's profile file (~/.profile or ~/.bash_profile).
## Install the TDengine Connector Plugin
@ -325,6 +324,21 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSinkConnector
curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
```
### Performance Tuning
If performance falls short of expectations while syncing data from TDengine to Kafka, try the following parameters to raise Kafka's write throughput.
1. Open the KAFKA_HOME/config/producer.properties configuration file.
2. The parameters and recommended settings are described below; a sample file follows the table:
| **Parameter** | **Description** | **Recommended Value** |
| --------| --------------------------------- | -------------- |
| producer.type | Controls how messages are sent. The default `sync` sends synchronously; `async` sends asynchronously. Asynchronous sending improves message throughput. | async |
| request.required.acks | The number of acknowledgments the producer waits for after sending a message. With a value of 1, the producer is acknowledged as soon as the leader replica has written the message, without waiting for the other replicas in the cluster. This offers a reasonable balance: message reliability is largely preserved, and throughput stays high because the producer does not wait for every replica. | 1 |
| max.request.size | The maximum amount of data the producer can send in a single request. The default is 1048576, i.e. 1 MB. Too small a value causes frequent network requests and lowers throughput; too large a value can drive up memory usage or increase request failures on a poor network. 100 MB is recommended. | 104857600 |
| batch.size | The batch size, with a default of 16384, i.e. 16 KB. Messages sent to the Kafka buffer are grouped into batches: a smaller batch lowers message latency, while a larger batch raises throughput. Configure it according to the actual data volume; 512 KB is recommended. | 524288 |
| buffer.memory | The total memory the producer uses to buffer messages awaiting transmission. A larger buffer lets the producer accumulate more messages and send them in batches, improving throughput at the cost of latency and memory. Configure it according to machine resources; 1 GB is recommended. | 1073741824 |
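Putting the suggestions together, the producer configuration might look like the sketch below (the values are the recommendations above, not requirements):
```title="producer.properties"
producer.type=async
request.required.acks=1
max.request.size=104857600
batch.size=524288
buffer.memory=1073741824
```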
## Configuration Reference
### General Configuration

View File

@ -213,7 +213,6 @@ static FORCE_INLINE int32_t taosEncodeVariantU16(void **buf, uint16_t value) {
if (buf != NULL) ((uint8_t *)(*buf))[i] = (uint8_t)(value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 3);
}
if (buf != NULL) {
@ -261,7 +260,6 @@ static FORCE_INLINE int32_t taosEncodeVariantU32(void **buf, uint32_t value) {
if (buf != NULL) ((uint8_t *)(*buf))[i] = (value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 5);
}
if (buf != NULL) {
@ -309,7 +307,6 @@ static FORCE_INLINE int32_t taosEncodeVariantU64(void **buf, uint64_t value) {
if (buf != NULL) ((uint8_t *)(*buf))[i] = (uint8_t)(value | ENCODE_LIMIT);
value >>= 7;
i++;
ASSERT(i < 10);
}
if (buf != NULL) {

View File

@ -1135,8 +1135,6 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
(void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
}
}
schedulerFreeJob(&pRequest->body.queryJob, 0);
}
taosMemoryFree(pResult);

View File

@ -12,7 +12,7 @@ extern char tsS3AccessKeySecret[][TSDB_FQDN_LEN];
extern char tsS3BucketName[TSDB_FQDN_LEN];
extern char tsS3AppId[][TSDB_FQDN_LEN];
extern char tsS3Hostname[][TSDB_FQDN_LEN];
extern int8_t tsS3Https;
extern int8_t tsS3Https[];
static int32_t s3ListBucketByEp(char const *bucketname, int8_t epIndex);
static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *object_name, int64_t offset, int64_t size,
@ -33,13 +33,13 @@ static int verifyPeerG = 0;
static const char *awsRegionG = NULL;
static int forceG = 0;
static int showResponsePropertiesG = 0;
static S3Protocol protocolG = S3ProtocolHTTPS;
static S3Protocol protocolG[TSDB_MAX_EP_NUM] = {S3ProtocolHTTPS};
// static S3Protocol protocolG = S3ProtocolHTTP;
static S3UriStyle uriStyleG = S3UriStylePath;
static S3UriStyle uriStyleG[TSDB_MAX_EP_NUM] = {S3UriStylePath};
static int retriesG = 5;
static int timeoutMsG = 0;
extern int8_t tsS3Oss;
extern int8_t tsS3Oss[];
int32_t s3Begin() {
S3Status status;
@ -55,9 +55,11 @@ int32_t s3Begin() {
TAOS_RETURN(TSDB_CODE_FAILED);
}
protocolG = !tsS3Https;
if (tsS3Oss) {
uriStyleG = S3UriStyleVirtualHost;
for (int i = 0; i < tsS3EpNum; i++) {
protocolG[i] = !tsS3Https[i];
if (tsS3Oss[i]) {
uriStyleG[i] = S3UriStyleVirtualHost;
}
}
TAOS_RETURN(TSDB_CODE_SUCCESS);
@ -976,8 +978,8 @@ int32_t s3PutObjectFromFile2ByEp(const char *file, const char *object_name, int8
S3BucketContext bucketContext = {tsS3Hostname[epIndex],
tsS3BucketName,
protocolG,
uriStyleG,
protocolG[epIndex],
uriStyleG[epIndex],
tsS3AccessKeyId[epIndex],
tsS3AccessKeySecret[epIndex],
0,
@ -1059,8 +1061,8 @@ static int32_t s3PutObjectFromFileOffsetByEp(const char *file, const char *objec
S3BucketContext bucketContext = {tsS3Hostname[epIndex],
tsS3BucketName,
protocolG,
uriStyleG,
protocolG[epIndex],
uriStyleG[epIndex],
tsS3AccessKeyId[epIndex],
tsS3AccessKeySecret[epIndex],
0,
@ -1155,8 +1157,8 @@ static void s3FreeObjectKey(void *pItem) {
static SArray *getListByPrefixByEp(const char *prefix, int8_t epIndex) {
S3BucketContext bucketContext = {tsS3Hostname[epIndex],
tsS3BucketName,
protocolG,
uriStyleG,
protocolG[epIndex],
uriStyleG[epIndex],
tsS3AccessKeyId[epIndex],
tsS3AccessKeySecret[epIndex],
0,
@ -1223,8 +1225,8 @@ static int32_t s3DeleteObjectsByEp(const char *object_name[], int nobject, int8_
S3BucketContext bucketContext = {tsS3Hostname[epIndex],
tsS3BucketName,
protocolG,
uriStyleG,
protocolG[epIndex],
uriStyleG[epIndex],
tsS3AccessKeyId[epIndex],
tsS3AccessKeySecret[epIndex],
0,
@ -1299,8 +1301,8 @@ static int32_t s3GetObjectBlockByEp(const char *object_name, int64_t offset, int
S3BucketContext bucketContext = {tsS3Hostname[epIndex],
tsS3BucketName,
protocolG,
uriStyleG,
protocolG[epIndex],
uriStyleG[epIndex],
tsS3AccessKeyId[epIndex],
tsS3AccessKeySecret[epIndex],
0,
@ -1372,8 +1374,8 @@ static int32_t s3GetObjectToFileByEp(const char *object_name, const char *fileNa
S3BucketContext bucketContext = {tsS3Hostname[epIndex],
tsS3BucketName,
protocolG,
uriStyleG,
protocolG[epIndex],
uriStyleG[epIndex],
tsS3AccessKeyId[epIndex],
tsS3AccessKeySecret[epIndex],
0,
@ -1449,8 +1451,8 @@ static long s3SizeByEp(const char *object_name, int8_t epIndex) {
S3BucketContext bucketContext = {tsS3Hostname[epIndex],
tsS3BucketName,
protocolG,
uriStyleG,
protocolG[epIndex],
uriStyleG[epIndex],
tsS3AccessKeyId[epIndex],
tsS3AccessKeySecret[epIndex],
0,

View File

@ -303,10 +303,10 @@ char tsS3BucketName[TSDB_FQDN_LEN] = "<bucketname>";
char tsS3AppId[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<appid>"};
int8_t tsS3Enabled = false;
int8_t tsS3EnabledCfg = false;
int8_t tsS3Oss = false;
int8_t tsS3Oss[TSDB_MAX_EP_NUM] = {false};
int8_t tsS3StreamEnabled = false;
int8_t tsS3Https = true;
int8_t tsS3Https[TSDB_MAX_EP_NUM] = {true};
char tsS3Hostname[TSDB_MAX_EP_NUM][TSDB_FQDN_LEN] = {"<hostname>"};
int32_t tsS3BlockSize = -1; // number of tsdb pages (4096)
@ -431,11 +431,10 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
tstrncpy(tsS3AppId[i], appid + 1, TSDB_FQDN_LEN);
}
}
tsS3Https[i] = (strstr(tsS3Endpoint[i], "https://") != NULL);
tsS3Oss[i] = (strstr(tsS3Endpoint[i], "aliyuncs.") != NULL);
}
tsS3Https = (strstr(tsS3Endpoint[0], "https://") != NULL);
tsS3Oss = (strstr(tsS3Endpoint[0], "aliyuncs.") != NULL);
if (tsS3BucketName[0] != '<') {
#if defined(USE_COS) || defined(USE_S3)
#ifdef TD_ENTERPRISE

View File

@ -898,7 +898,7 @@ typedef struct SSttDataInfoForTable {
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pTableInfo);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
int32_t tMergeTreeNext(SMergeTree *pMTree, bool* pHasNext);
void tMergeTreePinSttBlock(SMergeTree *pMTree);
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);

View File

@ -1204,11 +1204,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
}
if (req.mndTrigger) {
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr,
tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr,
vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
} else {
const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask));
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64
tqInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64
", transId:%d after transfer-state, prev status:%s",
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus);
}

View File

@ -2245,17 +2245,20 @@ static int32_t lastIterClose(SFSLastIter **iter) {
}
static int32_t lastIterNext(SFSLastIter *iter, TSDBROW **ppRow) {
int32_t code = 0;
bool hasVal = false;
*ppRow = NULL;
int32_t code = tMergeTreeNext(iter->pMergeTree, &hasVal);
if (code != 0) {
return code;
}
bool hasVal = tMergeTreeNext(iter->pMergeTree);
if (!hasVal) {
*ppRow = NULL;
TAOS_RETURN(code);
}
*ppRow = tMergeTreeGetRow(iter->pMergeTree);
TAOS_RETURN(code);
}

View File

@ -115,16 +115,14 @@ void destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoad
SArray *pList = taosArrayGetP(pLDataIterArray, i);
for (int32_t j = 0; j < taosArrayGetSize(pList); ++j) {
SLDataIter *pIter = taosArrayGetP(pList, j);
if (pIter->pBlockLoadInfo == NULL) {
continue;
}
SSttBlockLoadCostInfo *pCost = &pIter->pBlockLoadInfo->cost;
if (pLoadCost != NULL) {
pLoadCost->loadBlocks += pCost->loadBlocks;
pLoadCost->loadStatisBlocks += pCost->loadStatisBlocks;
pLoadCost->blockElapsedTime += pCost->blockElapsedTime;
pLoadCost->statisElapsedTime += pCost->statisElapsedTime;
if (pIter->pBlockLoadInfo != NULL) {
SSttBlockLoadCostInfo *pCost = &pIter->pBlockLoadInfo->cost;
if (pLoadCost != NULL) {
pLoadCost->loadBlocks += pCost->loadBlocks;
pLoadCost->loadStatisBlocks += pCost->loadStatisBlocks;
pLoadCost->blockElapsedTime += pCost->blockElapsedTime;
pLoadCost->statisElapsedTime += pCost->statisElapsedTime;
}
}
destroyLDataIter(pIter);
@ -903,6 +901,7 @@ int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool* hasNext) {
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
_exit:
tsdbError("failed to exec stt-file nextIter, lino:%d, code:%s, %s", lino, tstrerror(code), idStr);
*hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
return code;
}
@ -1102,13 +1101,22 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree) {
tLDataIterUnpinSttBlock(pIter, pMTree->idStr);
}
bool tMergeTreeNext(SMergeTree *pMTree) {
int32_t tMergeTreeNext(SMergeTree *pMTree, bool *pHasNext) {
int32_t code = 0;
if (pHasNext == NULL) {
return TSDB_CODE_INVALID_PARA;
}
*pHasNext = false;
if (pMTree->pIter) {
SLDataIter *pIter = pMTree->pIter;
bool hasVal = false;
int32_t code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
bool hasVal = false;
code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
if (!hasVal || (code != 0)) {
if (code == TSDB_CODE_FILE_CORRUPTED) {
code = 0; // suppress the file-corruption error so that queries in this cluster can keep running
}
pMTree->pIter = NULL;
}
@ -1117,7 +1125,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
if (pMTree->pIter && pIter) {
int32_t c = pMTree->rbt.cmprFn(&pMTree->pIter->node, &pIter->node);
if (c > 0) {
(void) tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
(void)tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pMTree->pIter);
pMTree->pIter = NULL;
} else {
ASSERT(c);
@ -1132,7 +1140,8 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
}
}
return pMTree->pIter != NULL;
*pHasNext = (pMTree->pIter != NULL);
return code;
}
void tMergeTreeClose(SMergeTree *pMTree) {

View File

@ -1383,7 +1383,6 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro
static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t uid) {
ASSERT(pReader->info.pSchema == NULL);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, -1, &pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS || pReader->info.pSchema == NULL) {
terrno = code;
@ -1414,7 +1413,9 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
if (pReader->info.pSchema == NULL) {
pSchema = getTableSchemaImpl(pReader, uid);
if (pSchema == NULL) {
tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr);
code = terrno;
tsdbError("%p table uid:%" PRIu64 " failed to get tableschema, code:%s, %s", pReader, uid, tstrerror(code),
pReader->idStr);
return code;
}
}
@ -1449,7 +1450,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
pReader->cost.blockLoadTime += elapsedTime;
pDumpInfo->allDumped = false;
return TSDB_CODE_SUCCESS;
return code;
}
/**
@ -1759,14 +1760,22 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return code;
}
static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot,
SVersionRange* pVerRange) {
static int32_t nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int32_t pkSrcSlot,
SVersionRange* pVerRange) {
int32_t code = 0;
int32_t order = pSttBlockReader->order;
int32_t step = ASCENDING_TRAVERSE(order) ? 1 : -1;
SRowKey* pNextProc = &pScanInfo->sttKeyInfo.nextProcKey;
while (1) {
bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree);
bool hasVal = false;
code = tMergeTreeNext(&pSttBlockReader->mergeTree, &hasVal);
if (code) {
tsdbError("failed to iter the next row in stt-file merge tree, code:%s, %s", tstrerror(code),
pSttBlockReader->mergeTree.idStr);
return code;
}
if (!hasVal) { // the next value will be the accessed key in stt
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
@ -1779,7 +1788,7 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
memset(pNextProc->pks[0].pData, 0, pNextProc->pks[0].nData);
}
}
return false;
return code;
}
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
@ -1798,13 +1807,15 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, order, pVerRange,
pSttBlockReader->numOfPks > 0)) {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true;
return code;
}
} else {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true;
return code;
}
}
return code;
}
static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); }
@ -1819,9 +1830,14 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttB
// avoid the fetch next row replace the referenced stt block in buffer
doPinSttBlock(pSttBlockReader);
bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
doUnpinSttBlock(pSttBlockReader);
if (hasVal) {
if (code) {
return code;
}
if (hasDataInSttBlock(pScanInfo)) {
SRowKey* pNext = getCurrentKeyInSttBlock(pSttBlockReader);
if (pkCompEx(pSttKey, pNext) != 0) {
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
@ -2125,7 +2141,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (piRow->type == TSDBROW_ROW_FMT) {
piSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
if (piSchema == NULL) {
return code;
return terrno;
}
}
@ -2380,14 +2396,13 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, int32_t rowIndex, STable
return true;
}
static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
bool hasData = true;
static void initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
int32_t order = pReader->info.order;
bool asc = ASCENDING_TRAVERSE(order);
// the stt block reader has been initialized for this table.
if (pSttBlockReader->uid == pScanInfo->uid) {
return hasDataInSttBlock(pScanInfo);
return;
}
if (pSttBlockReader->uid != 0) {
@ -2396,9 +2411,14 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
pSttBlockReader->uid = pScanInfo->uid;
// second time init stt block reader
// second or third time init stt block reader
if (pScanInfo->cleanSttBlocks && (pReader->info.execMode == READER_EXEC_ROWS)) {
return !pScanInfo->sttBlockReturned;
// clean stt blocks are allowed to be retrieved for count only once
if (pScanInfo->sttBlockReturned) {
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
tsdbDebug("uid:%" PRIu64 " set no stt-file data after stt-block retrieved, %s", pScanInfo->uid, pReader->idStr);
}
return;
}
STimeWindow w = pSttBlockReader->window;
@ -2435,28 +2455,28 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
SSttDataInfoForTable info = {.pKeyRangeList = taosArrayInit(4, sizeof(SSttKeyRange))};
if (info.pKeyRangeList == NULL) {
pReader->code = terrno;
return false;
return;
}
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(info.pKeyRangeList);
pReader->code = code;
return false;
return;
}
code = initMemDataIterator(pScanInfo, pReader);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(info.pKeyRangeList);
pReader->code = code;
return false;
return;
}
code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(info.pKeyRangeList);
pReader->code = code;
return false;
return;
}
if (conf.rspRows) {
@ -2484,27 +2504,26 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
SRowKey* p = asc ? &pScanInfo->sttRange.skey : &pScanInfo->sttRange.ekey;
tRowKeyAssign(&pScanInfo->sttKeyInfo.nextProcKey, p);
hasData = (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
} else { // not clean stt blocks
INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window
pScanInfo->sttBlockReturned = false;
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
}
} else {
pScanInfo->cleanSttBlocks = false;
INIT_KEYRANGE(&pScanInfo->sttRange); // reset the time window
pScanInfo->sttBlockReturned = false;
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pReader->suppInfo.pkSrcSlot, &pReader->info.verRange);
}
pScanInfo->sttBlockReturned = false;
taosArrayDestroy(info.pKeyRangeList);
int64_t el = taosGetTimestampUs() - st;
pReader->cost.initSttBlockReader += (el / 1000.0);
tsdbDebug("init stt block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr);
return hasData;
if (code != 0) {
pReader->code = code;
}
}
static bool hasDataInSttBlock(STableBlockScanInfo* pInfo) { return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; }
@ -2772,7 +2791,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
SBlockData* pBlockData = &pReader->status.fileBlockData;
(void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader);
initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader);
if (pReader->code != 0) {
code = pReader->code;
goto _end;
@ -3190,12 +3209,12 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
continue;
}
bool hasDataInSttFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
if (pReader->code != TSDB_CODE_SUCCESS) {
return pReader->code;
}
if (!hasDataInSttFile) {
if (!hasDataInSttBlock(pScanInfo)) {
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
@ -3287,7 +3306,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
}
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
if (pReader->code != 0) {
return pReader->code;
}
@ -3331,7 +3350,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
int64_t st = taosGetTimestampUs();
// let's load data from stt files; make sure to clear the cleanStt block flag before loading data from stt files
(void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
if (pReader->code != 0) {
return pReader->code;
}
@ -4087,7 +4106,12 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI
SRowKey* pRowKey = &pScanInfo->lastProcKey;
int32_t code = TSDB_CODE_SUCCESS;
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange)) {
while (1) {
code = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange);
if (code || (!hasDataInSttBlock(pScanInfo))) {
return code;
}
SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader);
int32_t ret = pkCompEx(pRowKey, pNextKey);

View File

@ -61,7 +61,9 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con
// // open each segment reader
int64_t offset = config->file->size - sizeof(SSttFooter);
ASSERT(offset >= TSDB_FHDR_SIZE);
if (offset < TSDB_FHDR_SIZE) {
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
}
int32_t encryptAlgoirthm = config->tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
char *encryptKey = config->tsdb->pVnode->config.tsdbCfg.encryptKey;
@ -115,7 +117,9 @@ int32_t tsdbSttFileReaderClose(SSttFileReader **reader) {
int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray **statisBlkArray) {
if (!reader->ctx->statisBlkLoaded) {
if (reader->footer->statisBlkPtr->size > 0) {
ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0);
if (reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) != 0) {
return TSDB_CODE_FILE_CORRUPTED;
}
int32_t size = reader->footer->statisBlkPtr->size / sizeof(SStatisBlk);
void *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size);
@ -147,7 +151,9 @@ int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray *
int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **tombBlkArray) {
if (!reader->ctx->tombBlkLoaded) {
if (reader->footer->tombBlkPtr->size > 0) {
ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0);
if (reader->footer->tombBlkPtr->size % sizeof(STombBlk) != 0) {
return TSDB_CODE_FILE_CORRUPTED;
}
int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk);
void *data = taosMemoryMalloc(reader->footer->tombBlkPtr->size);
@ -179,7 +185,9 @@ int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **tom
int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBlkArray) {
if (!reader->ctx->sttBlkLoaded) {
if (reader->footer->sttBlkPtr->size > 0) {
ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0);
if (reader->footer->sttBlkPtr->size % sizeof(SSttBlk) != 0) {
return TSDB_CODE_FILE_CORRUPTED;
}
int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk);
void *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size);
@ -256,7 +264,9 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
TAOS_CHECK_GOTO(tGetDiskDataHdr(&br, &hdr), &lino, _exit);
ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
if (hdr.delimiter != TSDB_FILE_DLMT) {
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
}
// set data container
tBlockDataReset(bData);
@ -266,7 +276,9 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
// key part
TAOS_CHECK_GOTO(tBlockDataDecompressKeyPart(&hdr, &br, bData, assist), &lino, _exit);
ASSERT(br.offset == buffer0->size);
if (br.offset != buffer0->size) {
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
}
bool loadExtra = false;
for (int i = 0; i < ncid; i++) {
@ -376,7 +388,10 @@ int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk
br.offset += tombBlk->size[i];
}
ASSERT(br.offset == tombBlk->dp->size);
if (br.offset != tombBlk->dp->size) {
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
}
_exit:
if (code) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(reader->config->tsdb->pVnode), __func__, __FILE__, lino,
@ -444,7 +459,9 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta
}
}
ASSERT(br.offset == buffer0->size);
if (br.offset != buffer0->size) {
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
}
_exit:
if (code) {
@ -814,8 +831,6 @@ _exit:
}
static void tsdbSttFWriterDoClose(SSttFileWriter *writer) {
ASSERT(writer->fd == NULL);
for (int32_t i = 0; i < ARRAY_SIZE(writer->local); ++i) {
tBufferDestroy(writer->local + i);
}
@ -854,7 +869,6 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o
tsdbCloseFile(&writer->fd);
ASSERT(writer->file->size > 0);
STFileOp op = (STFileOp){
.optype = TSDB_FOP_CREATE,
.fid = writer->config->fid,

View File

@ -171,7 +171,7 @@ static int32_t tStatisBlockUpdate(STbStatisBlock *block, SRowInfo *row) {
TAOS_CHECK_RETURN(tBufferPutAt(&block->counts, (block->numOfRecords - 1) * sizeof(record.count), &record.count,
sizeof(record.count)));
} else {
ASSERT(0);
return TSDB_CODE_INVALID_PARA;
}
return 0;

View File

@ -165,9 +165,7 @@ static int32_t vnodeAsyncTaskDone(SVAsync *async, SVATask *task) {
}
ret = vHashDrop(async->taskTable, task);
if (ret != 0) {
ASSERT(0);
}
TAOS_UNUSED(ret);
async->numTasks--;
if (task->numWait == 0) {
@ -403,7 +401,6 @@ static int32_t vnodeAsyncDestroy(SVAsync **async) {
}
(void)taosThreadJoin((*async)->workers[i].thread, NULL);
ASSERT((*async)->workers[i].state == EVA_WORKER_STATE_STOP);
(*async)->workers[i].state = EVA_WORKER_STATE_UINIT;
}
@ -413,18 +410,11 @@ static int32_t vnodeAsyncDestroy(SVAsync **async) {
channel->prev->next = channel->next;
int32_t ret = vHashDrop((*async)->channelTable, channel);
if (ret) {
ASSERT(0);
}
TAOS_UNUSED(ret);
(*async)->numChannels--;
taosMemoryFree(channel);
}
ASSERT((*async)->numLaunchWorkers == 0);
ASSERT((*async)->numIdleWorkers == 0);
ASSERT((*async)->numChannels == 0);
ASSERT((*async)->numTasks == 0);
(void)taosThreadMutexDestroy(&(*async)->mutex);
(void)taosThreadCondDestroy(&(*async)->hasTask);
@ -438,7 +428,6 @@ static int32_t vnodeAsyncDestroy(SVAsync **async) {
static int32_t vnodeAsyncLaunchWorker(SVAsync *async) {
for (int32_t i = 0; i < async->numWorkers; i++) {
ASSERT(async->workers[i].state != EVA_WORKER_STATE_IDLE);
if (async->workers[i].state == EVA_WORKER_STATE_ACTIVE) {
continue;
} else if (async->workers[i].state == EVA_WORKER_STATE_STOP) {

View File

@ -375,11 +375,11 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
}
tjsonGetNumberValue(pJson, "s3ChunkSize", pCfg->s3ChunkSize, code);
if (code < 0) {
if (code < 0 || pCfg->s3ChunkSize < TSDB_MIN_S3_CHUNK_SIZE) {
pCfg->s3ChunkSize = TSDB_DEFAULT_S3_CHUNK_SIZE;
}
tjsonGetNumberValue(pJson, "s3KeepLocal", pCfg->s3KeepLocal, code);
if (code < 0) {
if (code < 0 || pCfg->s3KeepLocal < TSDB_MIN_S3_KEEP_LOCAL) {
pCfg->s3KeepLocal = TSDB_DEFAULT_S3_KEEP_LOCAL;
}
tjsonGetNumberValue(pJson, "s3Compact", pCfg->s3Compact, code);

View File

@ -302,7 +302,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
TSDB_CHECK_CODE(code, lino, _exit);
(void)taosThreadMutexLock(&pVnode->mutex);
ASSERT(pVnode->onCommit == NULL);
pVnode->onCommit = pVnode->inUse;
pVnode->inUse = NULL;
(void)taosThreadMutexUnlock(&pVnode->mutex);
@ -339,7 +338,7 @@ static void vnodeReturnBufPool(SVnode *pVnode) {
pVnode->recycleTail = pPool;
}
} else {
ASSERT(0);
vError("vgId:%d, buffer pool %p of id %d nRef:%d", TD_VID(pVnode), pPool, pPool->id, nRef);
}
(void)taosThreadMutexUnlock(&pVnode->mutex);

View File

@ -77,7 +77,6 @@ int32_t vHashDestroy(SVHashTable** ht) {
}
if (*ht) {
ASSERT((*ht)->numEntries == 0);
taosMemoryFree((*ht)->buckets);
taosMemoryFree(*ht);
(*ht) = NULL;

View File

@ -558,7 +558,9 @@ void vnodeClose(SVnode *pVnode) {
// start the sync timer after the queue is ready
int32_t vnodeStart(SVnode *pVnode) {
ASSERT(pVnode);
if (pVnode == NULL) {
return TSDB_CODE_INVALID_PARA;
}
return vnodeSyncStart(pVnode);
}

View File

@ -225,7 +225,10 @@ static SSDataBlock* doLoadRemoteDataImpl(SOperatorInfo* pOperator) {
} else {
concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
if (TSDB_CODE_SUCCESS != pOperator->pTaskInfo->code) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pTaskInfo->env, pOperator->pTaskInfo->code);
}
if (taosArrayGetSize(pExchangeInfo->pResultBlockList) == 0) {
return NULL;
} else {

View File

@ -663,7 +663,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
if (isTaskKilled(pTaskInfo)) {
atomic_store_64(&pTaskInfo->owner, 0);
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
return TSDB_CODE_SUCCESS;
return pTaskInfo->code;
}
// error occurs, record the error code and return to client
@ -785,7 +785,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
taosRUnLockLatch(&pTaskInfo->lock);
return TSDB_CODE_SUCCESS;
return pTaskInfo->code;
}
if (pTaskInfo->owner != 0) {

View File

@ -1746,6 +1746,9 @@ void destroyGrpArray(void* ppArray) {
}
void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) {
if (NULL == pTable) {
return;
}
mJoinDestroyCreatedBlks(pTable->createdBlks);
taosArrayDestroy(pTable->createdBlks);
tSimpleHashCleanup(pTable->pGrpHash);

View File

@ -464,15 +464,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
SSDataBlock* p = NULL;
code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, &p, NULL);
if (p == NULL || code != TSDB_CODE_SUCCESS) {
if (p == NULL || code != TSDB_CODE_SUCCESS || p != pBlock) {
return code;
}
if(p != pBlock) {
qError("[loadDataBlock] p != pBlock");
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
code = doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
if (code) {
return code;

View File

@ -2372,6 +2372,7 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t
}
gRes[gResIdx]->colInfo = taosMemoryCalloc(info->fields[FLD_TYPE_COLUMN].num, sizeof(SFilterColInfo));
if (gRes[gResIdx]->colInfo == NULL) {
filterFreeGroupCtx(gRes[gResIdx]);
FLT_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
colIdxi = 0;
@ -2384,6 +2385,7 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t
if (gRes[gResIdx]->colInfo[cidx].info == NULL) {
gRes[gResIdx]->colInfo[cidx].info = (SArray *)taosArrayInit(4, POINTER_BYTES);
if (gRes[gResIdx]->colInfo[cidx].info == NULL) {
filterFreeGroupCtx(gRes[gResIdx]);
FLT_ERR_JRET(terrno);
}
colIdx[colIdxi++] = cidx;
@ -2408,7 +2410,11 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t
continue;
}
FLT_ERR_JRET(filterMergeUnits(info, gRes[gResIdx], colIdx[l], &empty));
code = filterMergeUnits(info, gRes[gResIdx], colIdx[l], &empty);
if (TSDB_CODE_SUCCESS != code) {
filterFreeGroupCtx(gRes[gResIdx]);
SCL_ERR_JRET(code);
}
if (empty) {
break;
@ -2426,10 +2432,9 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t
gRes[gResIdx]->colNum = colIdxi;
FILTER_COPY_IDX(&gRes[gResIdx]->colIdx, colIdx, colIdxi);
++gResIdx;
*gResNum = gResIdx;
}
*gResNum = gResIdx;
if (gResIdx == 0) {
FILTER_SET_FLAG(info->status, FI_STATUS_EMPTY);
}

View File

@ -1260,7 +1260,9 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
pMeta->stage = stage;
// mark the sign to send msg before close all tasks
if ((!isLeader) && (pMeta->role == NODE_ROLE_LEADER)) {
// 1. for a leader vnode, always send the msg before closing
// 2. for a follower vnode that was changed from leader, also send the msg before closing
if (pMeta->role == NODE_ROLE_LEADER) {
pMeta->sendMsgBeforeClosing = true;
}