Merge branch 'develop' of github.com:taosdata/TDengine into dev/chr

This commit is contained in:
tomchon 2021-07-12 16:18:26 +08:00
commit 0ef4ef0576
26 changed files with 492 additions and 345 deletions

2
.gitignore vendored
View File

@ -68,6 +68,8 @@ CMakeError.log
*.o
version.c
taos.rc
src/connector/jdbc/.classpath
src/connector/jdbc/.project
src/connector/jdbc/.settings/
tests/comparisonTest/cassandra/cassandratest/.classpath
tests/comparisonTest/cassandra/cassandratest/.project

View File

@ -185,9 +185,10 @@ cmake .. && cmake --build .
# 安装
如果你不想安装可以直接在shell中运行。生成完成后安装 TDengine
生成完成后,安装 TDengine下文给出的指令以 Linux 为例,如果是在 Windows 下,那么对应的指令会是 `nmake install`
```bash
make install
sudo make install
```
用户可以在[文件目录结构](https://www.taosdata.com/cn/documentation/administrator#directories)中了解更多在操作系统中生成的目录或文件。
@ -195,7 +196,7 @@ make install
安装成功后,在终端中启动 TDengine 服务:
```bash
taosd
sudo systemctl start taosd
```
用户可以使用 TDengine Shell 来连接 TDengine 服务,在终端中,输入:
@ -208,7 +209,7 @@ taos
## 快速运行
TDengine 生成后,在终端执行以下命令
如果不希望以服务方式运行 TDengine也可以在终端中直接运行它。也即在生成完成后执行以下命令在 Windows 下,生成的可执行文件会带有 .exe 后缀,例如会名为 taosd.exe
```bash
./build/bin/taosd -c test/cfg

View File

@ -175,7 +175,7 @@ cmake .. && cmake --build .
# Installing
After building successfully, TDengine can be installed by:
After building successfully, TDengine can be installed by: (On Windows platform, the following command should be `nmake install`)
```bash
sudo make install
```
@ -197,7 +197,7 @@ If TDengine shell connects the server successfully, welcome messages and version
## Quick Run
If you don't want to run TDengine as a service, you can run it in current shell. For example, to quickly start a TDengine server after building, run the command below in terminal:
If you don't want to run TDengine as a service, you can run it in current shell. For example, to quickly start a TDengine server after building, run the command below in terminal: (We take Linux as an example, command on Windows will be `taosd.exe`)
```bash
./build/bin/taosd -c test/cfg
```

View File

@ -123,8 +123,8 @@ taosd -C
- minRows文件块中记录的最小条数。单位为条默认值100。
- maxRows文件块中记录的最大条数。单位为条默认值4096。
- comp文件压缩标志位。0关闭1一阶段压缩2两阶段压缩。默认值2。可通过 alter database 修改)
- walWAL级别。1写wal但不执行fsync2写wal, 而且执行fsync。默认值1。在 taos.cfg 中参数名需要写作 walLevel(可通过 alter database 修改)
- fsync当wal设置为2时执行fsync的周期。设置为0表示每次写入立即执行fsync。单位为毫秒默认值3000。(可通过 alter database 修改)
- walWAL级别。1写wal但不执行fsync2写wal, 而且执行fsync。默认值1。在 taos.cfg 中参数名需要写作 walLevel
- fsync当wal设置为2时执行fsync的周期。设置为0表示每次写入立即执行fsync。单位为毫秒默认值3000。
- cache内存块的大小。单位为兆字节MB默认值16。
- blocks每个VNODETSDB中有多少cache大小的内存块。因此一个VNODE的用的内存大小粗略为cache * blocks。单位为块默认值4。可通过 alter database 修改)
- replica副本个数。取值范围1-3单位为个默认值1。可通过 alter database 修改)

View File

@ -129,16 +129,6 @@ TDengine 缺省的时间戳是毫秒精度,但通过在 CREATE DATABASE 时传
CACHELAST 参数控制是否在内存中缓存子表的最近数据。缺省值为 0取值范围 [0, 1, 2, 3]。其中 0 表示不缓存1 表示缓存子表最近一行数据2 表示缓存子表每一列的最近的非 NULL 值3 表示同时打开缓存最近行和列功能。(从 2.0.11.0 版本开始支持参数值 [0, 1],从 2.1.2.0 版本开始支持参数值 [0, 1, 2, 3]。)
说明:缓存最近行,将显著改善 LAST_ROW 函数的性能表现;缓存每列的最近非 NULL 值将显著改善无特殊影响WHERE、ORDER BY、GROUP BY、INTERVAL下的 LAST 函数的性能表现。
```mysql
ALTER DATABASE db_name WAL 1;
```
WAL 参数控制 WAL 日志的落盘方式。缺省值为 1取值范围为 [1, 2]。1 表示写 WAL但不执行 fsync2 表示写 WAL而且执行 fsync。
```mysql
ALTER DATABASE db_name FSYNC 3000;
```
FSYNC 参数控制执行 fsync 操作的周期。缺省值为 3000单位是毫秒取值范围为 [0, 180000]。如果设置为 0表示每次写入立即执行 fsync。该设置项主要用于调节 WAL 参数设为 2 时的系统行为。
**Tips**: 以上所有参数修改后都可以用show databases来确认是否修改成功。另外从 2.1.3.0 版本开始,修改这些参数后无需重启服务器即可生效。
- **显示系统所有数据库**

View File

@ -69,6 +69,39 @@ mkdir -p ${install_dir}/inc && cp ${header_files} ${install_dir}/inc
mkdir -p ${install_dir}/cfg && cp ${cfg_dir}/taos.cfg ${install_dir}/cfg/taos.cfg
mkdir -p ${install_dir}/bin && cp ${bin_files} ${install_dir}/bin && chmod a+x ${install_dir}/bin/*
if [ -f ${build_dir}/bin/jemalloc-config ]; then
mkdir -p ${install_dir}/jemalloc/{bin,lib,lib/pkgconfig,include/jemalloc,share/doc/jemalloc,share/man/man3}
cp ${build_dir}/bin/jemalloc-config ${install_dir}/jemalloc/bin
if [ -f ${build_dir}/bin/jemalloc.sh ]; then
cp ${build_dir}/bin/jemalloc.sh ${install_dir}/jemalloc/bin
fi
if [ -f ${build_dir}/bin/jeprof ]; then
cp ${build_dir}/bin/jeprof ${install_dir}/jemalloc/bin
fi
if [ -f ${build_dir}/include/jemalloc/jemalloc.h ]; then
cp ${build_dir}/include/jemalloc/jemalloc.h ${install_dir}/jemalloc/include/jemalloc
fi
if [ -f ${build_dir}/lib/libjemalloc.so.2 ]; then
cp ${build_dir}/lib/libjemalloc.so.2 ${install_dir}/jemalloc/lib
ln -sf libjemalloc.so.2 ${install_dir}/jemalloc/lib/libjemalloc.so
fi
if [ -f ${build_dir}/lib/libjemalloc.a ]; then
cp ${build_dir}/lib/libjemalloc.a ${install_dir}/jemalloc/lib
fi
if [ -f ${build_dir}/lib/libjemalloc_pic.a ]; then
cp ${build_dir}/lib/libjemalloc_pic.a ${install_dir}/jemalloc/lib
fi
if [ -f ${build_dir}/lib/pkgconfig/jemalloc.pc ]; then
cp ${build_dir}/lib/pkgconfig/jemalloc.pc ${install_dir}/jemalloc/lib/pkgconfig
fi
if [ -f ${build_dir}/share/doc/jemalloc/jemalloc.html ]; then
cp ${build_dir}/share/doc/jemalloc/jemalloc.html ${install_dir}/jemalloc/share/doc/jemalloc
fi
if [ -f ${build_dir}/share/man/man3/jemalloc.3 ]; then
cp ${build_dir}/share/man/man3/jemalloc.3 ${install_dir}/jemalloc/share/man/man3
fi
fi
cd ${install_dir}
if [ "$osType" != "Darwin" ]; then

View File

@ -91,6 +91,39 @@ else
fi
chmod a+x ${install_dir}/bin/* || :
if [ -f ${build_dir}/bin/jemalloc-config ]; then
mkdir -p ${install_dir}/jemalloc/{bin,lib,lib/pkgconfig,include/jemalloc,share/doc/jemalloc,share/man/man3}
cp ${build_dir}/bin/jemalloc-config ${install_dir}/jemalloc/bin
if [ -f ${build_dir}/bin/jemalloc.sh ]; then
cp ${build_dir}/bin/jemalloc.sh ${install_dir}/jemalloc/bin
fi
if [ -f ${build_dir}/bin/jeprof ]; then
cp ${build_dir}/bin/jeprof ${install_dir}/jemalloc/bin
fi
if [ -f ${build_dir}/include/jemalloc/jemalloc.h ]; then
cp ${build_dir}/include/jemalloc/jemalloc.h ${install_dir}/jemalloc/include/jemalloc
fi
if [ -f ${build_dir}/lib/libjemalloc.so.2 ]; then
cp ${build_dir}/lib/libjemalloc.so.2 ${install_dir}/jemalloc/lib
ln -sf libjemalloc.so.2 ${install_dir}/jemalloc/lib/libjemalloc.so
fi
if [ -f ${build_dir}/lib/libjemalloc.a ]; then
cp ${build_dir}/lib/libjemalloc.a ${install_dir}/jemalloc/lib
fi
if [ -f ${build_dir}/lib/libjemalloc_pic.a ]; then
cp ${build_dir}/lib/libjemalloc_pic.a ${install_dir}/jemalloc/lib
fi
if [ -f ${build_dir}/lib/pkgconfig/jemalloc.pc ]; then
cp ${build_dir}/lib/pkgconfig/jemalloc.pc ${install_dir}/jemalloc/lib/pkgconfig
fi
if [ -f ${build_dir}/share/doc/jemalloc/jemalloc.html ]; then
cp ${build_dir}/share/doc/jemalloc/jemalloc.html ${install_dir}/jemalloc/share/doc/jemalloc
fi
if [ -f ${build_dir}/share/man/man3/jemalloc.3 ]; then
cp ${build_dir}/share/man/man3/jemalloc.3 ${install_dir}/jemalloc/share/man/man3
fi
fi
cd ${install_dir}
if [ "$osType" != "Darwin" ]; then

View File

@ -91,6 +91,39 @@ else
fi
chmod a+x ${install_dir}/bin/* || :
if [ -f ${build_dir}/bin/jemalloc-config ]; then
mkdir -p ${install_dir}/jemalloc/{bin,lib,lib/pkgconfig,include/jemalloc,share/doc/jemalloc,share/man/man3}
cp ${build_dir}/bin/jemalloc-config ${install_dir}/jemalloc/bin
if [ -f ${build_dir}/bin/jemalloc.sh ]; then
cp ${build_dir}/bin/jemalloc.sh ${install_dir}/jemalloc/bin
fi
if [ -f ${build_dir}/bin/jeprof ]; then
cp ${build_dir}/bin/jeprof ${install_dir}/jemalloc/bin
fi
if [ -f ${build_dir}/include/jemalloc/jemalloc.h ]; then
cp ${build_dir}/include/jemalloc/jemalloc.h ${install_dir}/jemalloc/include/jemalloc
fi
if [ -f ${build_dir}/lib/libjemalloc.so.2 ]; then
cp ${build_dir}/lib/libjemalloc.so.2 ${install_dir}/jemalloc/lib
ln -sf libjemalloc.so.2 ${install_dir}/jemalloc/lib/libjemalloc.so
fi
if [ -f ${build_dir}/lib/libjemalloc.a ]; then
cp ${build_dir}/lib/libjemalloc.a ${install_dir}/jemalloc/lib
fi
if [ -f ${build_dir}/lib/libjemalloc_pic.a ]; then
cp ${build_dir}/lib/libjemalloc_pic.a ${install_dir}/jemalloc/lib
fi
if [ -f ${build_dir}/lib/pkgconfig/jemalloc.pc ]; then
cp ${build_dir}/lib/pkgconfig/jemalloc.pc ${install_dir}/jemalloc/lib/pkgconfig
fi
if [ -f ${build_dir}/share/doc/jemalloc/jemalloc.html ]; then
cp ${build_dir}/share/doc/jemalloc/jemalloc.html ${install_dir}/jemalloc/share/doc/jemalloc
fi
if [ -f ${build_dir}/share/man/man3/jemalloc.3 ]; then
cp ${build_dir}/share/man/man3/jemalloc.3 ${install_dir}/jemalloc/share/man/man3
fi
fi
cd ${install_dir}
if [ "$osType" != "Darwin" ]; then

View File

@ -7802,14 +7802,16 @@ static int32_t doLoadAllTableMeta(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNod
}
static STableMeta* extractTempTableMetaFromSubquery(SQueryInfo* pUpstream) {
int32_t numOfColumns = pUpstream->fieldsInfo.numOfOutput;
STableMetaInfo* pUpstreamTableMetaInfo = tscGetMetaInfo(pUpstream, 0);
STableMeta* meta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * numOfColumns);
int32_t numOfColumns = pUpstream->fieldsInfo.numOfOutput;
STableMeta *meta = calloc(1, sizeof(STableMeta) + sizeof(SSchema) * numOfColumns);
meta->tableType = TSDB_TEMP_TABLE;
STableComInfo *info = &meta->tableInfo;
info->numOfColumns = numOfColumns;
info->numOfTags = 0;
info->precision = pUpstreamTableMetaInfo->pTableMeta->tableInfo.precision;
info->numOfTags = 0;
int32_t n = 0;
for(int32_t i = 0; i < numOfColumns; ++i) {

View File

@ -57,8 +57,8 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
parameterCnt++;
}
}
parameters = new Object[parameterCnt];
}
parameters = new Object[parameterCnt];
if (parameterCnt > 1) {
// the table name is also a parameter, so ignore it.

View File

@ -22,16 +22,16 @@ public class RestfulPreparedStatement extends RestfulStatement implements Prepar
super(conn, database);
this.rawSql = sql;
int parameterCnt = 0;
if (sql.contains("?")) {
int parameterCnt = 0;
for (int i = 0; i < sql.length(); i++) {
if ('?' == sql.charAt(i)) {
parameterCnt++;
}
}
parameters = new Object[parameterCnt];
this.isPrepared = true;
}
parameters = new Object[parameterCnt];
// build parameterMetaData
this.parameterMetaData = new RestfulParameterMetaData(parameters);

View File

@ -15,6 +15,8 @@ public class RestfulPreparedStatementTest {
private static PreparedStatement pstmt_insert;
private static final String sql_select = "select * from t1 where ts > ? and ts <= ? and f1 >= ?";
private static PreparedStatement pstmt_select;
private static final String sql_without_parameters = "select count(*) from t1";
private static PreparedStatement pstmt_without_parameters;
@Test
public void executeQuery() throws SQLException {
@ -237,6 +239,7 @@ public class RestfulPreparedStatementTest {
@Test
public void clearParameters() throws SQLException {
pstmt_insert.clearParameters();
pstmt_without_parameters.clearParameters();
}
@Test
@ -382,6 +385,7 @@ public class RestfulPreparedStatementTest {
pstmt_insert = conn.prepareStatement(sql_insert);
pstmt_select = conn.prepareStatement(sql_select);
pstmt_without_parameters = conn.prepareStatement(sql_without_parameters);
} catch (SQLException e) {
e.printStackTrace();
}
@ -394,6 +398,8 @@ public class RestfulPreparedStatementTest {
pstmt_insert.close();
if (pstmt_select != null)
pstmt_select.close();
if (pstmt_without_parameters != null)
pstmt_without_parameters.close();
if (conn != null)
conn.close();
} catch (SQLException e) {

View File

@ -15,36 +15,18 @@ const { NULL_POINTER } = require('ref-napi');
module.exports = CTaosInterface;
function convertMillisecondsToDatetime(time) {
return new TaosObjects.TaosTimestamp(time);
}
function convertMicrosecondsToDatetime(time) {
return new TaosObjects.TaosTimestamp(time * 0.001, true);
}
function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
timestampConverter = convertMillisecondsToDatetime;
if (micro == true) {
timestampConverter = convertMicrosecondsToDatetime;
}
function convertTimestamp(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
let queue = [];
let time = 0;
for (let i = currOffset; i < currOffset + nbytes; i++) {
queue.push(data[i]);
}
for (let i = queue.length - 1; i >= 0; i--) {
time += queue[i] * Math.pow(16, i * 2);
}
let time = data.readInt64LE(currOffset);
currOffset += nbytes;
res.push(timestampConverter(time));
res.push(new TaosObjects.TaosTimestamp(time, precision));
}
return res;
}
function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
function convertBool(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = new Array(data.length);
for (let i = 0; i < data.length; i++) {
@ -60,7 +42,7 @@ function convertBool(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
}
return res;
}
function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
@ -71,7 +53,7 @@ function convertTinyint(data, num_of_rows, nbytes = 0, offset = 0, micro = false
}
return res;
}
function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
@ -82,7 +64,7 @@ function convertSmallint(data, num_of_rows, nbytes = 0, offset = 0, micro = fals
}
return res;
}
function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
function convertInt(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
@ -93,7 +75,7 @@ function convertInt(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
}
return res;
}
function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
@ -104,7 +86,7 @@ function convertBigint(data, num_of_rows, nbytes = 0, offset = 0, micro = false)
}
return res;
}
function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
@ -115,7 +97,7 @@ function convertFloat(data, num_of_rows, nbytes = 0, offset = 0, micro = false)
}
return res;
}
function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
@ -127,7 +109,7 @@ function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro = false)
return res;
}
function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, precision = 0) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
@ -272,7 +254,7 @@ CTaosInterface.prototype.config = function config() {
CTaosInterface.prototype.connect = function connect(host = null, user = "root", password = "taosdata", db = null, port = 0) {
let _host, _user, _password, _db, _port;
try {
_host = host != null ? ref.allocCString(host) : ref.alloc(ref.types.char_ptr, ref.NULL);
_host = host != null ? ref.allocCString(host) : ref.NULL;
}
catch (err) {
throw "Attribute Error: host is expected as a str";
@ -290,7 +272,7 @@ CTaosInterface.prototype.connect = function connect(host = null, user = "root",
throw "Attribute Error: password is expected as a str";
}
try {
_db = db != null ? ref.allocCString(db) : ref.alloc(ref.types.char_ptr, ref.NULL);
_db = db != null ? ref.allocCString(db) : ref.NULL;
}
catch (err) {
throw "Attribute Error: db is expected as a str";
@ -345,8 +327,7 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
}
var fieldL = this.libtaos.taos_fetch_lengths(result);
let isMicro = (this.libtaos.taos_result_precision(result) == FieldTypes.C_TIMESTAMP_MICRO);
let precision = this.libtaos.taos_result_precision(result);
var fieldlens = [];
@ -373,7 +354,7 @@ CTaosInterface.prototype.fetchBlock = function fetchBlock(result, fields) {
if (!convertFunctions[fields[i]['type']]) {
throw new errors.DatabaseError("Invalid data type returned from database");
}
blocks[i] = convertFunctions[fields[i]['type']](pdata, num_of_rows, fieldlens[i], offset, isMicro);
blocks[i] = convertFunctions[fields[i]['type']](pdata, num_of_rows, fieldlens[i], offset, precision);
}
}
return { blocks: blocks, num_of_rows }
@ -423,7 +404,7 @@ CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback,
let row = cti.libtaos.taos_fetch_row(result2);
let fields = cti.fetchFields_a(result2);
let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO);
let precision = cti.libtaos.taos_result_precision(result2);
let blocks = new Array(fields.length);
blocks.fill(null);
numOfRows2 = Math.abs(numOfRows2);
@ -449,7 +430,7 @@ CTaosInterface.prototype.fetch_rows_a = function fetch_rows_a(result, callback,
let prow = ref.reinterpret(row, 8, i * 8);
prow = prow.readPointer();
prow = ref.ref(prow);
blocks[i] = convertFunctions[fields[i]['type']](prow, 1, fieldlens[i], offset, isMicro);
blocks[i] = convertFunctions[fields[i]['type']](prow, 1, fieldlens[i], offset, precision);
//offset += fields[i]['bytes'] * numOfRows2;
}
}
@ -572,7 +553,7 @@ CTaosInterface.prototype.openStream = function openStream(connection, sql, callb
var cti = this;
let asyncCallbackWrapper = function (param2, result2, row) {
let fields = cti.fetchFields_a(result2);
let isMicro = (cti.libtaos.taos_result_precision(result2) == FieldTypes.C_TIMESTAMP_MICRO);
let precision = cti.libtaos.taos_result_precision(result2);
let blocks = new Array(fields.length);
blocks.fill(null);
let numOfRows2 = 1;
@ -582,7 +563,7 @@ CTaosInterface.prototype.openStream = function openStream(connection, sql, callb
if (!convertFunctions[fields[i]['type']]) {
throw new errors.DatabaseError("Invalid data type returned from database");
}
blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, isMicro);
blocks[i] = convertFunctions[fields[i]['type']](row, numOfRows2, fields[i]['bytes'], offset, precision);
offset += fields[i]['bytes'] * numOfRows2;
}
}

View File

@ -1,5 +1,5 @@
const FieldTypes = require('./constants');
const util = require('util');
/**
* Various objects such as TaosRow and TaosColumn that help make parsing data easier
* @module TaosObjects
@ -14,7 +14,7 @@ const FieldTypes = require('./constants');
* var trow = new TaosRow(row);
* console.log(trow.data);
*/
function TaosRow (row) {
function TaosRow(row) {
this.data = row;
this.length = row.length;
return this;
@ -29,10 +29,10 @@ function TaosRow (row) {
*/
function TaosField(field) {
this._field = field;
this.name = field.name;
this.type = FieldTypes.getType(field.type);
return this;
this._field = field;
this.name = field.name;
this.type = FieldTypes.getType(field.type);
return this;
}
/**
@ -42,39 +42,110 @@ function TaosField(field) {
* @param {Date} date - A Javascript date time object or the time in milliseconds past 1970-1-1 00:00:00.000
*/
class TaosTimestamp extends Date {
constructor(date, micro = false) {
super(date);
this._type = 'TaosTimestamp';
if (micro) {
this.microTime = date - Math.floor(date);
constructor(date, precision = 0) {
if (precision === 1) {
super(Math.floor(date / 1000));
this.precisionExtras = date % 1000;
} else if (precision === 2) {
super(parseInt(date / 1000000));
// use BigInt to fix: 1625801548423914405 % 1000000 = 914496 which not expected (914405)
this.precisionExtras = parseInt(BigInt(date) % 1000000n);
} else {
super(parseInt(date));
}
this.precision = precision;
}
/**
* TDengine raw timestamp.
* @returns raw taos timestamp (int64)
*/
taosTimestamp() {
if (this.precision == 1) {
return (this * 1000 + this.precisionExtras);
} else if (this.precision == 2) {
return (this * 1000000 + this.precisionExtras);
} else {
return Math.floor(this);
}
}
/**
* Gets the microseconds of a Date.
* @return {Int} A microseconds integer
*/
getMicroseconds() {
if (this.precision == 1) {
return this.getMilliseconds() * 1000 + this.precisionExtras;
} else if (this.precision == 2) {
return this.getMilliseconds() * 1000 + this.precisionExtras / 1000;
} else {
return 0;
}
}
/**
* Gets the nanoseconds of a TaosTimestamp.
* @return {Int} A nanoseconds integer
*/
getNanoseconds() {
if (this.precision == 1) {
return this.getMilliseconds() * 1000000 + this.precisionExtras * 1000;
} else if (this.precision == 2) {
return this.getMilliseconds() * 1000000 + this.precisionExtras;
} else {
return 0;
}
}
/**
* @returns {String} a string for timestamp string format
*/
_precisionExtra() {
if (this.precision == 1) {
return String(this.precisionExtras).padStart(3, '0');
} else if (this.precision == 2) {
return String(this.precisionExtras).padStart(6, '0');
} else {
return '';
}
}
/**
* @function Returns the date into a string usable by TDengine
* @return {string} A Taos Timestamp String
*/
toTaosString(){
toTaosString() {
var tzo = -this.getTimezoneOffset(),
dif = tzo >= 0 ? '+' : '-',
pad = function(num) {
var norm = Math.floor(Math.abs(num));
return (norm < 10 ? '0' : '') + norm;
},
pad2 = function(num) {
var norm = Math.floor(Math.abs(num));
if (norm < 10) return '00' + norm;
if (norm < 100) return '0' + norm;
if (norm < 1000) return norm;
};
dif = tzo >= 0 ? '+' : '-',
pad = function (num) {
var norm = Math.floor(Math.abs(num));
return (norm < 10 ? '0' : '') + norm;
},
pad2 = function (num) {
var norm = Math.floor(Math.abs(num));
if (norm < 10) return '00' + norm;
if (norm < 100) return '0' + norm;
if (norm < 1000) return norm;
};
return this.getFullYear() +
'-' + pad(this.getMonth() + 1) +
'-' + pad(this.getDate()) +
' ' + pad(this.getHours()) +
':' + pad(this.getMinutes()) +
':' + pad(this.getSeconds()) +
'.' + pad2(this.getMilliseconds()) +
'' + (this.microTime ? pad2(Math.round(this.microTime * 1000)) : '');
'-' + pad(this.getMonth() + 1) +
'-' + pad(this.getDate()) +
' ' + pad(this.getHours()) +
':' + pad(this.getMinutes()) +
':' + pad(this.getSeconds()) +
'.' + pad2(this.getMilliseconds()) +
'' + this._precisionExtra();
}
/**
* Custom console.log
* @returns {String} string format for debug
*/
[util.inspect.custom](depth, opts) {
return this.toTaosString() + JSON.stringify({ precision: this.precision, precisionExtras: this.precisionExtras }, opts);
}
toString() {
return this.toTaosString();
}
}
module.exports = {TaosRow, TaosField, TaosTimestamp}
module.exports = { TaosRow, TaosField, TaosTimestamp }

View File

@ -1,13 +1,13 @@
{
"name": "td2.0-connector",
"version": "2.0.8",
"version": "2.0.9",
"description": "A Node.js connector for TDengine.",
"main": "tdengine.js",
"directories": {
"test": "test"
},
"scripts": {
"test": "node test/test.js"
"test": "node test/test.js && node test/testMicroseconds.js && node test/testNanoseconds.js"
},
"repository": {
"type": "git",

View File

@ -1,4 +1,4 @@
var TDengineConnection = require('./nodetaos/connection.js')
module.exports.connect = function (connection=null) {
module.exports.connect = function (connection={}) {
return new TDengineConnection(connection);
}

View File

@ -1,5 +1,5 @@
const taos = require('../tdengine');
var conn = taos.connect({host:"127.0.0.1", user:"root", password:"taosdata", config:"/etc/taos",port:10});
var conn = taos.connect();
var c1 = conn.cursor();
let stime = new Date();
let interval = 1000;

View File

@ -0,0 +1,49 @@
const taos = require('../tdengine');
var conn = taos.connect();
var c1 = conn.cursor();
let stime = new Date();
let interval = 1000;
function convertDateToTS(date) {
let tsArr = date.toISOString().split("T")
return "\"" + tsArr[0] + " " + tsArr[1].substring(0, tsArr[1].length - 1) + "\"";
}
function R(l, r) {
return Math.random() * (r - l) - r;
}
function randomBool() {
if (Math.random() < 0.5) {
return true;
}
return false;
}
// Initialize
//c1.execute('drop database td_connector_test;');
const dbname = 'nodejs_test_us';
c1.execute('create database if not exists ' + dbname + ' precision "us"');
c1.execute('use ' + dbname)
c1.execute('create table if not exists tstest (ts timestamp, _int int);');
c1.execute('insert into tstest values(1625801548423914, 0)');
// Select
console.log('select * from tstest');
c1.execute('select * from tstest');
var d = c1.fetchall();
console.log(c1.fields);
let ts = d[0][0];
console.log(ts);
if (ts.taosTimestamp() != 1625801548423914) {
throw "microseconds not match!";
}
if (ts.getMicroseconds() % 1000 !== 914) {
throw "micronsecond precision error";
}
setTimeout(function () {
c1.query('drop database nodejs_us_test;');
}, 200);
setTimeout(function () {
conn.close();
}, 2000);

View File

@ -0,0 +1,49 @@
const taos = require('../tdengine');
var conn = taos.connect();
var c1 = conn.cursor();
let stime = new Date();
let interval = 1000;
function convertDateToTS(date) {
let tsArr = date.toISOString().split("T")
return "\"" + tsArr[0] + " " + tsArr[1].substring(0, tsArr[1].length - 1) + "\"";
}
function R(l, r) {
return Math.random() * (r - l) - r;
}
function randomBool() {
if (Math.random() < 0.5) {
return true;
}
return false;
}
// Initialize
//c1.execute('drop database td_connector_test;');
const dbname = 'nodejs_test_ns';
c1.execute('create database if not exists ' + dbname + ' precision "ns"');
c1.execute('use ' + dbname)
c1.execute('create table if not exists tstest (ts timestamp, _int int);');
c1.execute('insert into tstest values(1625801548423914405, 0)');
// Select
console.log('select * from tstest');
c1.execute('select * from tstest');
var d = c1.fetchall();
console.log(c1.fields);
let ts = d[0][0];
console.log(ts);
if (ts.taosTimestamp() != 1625801548423914405) {
throw "nanosecond not match!";
}
if (ts.getNanoseconds() % 1000000 !== 914405) {
throw "nanosecond precision error";
}
setTimeout(function () {
c1.query('drop database nodejs_ns_test;');
}, 200);
setTimeout(function () {
conn.close();
}, 2000);

View File

@ -6008,8 +6008,10 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
errorPrint("[%d] %s() LN%d Failed to insert records of batch %d\n",
pThreadInfo->threadID, __func__, __LINE__,
batchPerTbl);
errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n",
batchPerTbl, maxSqlLen / batchPerTbl);
if (batchPerTbl > 0) {
errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n",
batchPerTbl, maxSqlLen / batchPerTbl);
}
errorPrint("\tPlease check if the buffer length(%"PRId64") or batch(%d) is set with proper value!\n",
maxSqlLen, batchPerTbl);
goto free_of_interlace;

View File

@ -124,6 +124,9 @@ typedef struct {
extern char version[];
#define DB_PRECISION_LEN 8
#define DB_STATUS_LEN 16
typedef struct {
char name[TSDB_DB_NAME_LEN];
char create_time[32];
@ -144,9 +147,9 @@ typedef struct {
int32_t fsync;
int8_t comp;
int8_t cachelast;
char precision[8]; // time resolution
char precision[DB_PRECISION_LEN]; // time resolution
int8_t update;
char status[16];
char status[DB_STATUS_LEN];
} SDbInfo;
typedef struct {
@ -542,7 +545,8 @@ static void parse_precision_first(
free(tmp);
exit(-1);
}
strncpy(g_args.precision, tmp, strlen(tmp));
strncpy(g_args.precision, tmp,
min(DB_PRECISION_LEN - 1, strlen(tmp)));
free(tmp);
}
}
@ -1596,6 +1600,7 @@ static void taosStartDumpOutWorkThreads(int32_t numOfThread, char *dbName)
NULL, g_args.port);
if (pThread->taosCon == NULL) {
errorPrint("Failed to connect to TDengine server %s\n", g_args.host);
free(threadObj);
return;
}
pthread_attr_init(&thattr);
@ -2607,6 +2612,7 @@ static void taosStartDumpInWorkThreads()
NULL, g_args.port);
if (pThread->taosCon == NULL) {
errorPrint("Failed to connect to TDengine server %s\n", g_args.host);
free(threadObj);
return;
}
pthread_attr_init(&thattr);

View File

@ -105,8 +105,7 @@ typedef struct SResultRowInfo {
int16_t type:8; // data type for hash key
int32_t size:24; // number of result set
int32_t capacity; // max capacity
SResultRow* current; // current start active index
int64_t prevSKey; // previous (not completed) sliding window start key
SResultRow* current; // current active result row
} SResultRowInfo;
typedef struct SColumnFilterElem {
@ -277,6 +276,7 @@ typedef struct SQueryRuntimeEnv {
bool enableGroupData;
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
char* keyBuf; // window key buffer
SResultRowPool* pool; // window result object pool
char** prevRow;

View File

@ -242,6 +242,7 @@ static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeE
if (size <= 0) {
return;
}
int32_t orderId = pRuntimeEnv->pQueryAttr->order.orderColId;
if (orderId <= 0) {
return;
@ -410,25 +411,10 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
pResultRowInfo->capacity = (int32_t)newCapacity;
}
static int32_t ascResultRowCompareFn(const void* p1, const void* p2) {
SResultRow* pRow1 = *(SResultRow**)p1;
SResultRow* pRow2 = *(SResultRow**)p2;
if (pRow1 == pRow2) {
return 0;
} else {
return pRow1->win.skey < pRow2->win.skey? -1:1;
}
}
static int32_t descResultRowCompareFn(const void* p1, const void* p2) {
return -ascResultRowCompareFn(p1, p2);
}
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
int16_t bytes, bool masterscan, uint64_t uid) {
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, char *pData,
int16_t bytes, bool masterscan, uint64_t tableGroupId) {
bool existed = false;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tableGroupId);
SResultRow **p1 =
(SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
@ -446,16 +432,15 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
existed = false;
} else if (pResultRowInfo->size == 1) {
existed = (pResultRowInfo->pResult[0] == (*p1));
} else {
__compar_fn_t fn = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr)? ascResultRowCompareFn:descResultRowCompareFn;
void* ptr = taosbsearch(p1, pResultRowInfo->pResult, pResultRowInfo->size, POINTER_BYTES, fn, TD_EQ);
if (ptr != NULL) {
existed = true;
}
} else { // check if current pResultRowInfo contains the existed pResultRow
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid);
void* ptr = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
existed = (ptr != NULL);
}
}
} else {
if (p1 != NULL) { // group by column query
// In case of group by column query, the required SResultRow object must be existed in the pResultRowInfo object.
if (p1 != NULL) {
return *p1;
}
}
@ -479,6 +464,10 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
pResultRowInfo->pResult[pResultRowInfo->size++] = pResult;
pResultRowInfo->current = pResult;
int64_t dummyVal = 0;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid);
taosHashPut(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &dummyVal, POINTER_BYTES);
}
// too many time window in query
@ -518,12 +507,12 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
STimeWindow w = {0};
if (pResultRowInfo->current == NULL) { // the first window, from the previous stored value
if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
// if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
getInitialStartTimeWindow(pQueryAttr, ts, &w);
pResultRowInfo->prevSKey = w.skey;
} else {
w.skey = pResultRowInfo->prevSKey;
}
// pResultRowInfo->prevSKey = w.skey;
// } else {
// w.skey = pResultRowInfo->prevSKey;
// }
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') {
w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1;
@ -531,10 +520,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
}
} else {
// int32_t slot = curTimeWindowIndex(pResultRowInfo);
// SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot);
SResultRow* pWindowRes = pResultRowInfo->current;
w = pWindowRes->win;
w = pResultRowInfo->current->win;
}
if (w.skey > ts || w.ekey < ts) {
@ -614,13 +600,13 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
return 0;
}
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win,
bool masterscan, SResultRow **pResult, int64_t groupId, SQLFunctionCtx* pCtx,
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win,
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
assert(win->skey <= win->ekey);
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId);
if (pResultRow == NULL) {
*pResult = NULL;
return TSDB_CODE_SUCCESS;
@ -628,7 +614,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow
// not assign result buffer yet, add new result buffer
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQueryAttr->intermediateResultRowSize);
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) tableGroupId, pRuntimeEnv->pQueryAttr->intermediateResultRowSize);
if (ret != TSDB_CODE_SUCCESS) {
return -1;
}
@ -739,8 +725,6 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
} else {
pResultRowInfo->current = pResultRowInfo->pResult[i + 1]; // current not closed result object
}
pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey;
}
}
@ -1247,7 +1231,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
}
}
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t groupId) {
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
@ -1258,7 +1242,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
SResultRow* prevRow = pResultRowInfo->current;
// int32_t prevIndex = curTimeWindowIndex(pResultRowInfo);
TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) {
@ -1275,7 +1258,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx,
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -1296,33 +1279,34 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
j++;
}
for(; pResultRowInfo->pResult[j] != pResultRowInfo->current; ++j) {
SResultRow* current = pResultRowInfo->current;
for(; pResultRowInfo->pResult[j] != current && j < pResultRowInfo->size; ++j) {
SResultRow* pRes = pResultRowInfo->pResult[j];
if (pRes->closed) {
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
continue;
}
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, groupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
STimeWindow w = pRes->win;
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult,
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1,
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput);
}
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0],
-1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput);
}
// restore current time window
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx,
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -1342,7 +1326,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
}
// null data, failed to allocate more memory buffer
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, groupId,
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &nextWin, masterScan, &pResult, tableGroupId,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -1483,7 +1467,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
@ -1504,7 +1488,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
@ -1547,7 +1531,8 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic
len = varDataLen(pData);
}
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, d, len, true, groupIndex);
int64_t tid = 0;
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, d, len, true, groupIndex);
assert (pResultRow != NULL);
setResultRowKey(pResultRow, pData, type);
@ -1811,6 +1796,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->pResultRowListSet = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pRuntimeEnv->keyBuf = malloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t));
pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
@ -2060,6 +2046,9 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap);
pRuntimeEnv->pTableRetrieveTsMap = NULL;
taosHashCleanup(pRuntimeEnv->pResultRowListSet);
pRuntimeEnv->pResultRowListSet = NULL;
destroyOperatorInfo(pRuntimeEnv->proot);
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
@ -2790,7 +2779,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -2836,7 +2825,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -3250,8 +3239,8 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
int32_t tid = 0;
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&tid, sizeof(tid), true, uid);
int64_t tid = 0;
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid);
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
@ -3482,10 +3471,13 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
}
void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx,
int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t groupIndex) {
int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId) {
// for simple group by query without interval, all the tables belong to one group result.
int64_t uid = 0;
int64_t tid = 0;
SResultRow* pResultRow =
doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&groupIndex, sizeof(groupIndex), true, uid);
doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid);
assert (pResultRow != NULL);
/*
@ -3493,7 +3485,7 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe
* all group belong to one result set, and each group result has different group id so set the id to be one
*/
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->pQueryAttr->resultRowSize);
int32_t ret = addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, tableGroupId, pRuntimeEnv->pQueryAttr->resultRowSize);
if (ret != TSDB_CODE_SUCCESS) {
return;
}
@ -3502,20 +3494,20 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe
setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
}
void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex,
void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t tableGroupId,
TSKEY nextKey) {
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
// lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey;
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == tableGroupId) {
return;
}
doSetTableGroupOutputBuf(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pInfo->rowCellInfoOffset, numOfOutput, groupIndex);
doSetTableGroupOutputBuf(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pInfo->rowCellInfoOffset, numOfOutput, tableGroupId);
// record the current active group id
pRuntimeEnv->prevGroupId = groupIndex;
pRuntimeEnv->prevGroupId = tableGroupId;
}
void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx,
@ -3686,12 +3678,16 @@ void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunction
void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
SResultRowInfo *pWindowResInfo = &pTableQueryInfo->resInfo;
SResultRowInfo *pResultRowInfo = &pTableQueryInfo->resInfo;
if (pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL) {
if (pResultRowInfo->current != NULL) {
return;
}
// if (pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL) {
// return;
// }
pTableQueryInfo->win.skey = key;
STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey};
@ -3707,13 +3703,13 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
TSKEY ek = MAX(win.skey, win.ekey);
getAlignQueryTimeWindow(pQueryAttr, win.skey, sk, ek, &w);
if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) {
if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
assert(win.ekey == pQueryAttr->window.ekey);
}
pWindowResInfo->prevSKey = w.skey;
}
// if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) {
// assert(win.ekey == pQueryAttr->window.ekey);
// }
//
// pResultRowInfo->prevSKey = w.skey;
// }
pTableQueryInfo->lastKey = pTableQueryInfo->win.skey;
}
@ -3756,8 +3752,8 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo*
}
int32_t numOfRowsToCopy = pRow->numOfRows;
if (numOfResult + numOfRowsToCopy >= pRuntimeEnv->resultInfo.capacity) {
break;
if (numOfResult + numOfRowsToCopy >= pRuntimeEnv->resultInfo.capacity) {
break;
}
pGroupResInfo->index += 1;
@ -4614,7 +4610,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
if (pResultRowInfo->size > 0) {
pResultRowInfo->current = pResultRowInfo->pResult[0];
pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey;
// pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey;
}
qDebug("QInfo:0x%"PRIx64" start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
@ -4640,7 +4636,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
if (pResultRowInfo->size > 0) {
pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey;
// pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey;
}
p = doTableScanImpl(pOperator, newgroup);
@ -5500,7 +5496,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
} else {
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
@ -5520,7 +5516,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
SResultRow* pResult = NULL;
pInfo->curWindow.ekey = pInfo->curWindow.skey;
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
pBInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
@ -6149,7 +6145,7 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo));
pInfo->colIndex = -1; // group by column index
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;

View File

@ -44,7 +44,7 @@ int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr) {
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) {
pResultRowInfo->type = type;
pResultRowInfo->size = 0;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
// pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
pResultRowInfo->current = NULL;
pResultRowInfo->capacity = size;
@ -93,7 +93,7 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
pResultRowInfo->size = 0;
pResultRowInfo->current = NULL;
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
// pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
}
int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) {

View File

@ -450,4 +450,44 @@ if $data11 != 1 then
return -1
endi
print =====================>TD-5157
sql select twa(c1) from nest_tb1 interval(19a);
if $rows != 10000 then
return -1
endi
if $data00 != @20-09-14 23:59:59.992@ then
return -1
endi
if $data01 != 0.000083333 then
return -1
endi
print =================>us database interval query, TD-5039
sql create database test precision 'us';
sql use test;
sql create table t1(ts timestamp, k int);
sql insert into t1 values('2020-01-01 01:01:01.000', 1) ('2020-01-01 01:02:00.000', 2);
sql select avg(k) from (select avg(k) k from t1 interval(1s)) interval(1m);
if $rows != 2 then
return -1
endi
if $data00 != @20-01-01 01:01:00.000000@ then
return -1
endi
if $data01 != 1.000000000 then
return -1
endi
if $data10 != @20-01-01 01:02:00.000000@ then
return -1
endi
if $data11 != 2.000000000 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -1,147 +0,0 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect
sleep 100
print ========== sub_in_from.sim
$i = 0
$dbPrefix = subdb
$tbPrefix = sub_tb
$stbPrefix = sub_stb
$tbNum = 10
$rowNum = 1000
$totalNum = $tbNum * $rowNum
$loops = 200000
$log = 10000
$ts0 = 1537146000000
$delta = 600000
$i = 0
$db = $dbPrefix . $i
$stb = $stbPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db cache 16 maxrows 4096 keep 36500
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
$i = 0
$ts = $ts0
$halfNum = $tbNum / 2
while $i < $halfNum
$tbId = $i + $halfNum
$tb = $tbPrefix . $i
$tb1 = $tbPrefix . $tbId
sql create table $tb using $stb tags( $i )
sql create table $tb1 using $stb tags( $tbId )
$x = 0
while $x < $rowNum
$xs = $x * $delta
$ts = $ts0 + $xs
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
sql insert into $tb values ( $ts , $c , $c , $c , $c , $c , $c , true, $binary , $nchar )
sql insert into $tb1 values ( $ts , $c , NULL , $c , NULL , $c , $c , true, $binary , $nchar )
$x = $x + 1
endw
$i = $i + 1
endw
print ====== tables created
sql_error select count(*) from (select count(*) from abc.sub_stb0)
sql_error select val + 20 from (select count(*) from sub_stb0 interval(10h))
sql_error select abc+20 from (select count(*) from sub_stb0 interval(1s))
sql select count(*) from (select count(*) from sub_stb0 interval(10h))
if $rows != 1 then
return -1
endi
if $data00 != 18 then
print expect 18, actual: $data00
return -1
endi
sql select ts from (select count(*) from sub_stb0 interval(10h))
if $rows != 18 then
return -1
endi
if $data00 != @18-09-17 04:00:00.000@ then
return -1
endi
if $data01 != @18-09-17 14:00:00.000@ then
return -1
endi
sql select val + 20, val from (select count(*) as val from sub_stb0 interval(10h))
if $rows != 18 then
return -1
endi
if $data00 != 320.000000 then
return -1
endi
if $data01 != 300 then
return -1
endi
if $data10 != 620 then
return -1
endi
if $data11 != 600 then
return -1
endi
if $data20 != 620 then
return -1
endi
if $data21 != 600 then
return -1
endi
sql select max(val), min(val), max(val) - min(val) from (select count(*) val from sub_stb0 interval(10h))
if $rows != 1 then
return -1
endi
if $data00 != 600 then
return -1
endi
if $data01 != 100 then
return -1
endi
if $data02 != 500.000000 then
return -1
endi
sql select first(ts,val),last(ts,val) from (select count(*) val from sub_stb0 interval(10h))
sql select top(val, 5) from (select count(*) val from sub_stb0 interval(10h))
sql select diff(val) from (select count(*) val from sub_stb0 interval(10h))
sql select apercentile(val, 50) from (select count(*) val from sub_stb0 interval(10h))
# not support yet
sql select percentile(val, 50) from (select count(*) val from sub_stb0 interval(10h))
sql select stddev(val) from (select count(*) val from sub_stb0 interval(10h))
print ====================>complex query