Merge branch '3.0' into enh/triggerCheckPoint2

This commit is contained in:
liuyao 2023-07-21 10:23:41 +08:00 committed by GitHub
commit d45563684b
35 changed files with 1087 additions and 425 deletions

View File

@ -42,10 +42,20 @@ In TDengine, the data types below can be used when specifying a column or tag.
| 14 | NCHAR | User Defined | Multi-byte string that can include multi-byte characters like Chinese characters. Each character of NCHAR type consumes 4 bytes of storage. The string value should be quoted with single quotes. A literal single quote inside the string must be preceded with a backslash, like `\'`. The length must be specified when defining a column or tag of NCHAR type; for example, nchar(10) means it can store at most 10 characters of nchar type and will consume fixed storage of 40 bytes. An error will be reported if the string value exceeds the length defined. |
| 15 | JSON | | JSON type can only be used on tags. A tag of JSON type is mutually exclusive with tags of any other type. |
| 16 | VARCHAR | User Defined | Alias of BINARY |
| 17 | GEOMETRY | User Defined | Geometry type |
:::note
- Each row of the table cannot be longer than 48KB (64KB since version 3.0.5.0) (note that each BINARY/NCHAR/GEOMETRY column takes up an additional 2 bytes of storage space).
- Only ASCII visible characters are suggested to be used in a column or tag of BINARY type. Multi-byte characters must be stored in NCHAR type.
- The length of BINARY can be up to 16,374 bytes (since version 3.0.5.0, 65,517 bytes for data columns and 16,382 bytes for tag columns). The string value must be quoted with single quotes. You must specify a length in bytes for a BINARY value, for example binary(20) for up to twenty single-byte characters. If the data exceeds the specified length, an error will occur. A literal single quote inside the string must be preceded with a backslash, like `\'`.
- The maximum length of a GEOMETRY data column is 65,517 bytes, and the maximum length of a GEOMETRY tag column is 16,382 bytes. The 2D POINT, LINESTRING, and POLYGON subtypes are supported. The following table describes the length calculation method:
| # | **Syntax** | **MinLen** | **MaxLen** | **Growth of each point** |
|---|--------------------------------------|------------|------------|--------------------------|
| 1 | POINT(1.0 1.0) | 21 | 21 | NA |
| 2 | LINESTRING(1.0 1.0, 2.0 2.0) | 9+2*16 | 9+4094*16 | +16 |
| 3 | POLYGON((1.0 1.0, 2.0 2.0, 1.0 1.0)) | 13+3*16 | 13+4094*16 | +16 |
- Numeric values in SQL statements will be determined as integer or float type according to whether there is a decimal point or whether scientific notation is used, so attention must be paid to avoid overflow. For example, 9999999999999999999 will be considered as overflow because it exceeds the upper limit of long integer, but 9999999999999999999.0 will be considered as a legal float number.
:::
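A quick, hypothetical sketch of the new type (table and column names are illustrative; the declared GEOMETRY length only needs to cover the WKT text per the length table above):

```sql
-- Illustrative only: a GEOMETRY column sized for short WKT values
CREATE TABLE geo_demo (ts TIMESTAMP, pos GEOMETRY(64));
INSERT INTO geo_demo VALUES (NOW, 'POINT(1.0 1.0)');
INSERT INTO geo_demo VALUES (NOW + 1s, 'LINESTRING(1.0 1.0, 2.0 2.0)');
```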

View File

@ -45,9 +45,9 @@ table_option: {
1. The first column of a table MUST be of type TIMESTAMP. It is automatically set as the primary key.
2. The maximum length of the table name is 192 bytes.
3. The maximum length of each row is 48k (64k since version 3.0.5.0) bytes. Please note that the extra 2 bytes used by each BINARY/NCHAR/GEOMETRY column are also counted.
4. The name of the subtable can only consist of characters from the English alphabet, digits and underscore. Table names can't start with a digit. Table names are case insensitive.
5. The maximum length in bytes must be specified when using BINARY/NCHAR/GEOMETRY types.
6. The escape character "\`" can be used to avoid conflicts between table names and reserved keywords. The above rules are bypassed when the escape character is used on table names, but the upper limit for the name length still applies. Table names specified using the escape character are case sensitive.
For example, \`aBc\` and \`abc\` are different table names but `abc` and `aBc` are the same table name because they are both converted to `abc` internally.
Only ASCII visible characters can be used with the escape character.
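A minimal sketch of the naming rules above (table names are illustrative):

```sql
-- Unescaped names are case-insensitive: abc and aBc refer to the same table
CREATE TABLE abc (ts TIMESTAMP, v BINARY(20));
-- Escaped names keep their case, so `aBc` is a different table from abc
CREATE TABLE `aBc` (ts TIMESTAMP, v NCHAR(10));
```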

View File

@ -39,7 +39,7 @@ TDengine supports the `UNION` and `UNION ALL` operations. UNION ALL collects all
| 3 | \>, < | All types except BLOB, MEDIUMBLOB, and JSON | Greater than and less than |
| 4 | \>=, <= | All types except BLOB, MEDIUMBLOB, and JSON | Greater than or equal to and less than or equal to |
| 5 | IS [NOT] NULL | All types | Indicates whether the value is null |
| 6 | [NOT] BETWEEN AND | All types except BLOB, MEDIUMBLOB, JSON and GEOMETRY | Closed interval comparison |
| 7 | IN | All types except BLOB, MEDIUMBLOB, and JSON; the primary key (timestamp) is also not supported | Equal to any value in the list |
| 8 | LIKE | BINARY, NCHAR, and VARCHAR | Wildcard match |
| 9 | MATCH, NMATCH | BINARY, NCHAR, and VARCHAR | Regular expression match |
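A short, hypothetical sketch of the operators above against an illustrative `meters` table:

```sql
SELECT * FROM meters WHERE current BETWEEN 10 AND 20;     -- closed interval comparison
SELECT * FROM meters WHERE groupid IN (1, 3, 5);          -- match any value in the list
SELECT * FROM meters WHERE location LIKE 'California%';   -- wildcard match on a string column
```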

View File

@ -18,6 +18,7 @@ description: This document describes how TDengine SQL has changed in version 3.0
| 8 | Mixed operations | Enhanced | Mixing scalar and vector operations in queries has been enhanced and is supported in all SELECT clauses.
| 9 | Tag operations | Added | Tag columns can be used in queries and clauses like data columns.
| 10 | Timeline clauses and time functions in supertables | Enhanced | When PARTITION BY is not used, data in supertables is merged into a single timeline.
| 11 | GEOMETRY | Added | Geometry type.
## SQL Syntax

View File

@ -42,12 +42,21 @@ CREATE DATABASE db_name PRECISION 'ns';
| 14 | NCHAR | User Defined | Multi-byte string that can include multi-byte characters such as Chinese characters. Each NCHAR character occupies 4 bytes of storage. The string value must be quoted with single quotes, and a literal single quote inside the string must be escaped as `\'`. The length must be specified when using NCHAR; for example, a column of type NCHAR(10) can store at most 10 NCHAR characters. An error is reported if the string exceeds the declared length. |
| 15 | JSON | | JSON data type; only tags can be of JSON type |
| 16 | VARCHAR | User Defined | Alias of BINARY |
| 17 | GEOMETRY | User Defined | Geometry type |
:::note
- Each row of a table cannot exceed 48 KB (64 KB since version 3.0.5.0; note that each BINARY/NCHAR/GEOMETRY column also takes an additional 2 bytes of storage).
- Although BINARY supports byte-level binary characters in the underlying storage, different programming languages do not guarantee consistent handling of binary data, so it is recommended to store only visible ASCII characters in BINARY columns and to avoid invisible characters. Multi-byte data such as Chinese characters must be stored in NCHAR. If Chinese characters are forced into a BINARY column, reads and writes may sometimes succeed, but the data carries no character set information and can easily become garbled or even corrupted.
- BINARY can theoretically be up to 16,374 bytes long (since version 3.0.5.0, 65,517 bytes for data columns and 16,382 bytes for tag columns). BINARY only supports string input, and the string must be quoted with single quotes. The size must be specified when it is used; for example, BINARY(20) defines a string of at most 20 single-byte characters, each occupying 1 byte, for a fixed 20 bytes of storage. An error is reported if the string exceeds 20 bytes. A literal single quote inside the string is written with the escape character backslash plus single quote, i.e. `\'`.
- The maximum length of a GEOMETRY data column is 65,517 bytes, and the maximum length of a GEOMETRY tag column is 16,382 bytes. The 2D POINT, LINESTRING, and POLYGON subtypes are supported. The length is calculated as shown in the table below:
| # | **Syntax** | **MinLen** | **MaxLen** | **Growth of each point** |
|---|--------------------------------------|------------|------------|--------------------------|
| 1 | POINT(1.0 1.0) | 21 | 21 | NA |
| 2 | LINESTRING(1.0 1.0, 2.0 2.0) | 9+2*16 | 9+4094*16 | +16 |
| 3 | POLYGON((1.0 1.0, 2.0 2.0, 1.0 1.0)) | 13+3*16 | 13+4094*16 | +16 |
- Numeric values in SQL statements are treated as integer or floating-point according to whether a decimal point or scientific notation is present, so be careful about type overflow. For example, 9999999999999999999 is considered to overflow the upper bound of a long integer, while 9999999999999999999.0 is treated as a valid floating-point number.
:::

View File

@ -43,9 +43,9 @@ table_option: {
1. The first column of a table must be of type TIMESTAMP; the system automatically sets it as the primary key.
2. The maximum length of a table name is 192 bytes.
3. Each row of a table cannot exceed 48 KB (64 KB since version 3.0.5.0). (Note that each BINARY/NCHAR/GEOMETRY column also takes an additional 2 bytes of storage.)
4. A subtable name can only consist of letters, digits, and underscores, cannot start with a digit, and is case insensitive.
5. When using the BINARY/NCHAR/GEOMETRY data types, the maximum length in bytes must be specified, such as BINARY(20) for 20 bytes.
6. To support more forms of table names, TDengine introduces the escape character "\`", which lets table names avoid conflicts with keywords and bypass the legality checks above; the length limit still applies, however. When the escape character is used, the content inside it is no longer normalized to a single case.
For example, \`aBc\` and \`abc\` are different table names, while abc and aBc are the same table name.
Note that the content inside the escape characters must be printable characters.

View File

@ -39,7 +39,7 @@ TDengine supports the `UNION ALL` and `UNION` operators. UNION ALL combines the rows returned by the queries
| 3 | \>, < | All types except BLOB, MEDIUMBLOB, and JSON | Greater than, less than |
| 4 | \>=, <= | All types except BLOB, MEDIUMBLOB, and JSON | Greater than or equal to, less than or equal to |
| 5 | IS [NOT] NULL | All types | Whether the value is null |
| 6 | [NOT] BETWEEN AND | All types except BOOL, BLOB, MEDIUMBLOB, JSON and GEOMETRY | Closed interval comparison |
| 7 | IN | All types except BLOB, MEDIUMBLOB, and JSON; cannot be the table's timestamp primary key column | Equal to any value in the list |
| 8 | LIKE | BINARY, NCHAR and VARCHAR | Wildcard match |
| 9 | MATCH, NMATCH | BINARY, NCHAR and VARCHAR | Regular expression match |

View File

@ -18,6 +18,7 @@ description: "Description of syntax changes in TDengine 3.0"
| 8 | Mixed operations | Enhanced | Mixed scalar and vector operations in queries have been fully enhanced; all SELECT clauses support mixed operations that conform to the syntax and semantics.
| 9 | Tag operations | Added | In queries, tag columns can participate in operations like regular columns and can be used in all clauses.
| 10 | Timeline clauses and time functions for supertable queries | Enhanced | Without PARTITION BY, data in a supertable is merged into a single timeline.
| 11 | GEOMETRY | Added | Geometry type.
## SQL Statement Changes

View File

@ -55,6 +55,7 @@ typedef struct SLogicNode {
EGroupAction groupAction;
EOrder inputTsOrder;
EOrder outputTsOrder;
bool forceCreateNonBlockingOptr; // true if the operator can use non-blocking (pipeline) mode
} SLogicNode;
typedef enum EScanType {
@ -105,6 +106,7 @@ typedef struct SScanLogicNode {
bool hasNormalCols; // neither tag column nor primary key tag column
bool sortPrimaryKey;
bool igLastNull;
bool groupOrderScan;
} SScanLogicNode;
typedef struct SJoinLogicNode {
@ -316,6 +318,7 @@ typedef struct SPhysiNode {
struct SPhysiNode* pParent;
SNode* pLimit;
SNode* pSlimit;
bool forceCreateNonBlockingOptr;
} SPhysiNode;
typedef struct SScanPhysiNode {
@ -326,6 +329,7 @@ typedef struct SScanPhysiNode {
uint64_t suid;
int8_t tableType;
SName tableName;
bool groupOrderScan;
} SScanPhysiNode;
typedef SScanPhysiNode STagScanPhysiNode;

View File

@ -632,7 +632,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
pStart += colSize;
}
} else {
if (dataSize != 0) {
// ubsan reports error if pCol->pData==NULL && dataSize==0
memcpy(pStart, pCol->pData, dataSize);
}
pStart += dataSize;
}
}
@ -684,8 +687,10 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
return TSDB_CODE_FAILED;
}
}
if (colLength != 0) {
// ubsan reports error if colLength==0 && pCol->pData == 0
memcpy(pCol->pData, pStart, colLength);
}
pStart += colLength;
}

View File

@ -39,6 +39,7 @@ int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char *dbFName, char *stbFName, vo
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
void mndExtractShortDbNameFromStbFullName(const char *stbFullName, char *dst);
void mndExtractShortDbNameFromDbFullName(const char *stbFullName, char *dst);
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
const char *mndGetStbStr(const char *src);

View File

@ -2500,12 +2500,14 @@ static int32_t mndProcessTableCfgReq(SRpcMsg *pReq) {
goto _OVER;
}
char dbName[TSDB_DB_NAME_LEN] = {0};
mndExtractShortDbNameFromDbFullName(cfgReq.dbFName, dbName);
if (0 == strcmp(dbName, TSDB_INFORMATION_SCHEMA_DB)) {
mInfo("information_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
if (mndBuildInsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp) != 0) {
goto _OVER;
}
} else if (0 == strcmp(dbName, TSDB_PERFORMANCE_SCHEMA_DB)) {
mInfo("performance_schema table:%s.%s, start to retrieve cfg", cfgReq.dbFName, cfgReq.tbName);
if (mndBuildPerfsTableCfg(pMnode, cfgReq.dbFName, cfgReq.tbName, &cfgRsp) != 0) {
goto _OVER;
@ -2674,6 +2676,13 @@ void mndExtractShortDbNameFromStbFullName(const char *stbFullName, char *dst) {
tNameGetDbName(&name, dst);
}
void mndExtractShortDbNameFromDbFullName(const char *stbFullName, char *dst) {
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, dst);
}
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize) {
int32_t pos = -1;
int32_t num = 0;

View File

@ -2102,7 +2102,7 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) {
}
}
if (TSDB_SUPER_TABLE == pCtx->tbType || TSDB_SYSTEM_TABLE == pCtx->tbType) {
CTG_ERR_JRET(ctgGetTableCfgFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
} else {
if (NULL == pCtx->pVgInfo) {

View File

@ -232,19 +232,20 @@ typedef struct STableMergeScanInfo {
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond
STableScanBase base;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
SSDataBlock* pReaderBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
int32_t readIdx;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
@ -362,7 +363,6 @@ typedef struct SStreamScanInfo {
SNode* pTagIndexCond;
// recover
int32_t blockRecoverContiCnt;
int32_t blockRecoverTotCnt;
SSDataBlock* pRecoverRes;

View File

@ -26,6 +26,7 @@ extern "C" {
enum {
SORT_MULTISOURCE_MERGE = 0x1,
SORT_SINGLESOURCE_SORT = 0x2,
SORT_BLOCK_TS_MERGE = 0x3
};
typedef struct SMultiMergeSource {
@ -53,6 +54,12 @@ typedef struct SMsortComparParam {
int32_t numOfSources;
SArray* orderInfo; // SArray<SBlockOrderInfo>
bool cmpGroupId;
int32_t sortType;
// the following fields speed things up when sortType == SORT_BLOCK_TS_MERGE
int32_t tsSlotId;
int32_t order;
__compar_fn_t cmpFn;
} SMsortComparParam;
typedef struct SSortHandle SSortHandle;
@ -70,8 +77,8 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
uint32_t pqSortBufSize);
void tsortSetForceUsePQSort(SSortHandle* pHandle);
@ -110,6 +117,10 @@ int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetc
*/
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
/**
* Set an upper bound on the number of rows the merge needs to produce (limit + offset pushed down from the plan), so the sort can stop early.
*/
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit);
/**
*
*/

View File

@ -45,6 +45,8 @@ typedef struct SAggOperatorInfo {
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
bool groupKeyOptimized;
bool hasValidBlock;
SSDataBlock* pNewGroupBlock;
} SAggOperatorInfo;
static void destroyAggOperatorInfo(void* param);
@ -53,7 +55,6 @@ static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, u
static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock);
static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock);
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator);
static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx);
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator);
@ -111,9 +112,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder;
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG,
!pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResult, NULL, destroyAggOperatorInfo,
optrDefaultBufFn, NULL);
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
@ -153,28 +154,42 @@ void destroyAggOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
/**
* @brief get blocks from downstream and fill results into groupedRes after aggregation
* @retval false if no more groups
* @retval true if there may be new groups coming
* @note if pOperator.blocking is true, scan all blocks from downstream; all groups are handled
* if false, fill results of ONE GROUP
* */
static bool nextGroupedResult(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
if (pOperator->blocking && pAggInfo->hasValidBlock) return false;
SExprSupp* pSup = &pOperator->exprSupp;
SOperatorInfo* downstream = pOperator->pDownstream[0];
int64_t st = taosGetTimestampUs();
int32_t code = TSDB_CODE_SUCCESS;
int32_t order = pAggInfo->binfo.inputTsOrder;
SSDataBlock* pBlock = pAggInfo->pNewGroupBlock;
if (pBlock) {
pAggInfo->pNewGroupBlock = NULL;
tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable);
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
code = doAggregateImpl(pOperator, pSup->pCtx);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
}
while (1) {
bool blockAllocated = false;
pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
if (!pAggInfo->hasValidBlock) {
createDataBlockForEmptyInput(pOperator, &pBlock);
if (pBlock == NULL) {
break;
@ -184,7 +199,7 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
break;
}
}
pAggInfo->hasValidBlock = true;
pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag;
// there is a scalar expression that needs to be calculated before applying the group aggregation
@ -196,7 +211,11 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
}
// if non-blocking mode and new group arrived, save the block and break
if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) {
pAggInfo->pNewGroupBlock = pBlock;
break;
}
// the pDataBlock is always the same one, no need to call this again
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId);
setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true);
@ -215,10 +234,7 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
return pBlock != NULL;
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
return pTaskInfo->code;
}
SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
@ -230,19 +246,17 @@ SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
bool hasNewGroups = false;
do {
hasNewGroups = nextGroupedResult(pOperator);
return NULL;
}
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (!hasRemainResults(&pAggInfo->groupResInfo)) {
if (!hasNewGroups) setOperatorCompleted(pOperator);
break;
}
@ -250,6 +264,7 @@ SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
break;
}
}
} while (pInfo->pRes->info.rows == 0 && hasNewGroups);
size_t rows = blockDataGetNumOfRows(pInfo->pRes);
pOperator->resultInfo.totalRows += rows;

View File

@ -127,6 +127,10 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows);
}
if (pGroupResInfo->pBuf) {
taosMemoryFree(pGroupResInfo->pBuf);
pGroupResInfo->pBuf = NULL;
}
// extract the result rows information from the hash map
int32_t size = tSimpleHashGetSize(pHashmap);
@ -2104,6 +2108,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
if (groupSort && groupByTbname) {
taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn);
pTableListInfo->numOfOuputGroups = numOfTables;
} else if (groupByTbname && pScanNode->groupOrderScan) {
pTableListInfo->numOfOuputGroups = numOfTables;
} else {
pTableListInfo->numOfOuputGroups = 1;
}

View File

@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
#include "executorInt.h"
#include "filter.h"
#include "function.h"
@ -56,8 +54,7 @@ typedef struct STableMergeScanSortSourceParam {
SOperatorInfo* pOperator;
int32_t readerIdx;
uint64_t uid;
STsdbReader* reader;
STsdbReader* dataReader;
} STableMergeScanSortSourceParam;
typedef struct STableCountScanOperatorInfo {
@ -1841,9 +1838,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN2) {
if (isTaskKilled(pTaskInfo)) {
pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
pInfo->blockRecoverContiCnt = 0;
return NULL;
}
@ -1888,7 +1883,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
if (pInfo->pRecoverRes != NULL) {
pInfo->blockRecoverContiCnt++;
calBlockTbName(pInfo, pInfo->pRecoverRes);
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
@ -2781,32 +2775,17 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSDataBlock* pBlock = pInfo->pReaderBlock;
SSDataBlock* pBlock = source->inputBlock;
int32_t code = 0;
SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx);
int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader) {
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
}
pInfo->base.dataReader = source->dataReader;
STsdbReader* reader = pInfo->base.dataReader;
bool hasNext = false;
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
STsdbReader* reader = pInfo->base.dataReader;
while (true) {
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
}
@ -2816,7 +2795,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
@ -2826,12 +2804,6 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
continue;
}
if (pQueryCond->order == TSDB_ORDER_ASC) {
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
} else {
pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
}
uint32_t status = 0;
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
@ -2853,16 +2825,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
qTrace("tsdb/read-table-data: %p, close reader", reader);
pInfo->base.dataReader = NULL;
return pBlock;
}
pAPI->tsdReader.tsdReaderClose(source->dataReader);
source->dataReader = NULL;
pInfo->base.dataReader = NULL;
blockDataDestroy(source->inputBlock);
source->inputBlock = NULL;
return NULL;
}
@ -2898,6 +2863,8 @@ int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond*
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SReadHandle* pHandle = &pInfo->base.readHandle;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
{
size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
@ -2914,53 +2881,29 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t tableStartIdx = pInfo->tableStartIndex;
int32_t tableEndIdx = pInfo->tableEndIndex;
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
// pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
int32_t kWay = (TSDB_MAX_BYTES_PER_ROW * 2) / (pInfo->pResBlock->info.rowSize);
if (kWay >= 128) {
kWay = 128;
} else if (kWay <= 2) {
kWay = 2;
} else {
int i = 2;
while (i * 2 <= kWay) i = i * 2;
kWay = i;
}
pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
int64_t mergeLimit = -1;
if (pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1) {
mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
}
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
// one table has one data block
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond));
for (int32_t i = 0; i < numOfTable; ++i) {
STableMergeScanSortSourceParam param = {0};
param.readerIdx = i;
param.pOperator = pOperator;
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, NULL);
taosArrayPush(pInfo->sortSourceParams, &param);
SQueryTableDataCond cond;
dumpQueryTableCond(&pInfo->base.cond, &cond);
taosArrayPush(pInfo->queryConds, &cond);
}
for (int32_t i = 0; i < numOfTable; ++i) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = &param;
ps->param = param;
ps->onlyRef = true;
tsortAddSource(pInfo->pSortHandle, ps);
}
int32_t code = tsortOpen(pInfo->pSortHandle);
@ -2976,8 +2919,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t numOfTable = taosArrayGetSize(pInfo->queryConds);
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
@ -2985,24 +2926,14 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
if (pInfo->base.dataReader != NULL) {
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
pAPI->tsdReader.tsdReaderClose(param->dataReader);
param->dataReader = NULL;
}
taosArrayClear(pInfo->sortSourceParams);
tsortDestroySortHandle(pInfo->pSortHandle);
pInfo->pSortHandle = NULL;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) {
SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i);
taosMemoryFree(cond->colList);
}
taosArrayDestroy(pInfo->queryConds);
pInfo->queryConds = NULL;
resetLimitInfoForNextGroup(&pInfo->limitInfo);
return TSDB_CODE_SUCCESS;
}
@ -3015,9 +2946,10 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataCleanup(pResBlock);
STupleHandle* pTupleHandle = NULL;
while (1) {
while (1) {
pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
@ -3036,7 +2968,10 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows);
if (pTupleHandle == NULL || limitReached || pResBlock->info.rows > 0) {
break;
}
}
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
}
@ -3100,14 +3035,7 @@ void destroyTableMergeScanOperatorInfo(void* param) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);
for (int32_t i = 0; i < numOfTable; i++) {
STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i);
blockDataDestroy(p->inputBlock);
pTableScanInfo->base.readerAPI.tsdReaderClose(p->dataReader);
p->dataReader = NULL;
}
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;
@ -3116,16 +3044,11 @@ void destroyTableMergeScanOperatorInfo(void* param) {
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) {
SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);
taosMemoryFree(pCond->colList);
}
taosArrayDestroy(pTableScanInfo->queryConds);
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);
taosArrayDestroy(pTableScanInfo->pSortInfo);
taosMemoryFreeClear(param);
@ -3187,6 +3110,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->base.scanFlag = MAIN_SCAN;
pInfo->base.readHandle = *readHandle;
pInfo->readIdx = -1;
pInfo->base.limitInfo.limit.limit = -1;
pInfo->base.limitInfo.slimit.limit = -1;
pInfo->base.pTableListInfo = pTableListInfo;
@ -3209,6 +3134,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize;
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
@ -3618,5 +3545,3 @@ static void destoryTableCountScanOperator(void* param) {
taosArrayDestroy(pTableCountScanInfo->stbUidList);
taosMemoryFreeClear(param);
}
// clang-format on

View File

@ -3143,6 +3143,8 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
blockDataDestroy(pInfo->pWinBlock);
blockDataDestroy(pInfo->pUpdateRes);
tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted);
taosArrayDestroy(pInfo->historyWins);
@ -3240,14 +3242,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx,
pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
}
if (pHandle) {
pSup->winRange = pHandle->winRange;
// temporary
if (pSup->winRange.ekey <= 0) {
pSup->winRange.ekey = INT64_MAX;
}
}
pSup->pSessionAPI = pApi;
return TSDB_CODE_SUCCESS;
@ -3295,11 +3289,12 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->sessionWin.win)) {
code = TSDB_CODE_FAILED;
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->pOutputBuf, &pAggSup->pSessionAPI->stateStore);
pCurWin->pOutputBuf = taosMemoryCalloc(1, size);
}
if (code == TSDB_CODE_SUCCESS) {
pCurWin->isOutput = true;
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->sessionWin);
} else {
pCurWin->sessionWin.win.skey = startTs;
pCurWin->sessionWin.win.ekey = endTs;
@ -3432,7 +3427,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
}
static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
SSHashObj* pStDeleted, bool addGap) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
@ -3456,7 +3451,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
int64_t winDelta = 0;
if (addGap) {
winDelta = pAggSup->gap;
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, winDelta);
@ -3475,6 +3470,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize,
&pAggSup->stateStore);
pWinInfo->pOutputBuf = NULL;
return TSDB_CODE_SUCCESS;
}
@ -3488,8 +3484,13 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SResultRow* pResult = NULL;
int32_t rows = pSDataBlock->info.rows;
int32_t winRows = 0;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
if (pAggSup->winRange.ekey <= 0) {
pAggSup->winRange.ekey = INT64_MAX;
}
SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* startTsCols = (int64_t*)pStartTsCol->pData;
@ -3501,7 +3502,6 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
TSKEY* endTsCols = (int64_t*)pEndTsCol->pData;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for (int32_t i = 0; i < rows;) {
if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
i++;
@ -3526,7 +3526,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted, addGap);
saveSessionOutputBuf(pAggSup, &winInfo);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
@ -3690,7 +3690,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
pChild->exprSupp.rowEntryInfoOffset);
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true);
saveResult(parentWin, pStUpdated);
} else {
break;
@ -4091,8 +4091,8 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) {
}
void resetWinRange(STimeWindow* winRange) {
winRange->skey = INT64_MIN;
winRange->ekey = INT64_MAX;
}
void streamSessionReloadState(SOperatorInfo* pOperator) {
@ -4108,10 +4108,16 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
ASSERT(size == num * sizeof(SSessionKey));
if (!pInfo->pStUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
}
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
saveSessionOutputBuf(pAggSup, &winInfo);
saveResult(winInfo, pInfo->pStUpdated);
}
taosMemoryFree(pBuf);
@ -4431,6 +4437,7 @@ void destroyStreamStateOperatorInfo(void* param) {
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
taosArrayDestroy(pInfo->historyWins);
tSimpleHashCleanup(pInfo->pSeUpdated);
tSimpleHashCleanup(pInfo->pSeDeleted);
blockDataDestroy(pInfo->pCheckpointRes);
@ -4487,6 +4494,7 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if (code == TSDB_CODE_SUCCESS) {
pCurWin->winInfo.isOutput = true;
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin);
} else if (pKeyData) {
if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) {
varDataCopy(pCurWin->pStateKey->pData, pKeyData);
@ -4559,8 +4567,13 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t winRows = 0;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
if (pAggSup->winRange.ekey <= 0) {
pAggSup->winRange.ekey = INT64_MAX;
}
if (pSDataBlock->pDataBlock != NULL) { if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
@ -4569,7 +4582,6 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
return; return;
} }
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
int32_t rows = pSDataBlock->info.rows; int32_t rows = pSDataBlock->info.rows;
blockDataEnsureCapacity(pAggSup->pScanBlock, rows); blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
@ -4859,7 +4871,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
SResultRow* pWinResult = NULL; SResultRow* pWinResult = NULL;
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, 1);
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey)); tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey));
if (pNextWin->isOutput && pStDeleted) { if (pNextWin->isOutput && pStDeleted) {
@ -4868,7 +4880,6 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin); removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin); doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin);
taosMemoryFree(pNextWin->pOutputBuf); taosMemoryFree(pNextWin->pOutputBuf);
saveSessionOutputBuf(pAggSup, pCurWin);
} }
void streamStateReloadState(SOperatorInfo* pOperator) { void streamStateReloadState(SOperatorInfo* pOperator) {
@ -4884,14 +4895,19 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
int32_t num = size / sizeof(SSessionKey); int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
ASSERT(size == num * sizeof(SSessionKey)); ASSERT(size == num * sizeof(SSessionKey));
if (!pInfo->pSeUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
}
for (int32_t i = 0; i < num; i++) { for (int32_t i = 0; i < num; i++) {
SStateWindowInfo curInfo = {0}; SStateWindowInfo curInfo = {0};
SStateWindowInfo nextInfo = {0}; SStateWindowInfo nextInfo = {0};
SStateWindowInfo dummy = {0}; SStateWindowInfo dummy = {0};
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
if (compareStateKey(curInfo.pStateKey, nextInfo.pStateKey)) { if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted); compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated);
saveResult(curInfo.winInfo, pInfo->pStUpdated); saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
saveResult(curInfo.winInfo, pInfo->pSeUpdated);
} }
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
@ -5392,7 +5408,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup); doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
} }
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput); pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, iaInfo, pResult); doCloseWindow(pResultRowInfo, iaInfo, pResult);
View File
@ -24,6 +24,7 @@
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tsort.h" #include "tsort.h"
#include "tutil.h" #include "tutil.h"
#include "tsimplehash.h"
struct STupleHandle { struct STupleHandle {
SSDataBlock* pBlock; SSDataBlock* pBlock;
@ -42,13 +43,15 @@ struct SSortHandle {
int64_t startTs; int64_t startTs;
uint64_t totalElapsed; uint64_t totalElapsed;
uint64_t maxRows; uint64_t pqMaxRows;
uint32_t maxTupleLength; uint32_t pqMaxTupleLength;
uint32_t sortBufSize; uint32_t pqSortBufSize;
bool forceUsePQSort; bool forceUsePQSort;
BoundedQueue* pBoundedQueue; BoundedQueue* pBoundedQueue;
uint32_t tmpRowIdx; uint32_t tmpRowIdx;
int64_t mergeLimit;
int32_t sourceId; int32_t sourceId;
SSDataBlock* pDataBlock; SSDataBlock* pDataBlock;
SMsortComparParam cmpParam; SMsortComparParam cmpParam;
@ -173,8 +176,8 @@ void destroyTuple(void* t) {
* @return * @return
*/ */
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength, SSDataBlock* pBlock, const char* idstr, uint64_t pqMaxRows, uint32_t pqMaxTupleLength,
uint32_t sortBufSize) { uint32_t pqSortBufSize) {
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle)); SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
pSortHandle->type = type; pSortHandle->type = type;
@ -183,10 +186,10 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->pSortInfo = pSortInfo; pSortHandle->pSortInfo = pSortInfo;
pSortHandle->loops = 0; pSortHandle->loops = 0;
pSortHandle->maxTupleLength = maxTupleLength; pSortHandle->pqMaxTupleLength = pqMaxTupleLength;
if (maxRows != 0) { if (pqMaxRows != 0) {
pSortHandle->sortBufSize = sortBufSize; pSortHandle->pqSortBufSize = pqSortBufSize;
pSortHandle->maxRows = maxRows; pSortHandle->pqMaxRows = pqMaxRows;
} }
pSortHandle->forceUsePQSort = false; pSortHandle->forceUsePQSort = false;
@ -194,10 +197,18 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
} }
pSortHandle->mergeLimit = -1;
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
pSortHandle->cmpParam.orderInfo = pSortInfo; pSortHandle->cmpParam.orderInfo = pSortInfo;
pSortHandle->cmpParam.cmpGroupId = false; pSortHandle->cmpParam.cmpGroupId = false;
pSortHandle->cmpParam.sortType = type;
if (type == SORT_BLOCK_TS_MERGE) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pSortInfo, 0);
pSortHandle->cmpParam.tsSlotId = pOrder->slotId;
pSortHandle->cmpParam.order = pOrder->order;
pSortHandle->cmpParam.cmpFn = (pOrder->order == TSDB_ORDER_ASC) ? compareInt64Val : compareInt64ValDesc;
}
tsortSetComparFp(pSortHandle, msortComparFn); tsortSetComparFp(pSortHandle, msortComparFn);
if (idstr != NULL) { if (idstr != NULL) {
@ -469,11 +480,14 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
if (pHandle->type == SORT_SINGLESOURCE_SORT) { if (pHandle->type == SORT_SINGLESOURCE_SORT) {
pSource->pageIndex++; pSource->pageIndex++;
if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) { if (pSource->pageIndex >= taosArrayGetSize(pSource->pageIdList)) {
qDebug("adjust merge tree. %d source completed %d", *numOfCompleted, pSource->pageIndex);
(*numOfCompleted) += 1; (*numOfCompleted) += 1;
pSource->src.rowIndex = -1; pSource->src.rowIndex = -1;
pSource->pageIndex = -1; pSource->pageIndex = -1;
pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
} else { } else {
if (pSource->pageIndex % 512 == 0) qDebug("begin source %p page %d", pSource, pSource->pageIndex);
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
void* pPage = getBufPage(pHandle->pBuf, *pPgId); void* pPage = getBufPage(pHandle->pBuf, *pPgId);
@ -486,7 +500,6 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
} }
} else { } else {
@ -497,6 +510,7 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
if (pSource->src.pBlock == NULL) { if (pSource->src.pBlock == NULL) {
(*numOfCompleted) += 1; (*numOfCompleted) += 1;
pSource->src.rowIndex = -1; pSource->src.rowIndex = -1;
qDebug("adjust merge tree. %d source completed", *numOfCompleted);
} }
} }
} }
@ -577,21 +591,30 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
} }
} }
if (pParam->sortType == SORT_BLOCK_TS_MERGE) {
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pParam->tsSlotId);
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pParam->tsSlotId);
int64_t* left1 = (int64_t*)(pLeftColInfoData->pData) + pLeftSource->src.rowIndex;
int64_t* right1 = (int64_t*)(pRightColInfoData->pData) + pRightSource->src.rowIndex;
int ret = pParam->cmpFn(left1, right1);
return ret;
} else {
for (int32_t i = 0; i < pInfo->size; ++i) { for (int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool leftNull = false; bool leftNull = false;
if (pLeftColInfoData->hasNull) { if (pLeftColInfoData->hasNull) {
if (pLeftBlock->pBlockAgg == NULL) { if (pLeftBlock->pBlockAgg == NULL) {
leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex); leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
} else { } else {
leftNull = leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]); pLeftBlock->pBlockAgg[i]);
} }
} }
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
bool rightNull = false; bool rightNull = false;
if (pRightColInfoData->hasNull) { if (pRightColInfoData->hasNull) {
if (pRightBlock->pBlockAgg == NULL) { if (pRightBlock->pBlockAgg == NULL) {
@ -626,6 +649,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
return ret; return ret;
} }
} }
}
return 0; return 0;
} }
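For the new SORT_BLOCK_TS_MERGE path, msortComparFn skips the generic per-column loop and compares only the primary timestamp column, using a compare function selected once at handle creation (compareInt64Val or compareInt64ValDesc in the earlier hunk). A small illustrative sketch of that selection, with hypothetical names rather than the TDengine API:

    #include <stdint.h>

    typedef int (*ts_cmp_fn)(const void* a, const void* b);

    static int cmpInt64Asc(const void* a, const void* b) {
      int64_t l = *(const int64_t*)a, r = *(const int64_t*)b;
      return (l > r) - (l < r);
    }

    static int cmpInt64Desc(const void* a, const void* b) { return -cmpInt64Asc(a, b); }

    /* Chosen once when the sort handle is created, then reused for every row pair. */
    static ts_cmp_fn pickTsCmp(int ascending) { return ascending ? cmpInt64Asc : cmpInt64Desc; }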
@ -668,6 +692,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
// Only *numOfInputSources* can be loaded into buffer to perform the external sort. // Only *numOfInputSources* can be loaded into buffer to perform the external sort.
for (int32_t i = 0; i < sortGroup; ++i) { for (int32_t i = 0; i < sortGroup; ++i) {
qDebug("internal merge sort pass %d group %d. num input sources %d ", t, i, numOfInputSources);
pHandle->sourceId += 1; pHandle->sourceId += 1;
int32_t end = (i + 1) * numOfInputSources - 1; int32_t end = (i + 1) * numOfInputSources - 1;
@ -690,6 +715,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return code; return code;
} }
int nMergedRows = 0;
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while (1) { while (1) {
if (tsortIsClosed(pHandle)) { if (tsortIsClosed(pHandle)) {
@ -720,8 +747,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
nMergedRows += pDataBlock->info.rows;
blockDataCleanup(pDataBlock); blockDataCleanup(pDataBlock);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
break;
}
} }
sortComparCleanup(&pHandle->cmpParam); sortComparCleanup(&pHandle->cmpParam);
@ -769,11 +800,240 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols) {
return pgSize; return pgSize;
} }
static int32_t createInitialSources(SSortHandle* pHandle) { static int32_t createPageBuf(SSortHandle* pHandle) {
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; if (pHandle->pBuf == NULL) {
int32_t code = 0; if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_DISKSPACE;
qError("create page buf failed since %s, tempDir:%s", terrstr(), tsTempDir);
return terrno;
}
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize,
"tableBlocksBuf", tsTempDir);
dBufSetPrintInfo(pHandle->pBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return 0;
}
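createPageBuf creates the disk-based spill buffer lazily and fails fast when the temp directory has no free space. A hedged sketch of the same guard pattern, with stand-in names (malloc instead of createDiskbasedBuf):

    #include <stdlib.h>

    /* Hypothetical lazy-init guard: create the spill buffer only once, and fail fast
       when temp space is unavailable (mirrors the osTempSpaceAvailable check). */
    static int ensureSpillBuf(void** pBuf, size_t pageSize, size_t numPages, int tempSpaceOk) {
      if (*pBuf != NULL) return 0;          /* already created */
      if (!tempSpaceOk) return -1;          /* no disk space: report an error */
      *pBuf = malloc(pageSize * numPages);  /* stand-in for createDiskbasedBuf() */
      return *pBuf ? 0 : -1;
    }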
typedef struct SBlkMergeSupport {
int64_t** aTs;
int32_t* aRowIdx;
int32_t order;
} SBlkMergeSupport;
static int32_t blockCompareTsFn(const void* pLeft, const void* pRight, void* param) {
int32_t left = *(int32_t*)pLeft;
int32_t right = *(int32_t*)pRight;
SBlkMergeSupport* pSup = (SBlkMergeSupport*)param;
if (pSup->aRowIdx[left] == -1) {
return 1;
} else if (pSup->aRowIdx[right] == -1) {
return -1;
}
int64_t leftTs = pSup->aTs[left][pSup->aRowIdx[left]];
int64_t rightTs = pSup->aTs[right][pSup->aRowIdx[right]];
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
if (pSup->order == TSDB_ORDER_DESC) {
ret = -1 * ret;
}
return ret;
}
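blockCompareTsFn orders an exhausted source (aRowIdx == -1) after any source that still has rows, so the merge tree keeps selecting live inputs, and flips the sign for descending order. A compact sketch of that comparison, assuming plain scalars rather than the SBlkMergeSupport arrays:

    #include <stdint.h>

    /* rowIdx == -1 marks an exhausted input; exhausted inputs always compare "greater"
       so the merge tree keeps drawing from sources that still have rows. */
    static int mergeCmpSketch(int64_t lTs, int lRowIdx, int64_t rTs, int rRowIdx, int desc) {
      if (lRowIdx == -1) return 1;
      if (rRowIdx == -1) return -1;
      int ret = (lTs > rTs) - (lTs < rTs);
      return desc ? -ret : ret;
    }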
static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, SArray* aPgId) {
int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
taosArrayPush(aPgId, &pageId);
int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
ASSERT(size <= getBufPageSize(pHandle->pBuf));
blockDataToBuf(pPage, blk);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
return 0;
}
static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) {
int sz = 0;
int numCols = taosArrayGetSize(blk->pDataBlock);
if (!blk->info.hasVarCol) {
sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0);
sz += blockDataGetRowSize(blk);
} else {
for (int32_t i = 0; i < numCols; ++i) {
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
if (pColInfoData->varmeta.offset[row] != -1) {
char* p = colDataGetData(pColInfoData, row);
sz += varDataTLen(p);
}
sz += sizeof(pColInfoData->varmeta.offset[0]);
} else {
sz += pColInfoData->info.bytes;
if (((rowIdxInPage) & 0x07) == 0) {
sz += 1; // bitmap
}
}
}
}
return sz;
}
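getPageBufIncForRow estimates how many bytes appending one more row adds to the current page: fixed-width columns contribute their byte width plus one NULL-bitmap byte per column every eighth row, var-length columns contribute the serialized value plus an offset slot. A simplified sketch covering only the fixed-width case:

    #include <stdint.h>
    #include <stddef.h>

    /* Bytes added to a page by appending one row made of fixed-width columns only. */
    static size_t fixedRowIncSketch(const int32_t* colBytes, int numCols, int rowIdxInPage) {
      size_t inc = 0;
      for (int i = 0; i < numCols; ++i) {
        inc += (size_t)colBytes[i];
        if ((rowIdxInPage & 0x7) == 0) inc += 1;  /* a new NULL-bitmap byte starts every 8 rows */
      }
      return inc;
    }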
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockOrderInfo* order, SArray* aExtSrc) {
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
blockDataCleanup(pHandle->pDataBlock);
int32_t numBlks = taosArrayGetSize(aBlk);
SBlkMergeSupport sup;
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
sup.order = order->order;
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, order->slotId);
sup.aTs[i] = (int64_t*)col->pData;
sup.aRowIdx[i] = 0;
}
int32_t totalRows = 0;
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
totalRows += blk->info.rows;
}
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
SMultiwayMergeTreeInfo* pTree = NULL;
tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
int32_t nRows = 0;
int32_t nMergedRows = 0;
bool mergeLimitReached = false;
size_t blkPgSz = pgHeaderSz;
while (nRows < totalRows) {
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
int32_t minRow = sup.aRowIdx[minIdx];
int32_t bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
nMergedRows += pHandle->pDataBlock->info.rows;
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pgHeaderSz;
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
break;
}
}
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
blkPgSz += bufInc;
++nRows;
if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
sup.aRowIdx[minIdx] = -1;
} else {
++sup.aRowIdx[minIdx];
}
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
}
if (pHandle->pDataBlock->info.rows > 0) {
if (!mergeLimitReached) {
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
nMergedRows += pHandle->pDataBlock->info.rows;
}
blockDataCleanup(pHandle->pDataBlock);
}
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
tMergeTreeDestroy(&pTree);
return 0;
}
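sortBlocksToExtSource performs a k-way merge over the buffered blocks via a merge tree: take the winning source's next row, append it to the output block, flush the block to the page buffer when the next row would overflow the page, and stop early once mergeLimit rows have been flushed. A self-contained sketch of the same control flow over plain int64 arrays, using a linear scan instead of a merge tree:

    #include <stdint.h>
    #include <stdio.h>

    /* Merge nSrc ascending arrays, stop after `limit` rows (limit < 0 means unlimited). */
    static void kwayMergeSketch(const int64_t** src, const int* len, int nSrc, long limit) {
      int idx[16] = {0};                      /* sketch assumes nSrc <= 16 */
      long emitted = 0;
      for (;;) {
        int min = -1;
        for (int i = 0; i < nSrc; ++i) {      /* the real code uses a merge tree, not a scan */
          if (idx[i] >= len[i]) continue;
          if (min < 0 || src[i][idx[i]] < src[min][idx[min]]) min = i;
        }
        if (min < 0) break;                   /* all sources exhausted */
        printf("%lld\n", (long long)src[min][idx[min]++]);
        if (limit >= 0 && ++emitted >= limit) break;  /* mergeLimit-style early stop */
      }
    }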
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
size_t maxBufSize = pHandle->numOfPages * pHandle->pageSize;
createPageBuf(pHandle);
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
int32_t szSort = 0;
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
if (pBlk != NULL) {
szSort += blockDataGetSize(pBlk);
void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid));
if (ppBlk != NULL) {
SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
blockDataMerge(tBlk, pBlk);
} else {
SSDataBlock* tBlk = createOneDataBlock(pBlk, true);
tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
taosArrayPush(aBlkSort, &tBlk);
}
}
if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
tSimpleHashClear(mUidBlk);
int64_t p = taosGetTimestampUs();
sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
szSort = 0;
qDebug("source %zu created", taosArrayGetSize(aExtSrc));
}
if (pBlk == NULL) {
break;
};
}
tSimpleHashCleanup(mUidBlk);
taosArrayDestroy(aBlkSort);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
taosArrayDestroy(aExtSrc);
pHandle->type = SORT_SINGLESOURCE_SORT;
return 0;
}
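createBlocksMergeSortInitialSources accumulates fetched blocks per table uid, merging blocks that share a uid, and spills a sorted external source whenever the buffered size exceeds numOfPages * pageSize, or once the input ends with data still buffered. A tiny sketch of that spill decision, with illustrative names only:

    #include <stddef.h>

    /* Decide when to sort-and-spill buffered blocks: the running total has exceeded the
       budget (numOfPages * pageSize), or the input is finished and something is buffered. */
    static int shouldSpill(size_t bufferedBytes, size_t budgetBytes, int endOfInput) {
      if (endOfInput) return bufferedBytes > 0;
      return bufferedBytes > budgetBytes;
    }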
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
int32_t code = 0;
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
SSortSource* source = *pSource; SSortSource* source = *pSource;
*pSource = NULL; *pSource = NULL;
@ -833,7 +1093,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
int64_t el = taosGetTimestampUs() - p; int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el; pHandle->sortElapsed += el;
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
code = doAddToBuf(pHandle->pDataBlock, pHandle); code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -858,7 +1118,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
return code; return code;
} }
if (pHandle->maxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->maxRows); if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
int64_t el = taosGetTimestampUs() - p; int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el; pHandle->sortElapsed += el;
@ -875,8 +1135,18 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
code = doAddToBuf(pHandle->pDataBlock, pHandle); code = doAddToBuf(pHandle->pDataBlock, pHandle);
} }
} }
} return code;
}
static int32_t createInitialSources(SSortHandle* pHandle) {
int32_t code = 0;
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
code = createBlocksQuickSortInitialSources(pHandle);
} else if (pHandle->type == SORT_BLOCK_TS_MERGE) {
code = createBlocksMergeSortInitialSources(pHandle);
}
qDebug("%zu sources created", taosArrayGetSize(pHandle->pOrderedSource));
return code; return code;
} }
@ -923,6 +1193,10 @@ void tsortSetClosed(SSortHandle* pHandle) {
atomic_store_8(&pHandle->closed, 2); atomic_store_8(&pHandle->closed, 2);
} }
void tsortSetMergeLimit(SSortHandle* pHandle, int64_t mergeLimit) {
pHandle->mergeLimit = mergeLimit;
}
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
void* param) { void* param) {
pHandle->fetchfp = fetchFp; pHandle->fetchfp = fetchFp;
@ -1002,8 +1276,8 @@ void tsortSetForceUsePQSort(SSortHandle* pHandle) {
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) { static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
if (pHandle->type != SORT_SINGLESOURCE_SORT) return false; if (pHandle->type != SORT_SINGLESOURCE_SORT) return false;
if (tsortIsForceUsePQSort(pHandle)) return true; if (tsortIsForceUsePQSort(pHandle)) return true;
uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*)); uint64_t maxRowsFitInMemory = pHandle->pqSortBufSize / (pHandle->pqMaxTupleLength + sizeof(char*));
return maxRowsFitInMemory > pHandle->maxRows; return maxRowsFitInMemory > pHandle->pqMaxRows;
} }
static bool tsortPQCompFn(void* a, void* b, void* param) { static bool tsortPQCompFn(void* a, void* b, void* param) {
@ -1049,7 +1323,7 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param)
} }
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destroyTuple, pHandle); pHandle->pBoundedQueue = createBoundedQueue(pHandle->pqMaxRows, tsortPQCompFn, destroyTuple, pHandle);
if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY; if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
tsortSetComparFp(pHandle, tupleComparFn); tsortSetComparFp(pHandle, tupleComparFn);
View File
@ -361,6 +361,7 @@ static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
COPY_SCALAR_FIELD(groupAction); COPY_SCALAR_FIELD(groupAction);
COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(inputTsOrder);
COPY_SCALAR_FIELD(outputTsOrder); COPY_SCALAR_FIELD(outputTsOrder);
COPY_SCALAR_FIELD(forceCreateNonBlockingOptr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -397,6 +398,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
CLONE_NODE_LIST_FIELD(pTags); CLONE_NODE_LIST_FIELD(pTags);
CLONE_NODE_FIELD(pSubtable); CLONE_NODE_FIELD(pSubtable);
COPY_SCALAR_FIELD(igLastNull); COPY_SCALAR_FIELD(igLastNull);
COPY_SCALAR_FIELD(groupOrderScan);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -545,6 +547,7 @@ static int32_t physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) {
CLONE_NODE_LIST_FIELD(pChildren); CLONE_NODE_LIST_FIELD(pChildren);
COPY_SCALAR_FIELD(inputTsOrder); COPY_SCALAR_FIELD(inputTsOrder);
COPY_SCALAR_FIELD(outputTsOrder); COPY_SCALAR_FIELD(outputTsOrder);
COPY_SCALAR_FIELD(forceCreateNonBlockingOptr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -556,6 +559,7 @@ static int32_t physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) {
COPY_SCALAR_FIELD(suid); COPY_SCALAR_FIELD(suid);
COPY_SCALAR_FIELD(tableType); COPY_SCALAR_FIELD(tableType);
COPY_OBJECT_FIELD(tableName, sizeof(SName)); COPY_OBJECT_FIELD(tableName, sizeof(SName));
COPY_SCALAR_FIELD(groupOrderScan);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
View File
@ -1559,6 +1559,7 @@ static const char* jkScanPhysiPlanTableId = "TableId";
static const char* jkScanPhysiPlanSTableId = "STableId"; static const char* jkScanPhysiPlanSTableId = "STableId";
static const char* jkScanPhysiPlanTableType = "TableType"; static const char* jkScanPhysiPlanTableType = "TableType";
static const char* jkScanPhysiPlanTableName = "TableName"; static const char* jkScanPhysiPlanTableName = "TableName";
static const char* jkScanPhysiPlanGroupOrderScan = "GroupOrderScan";
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj; const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
@ -1582,6 +1583,9 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName); code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanPhysiPlanGroupOrderScan, pNode->groupOrderScan);
}
return code; return code;
} }
@ -1608,6 +1612,9 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName); code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanPhysiPlanGroupOrderScan, &pNode->groupOrderScan);
}
return code; return code;
} }
View File
@ -1853,7 +1853,8 @@ enum {
PHY_NODE_CODE_LIMIT, PHY_NODE_CODE_LIMIT,
PHY_NODE_CODE_SLIMIT, PHY_NODE_CODE_SLIMIT,
PHY_NODE_CODE_INPUT_TS_ORDER, PHY_NODE_CODE_INPUT_TS_ORDER,
PHY_NODE_CODE_OUTPUT_TS_ORDER PHY_NODE_CODE_OUTPUT_TS_ORDER,
PHY_NODE_CODE_FORCE_NONBLOCKING_OPTR
}; };
static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -1878,6 +1879,9 @@ static int32_t physiNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_OUTPUT_TS_ORDER, pNode->outputTsOrder); code = tlvEncodeEnum(pEncoder, PHY_NODE_CODE_OUTPUT_TS_ORDER, pNode->outputTsOrder);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_NODE_CODE_FORCE_NONBLOCKING_OPTR, pNode->forceCreateNonBlockingOptr);
}
return code; return code;
} }
@ -1910,6 +1914,8 @@ static int32_t msgToPhysiNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_NODE_CODE_OUTPUT_TS_ORDER: case PHY_NODE_CODE_OUTPUT_TS_ORDER:
code = tlvDecodeEnum(pTlv, &pNode->outputTsOrder, sizeof(pNode->outputTsOrder)); code = tlvDecodeEnum(pTlv, &pNode->outputTsOrder, sizeof(pNode->outputTsOrder));
break; break;
case PHY_NODE_CODE_FORCE_NONBLOCKING_OPTR:
code = tlvDecodeBool(pTlv, &pNode->forceCreateNonBlockingOptr);
default: default:
break; break;
} }
@ -1925,7 +1931,8 @@ enum {
PHY_SCAN_CODE_BASE_UID, PHY_SCAN_CODE_BASE_UID,
PHY_SCAN_CODE_BASE_SUID, PHY_SCAN_CODE_BASE_SUID,
PHY_SCAN_CODE_BASE_TABLE_TYPE, PHY_SCAN_CODE_BASE_TABLE_TYPE,
PHY_SCAN_CODE_BASE_TABLE_NAME PHY_SCAN_CODE_BASE_TABLE_NAME,
PHY_SCAN_CODE_BASE_GROUP_ORDER_SCAN
}; };
static int32_t physiScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -1950,6 +1957,9 @@ static int32_t physiScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_BASE_TABLE_NAME, nameToMsg, &pNode->tableName); code = tlvEncodeObj(pEncoder, PHY_SCAN_CODE_BASE_TABLE_NAME, nameToMsg, &pNode->tableName);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_SCAN_CODE_BASE_GROUP_ORDER_SCAN, pNode->groupOrderScan);
}
return code; return code;
} }
@ -1982,6 +1992,9 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SCAN_CODE_BASE_TABLE_NAME: case PHY_SCAN_CODE_BASE_TABLE_NAME:
code = tlvDecodeObjFromTlv(pTlv, msgToName, &pNode->tableName); code = tlvDecodeObjFromTlv(pTlv, msgToName, &pNode->tableName);
break; break;
case PHY_SCAN_CODE_BASE_GROUP_ORDER_SCAN:
code = tlvDecodeBool(pTlv, &pNode->groupOrderScan);
break;
default: default:
break; break;
} }
View File
@ -43,6 +43,9 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan); int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan);
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList); int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList);
bool isPartTableAgg(SAggLogicNode* pAgg);
bool isPartTableWinodw(SWindowLogicNode* pWindow);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
View File
@ -363,6 +363,18 @@ static void scanPathOptSetScanOrder(EScanOrder scanOrder, SScanLogicNode* pScan)
} }
} }
static void scanPathOptSetGroupOrderScan(SScanLogicNode* pScan) {
if (pScan->tableType != TSDB_SUPER_TABLE) return;
if (pScan->node.pParent && nodeType(pScan->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG) {
SAggLogicNode* pAgg = (SAggLogicNode*)pScan->node.pParent;
bool withSlimit = pAgg->node.pSlimit != NULL || (pAgg->node.pParent && pAgg->node.pParent->pSlimit);
if (withSlimit && isPartTableAgg(pAgg)) {
pScan->groupOrderScan = pAgg->node.forceCreateNonBlockingOptr = true;
}
}
}
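scanPathOptSetGroupOrderScan enables group-ordered scanning and forces a non-blocking aggregate operator only when the scan's parent is an aggregate that partitions by table name and carries (or inherits) an SLIMIT. A hedged sketch of the predicate, with hypothetical boolean inputs standing in for the plan-node checks:

    #include <stdbool.h>

    /* Hypothetical flags mirroring the planner logic: enable group-ordered scan when a
       partition-by-tbname aggregate above the scan is slimit-bounded. */
    static bool enableGroupOrderScan(bool parentIsAgg, bool aggHasSlimit,
                                     bool grandParentHasSlimit, bool partitionedByTbname) {
      return parentIsAgg && (aggHasSlimit || grandParentHasSlimit) && partitionedByTbname;
    }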
static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SOsdInfo info = {.scanOrder = SCAN_ORDER_ASC}; SOsdInfo info = {.scanOrder = SCAN_ORDER_ASC};
int32_t code = scanPathOptMatch(pCxt, pLogicSubplan->pNode, &info); int32_t code = scanPathOptMatch(pCxt, pLogicSubplan->pNode, &info);
@ -371,6 +383,7 @@ static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
if (!pCxt->pPlanCxt->streamQuery) { if (!pCxt->pPlanCxt->streamQuery) {
scanPathOptSetScanOrder(info.scanOrder, info.pScan); scanPathOptSetScanOrder(info.scanOrder, info.pScan);
} }
scanPathOptSetGroupOrderScan(info.pScan);
} }
if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) { if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) {
info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs); info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs);
@ -1675,6 +1688,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if (QUERY_NODE_LOGIC_PLAN_AGG == pNode->pParent->type) { if (QUERY_NODE_LOGIC_PLAN_AGG == pNode->pParent->type) {
SAggLogicNode* pParent = (SAggLogicNode*)(pNode->pParent); SAggLogicNode* pParent = (SAggLogicNode*)(pNode->pParent);
scanPathOptSetGroupOrderScan(pScan);
pParent->hasGroupKeyOptimized = true; pParent->hasGroupKeyOptimized = true;
} }
View File
@ -447,6 +447,7 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS
pScanPhysiNode->uid = pScanLogicNode->tableId; pScanPhysiNode->uid = pScanLogicNode->tableId;
pScanPhysiNode->suid = pScanLogicNode->stableId; pScanPhysiNode->suid = pScanLogicNode->stableId;
pScanPhysiNode->tableType = pScanLogicNode->tableType; pScanPhysiNode->tableType = pScanLogicNode->tableType;
pScanPhysiNode->groupOrderScan = pScanLogicNode->groupOrderScan;
memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName)); memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
if (NULL != pScanLogicNode->pTagCond) { if (NULL != pScanLogicNode->pTagCond) {
pSubplan->pTagCond = nodesCloneNode(pScanLogicNode->pTagCond); pSubplan->pTagCond = nodesCloneNode(pScanLogicNode->pTagCond);
@ -880,6 +881,7 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true); pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true);
pAgg->groupKeyOptimized = pAggLogicNode->hasGroupKeyOptimized; pAgg->groupKeyOptimized = pAggLogicNode->hasGroupKeyOptimized;
pAgg->node.forceCreateNonBlockingOptr = pAggLogicNode->node.forceCreateNonBlockingOptr;
SNodeList* pPrecalcExprs = NULL; SNodeList* pPrecalcExprs = NULL;
SNodeList* pGroupKeys = NULL; SNodeList* pGroupKeys = NULL;
View File
@ -306,54 +306,6 @@ static bool stbSplIsTableCountQuery(SLogicNode* pNode) {
return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pChild)->scanType; return QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && SCAN_TYPE_TABLE_COUNT == ((SScanLogicNode*)pChild)->scanType;
} }
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pGroupTags;
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
} else {
return NULL;
}
}
static bool stbSplHasPartTbname(SNodeList* pPartKeys) {
if (NULL == pPartKeys) {
return false;
}
SNode* pPartKey = NULL;
FOREACH(pPartKey, pPartKeys) {
if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) {
pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0);
}
if ((QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) ||
(QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType)) {
return true;
}
}
return false;
}
static bool stbSplNotSystemScan(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return SCAN_TYPE_SYSTEM_TABLE != ((SScanLogicNode*)pNode)->scanType;
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
return stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
} else {
return true;
}
}
static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) {
if (1 != LIST_LENGTH(pAgg->node.pChildren)) {
return false;
}
if (NULL != pAgg->pGroupKeys) {
return stbSplHasPartTbname(pAgg->pGroupKeys) &&
stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
}
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
}
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
switch (nodeType(pNode)) { switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: case QUERY_NODE_LOGIC_PLAN_SCAN:
@ -364,7 +316,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode); return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) || return (!stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) ||
stbSplIsPartTableAgg((SAggLogicNode*)pNode)) && isPartTableAgg((SAggLogicNode*)pNode)) &&
stbSplHasMultiTbScan(streamQuery, pNode) && !stbSplIsTableCountQuery(pNode); stbSplHasMultiTbScan(streamQuery, pNode) && !stbSplIsTableCountQuery(pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: case QUERY_NODE_LOGIC_PLAN_WINDOW:
return stbSplNeedSplitWindow(streamQuery, pNode); return stbSplNeedSplitWindow(streamQuery, pNode);
@ -778,10 +730,6 @@ static int32_t stbSplSplitEvent(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
} }
} }
static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) {
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
}
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) { switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
case WINDOW_TYPE_INTERVAL: case WINDOW_TYPE_INTERVAL:
@ -834,7 +782,7 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn
} }
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (stbSplIsPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) { if (isPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) {
return stbSplSplitWindowForPartTable(pCxt, pInfo); return stbSplSplitWindowForPartTable(pCxt, pInfo);
} else { } else {
return stbSplSplitWindowForCrossTable(pCxt, pInfo); return stbSplSplitWindowForCrossTable(pCxt, pInfo);
@ -920,7 +868,7 @@ static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplit
} }
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
if (stbSplIsPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) { if (isPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) {
return stbSplSplitAggNodeForPartTable(pCxt, pInfo); return stbSplSplitAggNodeForPartTable(pCxt, pInfo);
} }
return stbSplSplitAggNodeForCrossTable(pCxt, pInfo); return stbSplSplitAggNodeForCrossTable(pCxt, pInfo);
View File
@ -321,3 +321,57 @@ int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requir
} }
return code; return code;
} }
static bool stbNotSystemScan(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return SCAN_TYPE_SYSTEM_TABLE != ((SScanLogicNode*)pNode)->scanType;
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
return stbNotSystemScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
} else {
return true;
}
}
static bool stbHasPartTbname(SNodeList* pPartKeys) {
if (NULL == pPartKeys) {
return false;
}
SNode* pPartKey = NULL;
FOREACH(pPartKey, pPartKeys) {
if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) {
pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0);
}
if ((QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) ||
(QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType)) {
return true;
}
}
return false;
}
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pGroupTags;
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
} else {
return NULL;
}
}
bool isPartTableAgg(SAggLogicNode* pAgg) {
if (1 != LIST_LENGTH(pAgg->node.pChildren)) {
return false;
}
if (NULL != pAgg->pGroupKeys) {
return stbHasPartTbname(pAgg->pGroupKeys) &&
stbNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
}
return stbHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
}
bool isPartTableWinodw(SWindowLogicNode* pWindow) {
return stbHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
}
View File
@ -2141,7 +2141,6 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
if (sessionRangeKeyCmpr(&searchKey, key) == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
taosMemoryFreeClear(*pVal); taosMemoryFreeClear(*pVal);
streamStateSessionDel_rocksdb(pState, key);
goto _end; goto _end;
} }
taosMemoryFreeClear(*pVal); taosMemoryFreeClear(*pVal);
@ -2157,7 +2156,6 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
if (code == 0) { if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end; goto _end;
} }
} }
@ -2215,14 +2213,12 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
if (code == 0) { if (code == 0) {
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end; goto _end;
} }
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) { if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end; goto _end;
} }
@ -2238,7 +2234,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) { if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end; goto _end;
} }
} }
View File
@ -160,6 +160,7 @@
,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py ,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py ,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py
,,n,system-test,python3 ./test.py -f 0-others/splitVGroup.py -N 5 ,,n,system-test,python3 ./test.py -f 0-others/splitVGroup.py -N 5
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_replica.py -N 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_replica.py -N 3
View File

@ -440,8 +440,10 @@ class TDSql:
time.sleep(1) time.sleep(1)
continue continue
def execute(self, sql,queryTimes=30): def execute(self, sql, queryTimes=30, show=False):
self.sql = sql self.sql = sql
if show:
tdLog.info(sql)
i=1 i=1
while i <= queryTimes: while i <= queryTimes:
try: try:
View File
@ -95,6 +95,23 @@ class TDTestCase:
tdSql.checkEqual(f'{db}',tdSql.queryResult[0][0]) tdSql.checkEqual(f'{db}',tdSql.queryResult[0][0])
tdSql.checkEqual(f'CREATE DATABASE `{db}`',tdSql.queryResult[0][1]) tdSql.checkEqual(f'CREATE DATABASE `{db}`',tdSql.queryResult[0][1])
def show_create_systb_sql(self):
for param in self.ins_param_list:
tdSql.query(f'show create table information_schema.ins_{param}')
tdSql.checkEqual(f'ins_{param}',tdSql.queryResult[0][0])
tdSql.execute(f'use information_schema')
tdSql.query(f'show create table ins_{param}')
tdSql.checkEqual(f'ins_{param}',tdSql.queryResult[0][0])
for param in self.perf_param_list:
tdSql.query(f'show create table performance_schema.perf_{param}')
tdSql.checkEqual(f'perf_{param}',tdSql.queryResult[0][0])
tdSql.execute(f'use performance_schema')
tdSql.query(f'show create table perf_{param}')
tdSql.checkEqual(f'perf_{param}',tdSql.queryResult[0][0])
def show_create_sql(self): def show_create_sql(self):
create_db_sql = self.set_create_database_sql(self.db_param) create_db_sql = self.set_create_database_sql(self.db_param)
print(create_db_sql) print(create_db_sql)
@ -200,6 +217,7 @@ class TDTestCase:
self.perf_check() self.perf_check()
self.show_create_sql() self.show_create_sql()
self.show_create_sysdb_sql() self.show_create_sysdb_sql()
self.show_create_systb_sql()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
View File
@ -0,0 +1,309 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import random
import time
import copy
import string
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
# random string
def random_string(self, count):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(count))
# get col value and total max min ...
def getColsValue(self, i, j):
# c1 value
if random.randint(1, 10) == 5:
c1 = None
else:
c1 = 1
# c2 value
if j % 3200 == 0:
c2 = 8764231
elif random.randint(1, 10) == 5:
c2 = None
else:
c2 = random.randint(-87654297, 98765321)
value = f"({self.ts}, "
# c1
if c1 is None:
value += "null,"
else:
self.c1Cnt += 1
value += f"{c1},"
# c2
if c2 is None:
value += "null,"
else:
value += f"{c2},"
# total count
self.c2Cnt += 1
# max
if self.c2Max is None:
self.c2Max = c2
else:
if c2 > self.c2Max:
self.c2Max = c2
# min
if self.c2Min is None:
self.c2Min = c2
else:
if c2 < self.c2Min:
self.c2Min = c2
# sum
if self.c2Sum is None:
self.c2Sum = c2
else:
self.c2Sum += c2
# c3 same with ts
value += f"{self.ts})"
# move to the next timestamp
self.ts += 1
return value
# insert data
def insertData(self):
tdLog.info("insert data ....")
sqls = ""
for i in range(self.childCnt):
# insert child table
values = ""
pre_insert = f"insert into @db_name.t{i} values "
for j in range(self.childRow):
if values == "":
values = self.getColsValue(i, j)
else:
values += "," + self.getColsValue(i, j)
# batch insert
if j % self.batchSize == 0 and values != "":
sql = pre_insert + values
self.exeDouble(sql)
values = ""
# append last
if values != "":
sql = pre_insert + values
self.exeDouble(sql)
values = ""
# insert finished
tdLog.info(f"insert data successfully.\n"
f" inserted child table = {self.childCnt}\n"
f" inserted child rows = {self.childRow}\n"
f" total inserted rows = {self.childCnt*self.childRow}\n")
return
def exeDouble(self, sql):
# dbname replace
sql1 = sql.replace("@db_name", self.db1)
if len(sql1) > 100:
tdLog.info(sql1[:100])
else:
tdLog.info(sql1)
tdSql.execute(sql1)
sql2 = sql.replace("@db_name", self.db2)
if len(sql2) > 100:
tdLog.info(sql2[:100])
else:
tdLog.info(sql2)
tdSql.execute(sql2)
# prepareEnv
def prepareEnv(self):
# init
self.ts = 1680000000000
self.childCnt = 2
self.childRow = 100000
self.batchSize = 5000
self.vgroups1 = 4
self.vgroups2 = 4
self.db1 = "db1" # no sma
self.db2 = "db2" # have sma
self.smaClause = "interval(10s)"
# total
self.c1Cnt = 0
self.c2Cnt = 0
self.c2Max = None
self.c2Min = None
self.c2Sum = None
# alter local optimization to true
sql = "alter local 'querysmaoptimize 1'"
tdSql.execute(sql, 5, True)
# check that creating an sma index is forbidden on a multi-replica database
sql = f"create database db vgroups {self.vgroups1} replica 3"
tdSql.execute(sql, 5, True)
sql = f"create table db.st(ts timestamp, c1 int, c2 bigint, ts1 timestamp) tags(area int)"
tdSql.execute(sql, 5, True)
sql = f"create sma index sma_test on db.st function(max(c1),max(c2),min(c1),min(c2)) {self.smaClause};"
tdLog.info(sql)
tdSql.error(sql)
# create database db
sql = f"create database @db_name vgroups {self.vgroups1} replica 1"
self.exeDouble(sql)
# create super table st
sql = f"create table @db_name.st(ts timestamp, c1 int, c2 bigint, ts1 timestamp) tags(area int)"
self.exeDouble(sql)
# create child table
for i in range(self.childCnt):
sql = f"create table @db_name.t{i} using @db_name.st tags({i}) "
self.exeDouble(sql)
# create sma index on db2
sql = f"use {self.db2}"
tdSql.execute(sql)
sql = f"create sma index sma_index_maxmin on {self.db2}.st function(max(c1),max(c2),min(c1),min(c2)) {self.smaClause};"
tdLog.info(sql)
tdSql.execute(sql)
# insert data
self.insertData()
# check data correct
def checkExpect(self, sql, expectVal):
tdSql.query(sql)
rowCnt = tdSql.getRows()
for i in range(rowCnt):
val = tdSql.getData(i,0)
if val != expectVal:
tdLog.exit(f"Not expect . query={val} expect={expectVal} i={i} sql={sql}")
return False
tdLog.info(f"check expect ok. sql={sql} expect ={expectVal} rowCnt={rowCnt}")
return True
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
# check query result same
def queryDoubleImpl(self, sql):
# sql
sql1 = sql.replace('@db_name', self.db1)
tdLog.info(sql1)
start1 = time.time()
rows1 = tdSql.query(sql1)
spend1 = time.time() - start1
res1 = copy.copy(tdSql.queryResult)
sql2 = sql.replace('@db_name', self.db2)
tdLog.info(sql2)
start2 = time.time()
tdSql.query(sql2)
spend2 = time.time() - start2
res2 = tdSql.queryResult
rowlen1 = len(res1)
rowlen2 = len(res2)
if rowlen1 != rowlen2:
tdLog.info(f"check error. rowlen1={rowlen1} rowlen2={rowlen2} both not equal.")
return False
for i in range(rowlen1):
row1 = res1[i]
row2 = res2[i]
collen1 = len(row1)
collen2 = len(row2)
if collen1 != collen2:
tdLog.info(f"checkerror. collen1={collen1} collen2={collen2} both not equal.")
return False
for j in range(collen1):
if row1[j] != row2[j]:
tdLog.exit(f"col={j} col1={row1[j]} col2={row2[j]} both col not equal.")
return False
# warn if the performance target is not met
multiple = spend1/spend2
tdLog.info("spend1=%.6fs spend2=%.6fs multiple=%.1f"%(spend1, spend2, multiple))
if spend2 > spend1 and multiple < 4:
tdLog.info(f"performace not reached: multiple(spend1/spend)={multiple} require is >=4 ")
return False
return True
# check query result same
def queryDouble(self, sql, tryCount=60, gap=1):
for i in range(tryCount):
if self.queryDoubleImpl(sql):
return True
# error
tdLog.info(f"queryDouble return false, try loop={i}")
time.sleep(gap)
tdLog.exit(f"queryDouble try {tryCount} times, but all failed.")
return False
# check result
def checkResult(self):
# max
sql = f"select max(c1) from @db_name.st {self.smaClause}"
self.queryDouble(sql)
# max of c2
sql = f"select max(c2) from @db_name.st {self.smaClause}"
self.queryDouble(sql)
# mix
sql = f"select max(c1),max(c2),min(c1),min(c2) from @db_name.st {self.smaClause}"
self.queryDouble(sql)
# run
def run(self):
# prepare env
self.prepareEnv()
# check two db query result same
tdLog.info(f"check have sma(db1) and no sma(db2) performace...")
self.checkResult()
# stop
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
View File
@ -202,7 +202,7 @@ class TDTestCase:
if retCode != "TAOS_OK": if retCode != "TAOS_OK":
tdLog.exit("taos -s fail") tdLog.exit("taos -s fail")
tdSql.query("select count(*) from stb group by tg1") tdSql.query("select count(*) from stb group by tg1 order by count(*) desc")
tdSql.checkData(0, 0, 2) tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 1) tdSql.checkData(1, 0, 1)
View File
@ -79,6 +79,11 @@ class TDTestCase:
tdSql.query("select count(*) from (select * from meters order by ts desc)") tdSql.query("select count(*) from (select * from meters order by ts desc)")
tdSql.checkData(0, 0, allCnt) tdSql.checkData(0, 0, allCnt)
rowCnt = tdSql.query("select tbname, count(*) from meters partition by tbname slimit 11")
if rowCnt != 10:
tdLog.exit("partition by tbname should return 10 rows of table data which is " + str(rowCnt))
return
def run(self): def run(self):
binPath = self.getPath() binPath = self.getPath()