enh: stream number limit

This commit is contained in:
Liu Jicong 2023-02-06 10:40:33 +08:00
parent aa12f5022c
commit e615053f4a
4 changed files with 142 additions and 110 deletions

View File

@ -355,6 +355,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3)
#define TSDB_CODE_MND_STREAM_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x03F4)
#define TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB TAOS_DEF_ERROR_CODE(0, 0x03F5)
#define TSDB_CODE_MND_TOO_MANY_STREAMS TAOS_DEF_ERROR_CODE(0, 0x03F6)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)

View File

@ -31,6 +31,8 @@
#define MND_STREAM_VER_NUMBER 2
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 10
static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
@ -625,6 +627,34 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
{
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than 10 for each database");
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
goto _OVER;
}
break;
}
if (pStream->sourceDbUid == streamObj.sourceDbUid) {
++numOfStream;
}
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than 10 for each database");
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
goto _OVER;
}
}
}
pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
if (pDb->cfg.replications != 1) {
mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);

View File

@ -287,6 +287,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_MUST_BE_DELETED, "Stream must be dropped first")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB, "Stream temporarily does not support source db having replica > 1")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams")
// mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")

View File

@ -58,59 +58,59 @@ sql create stream stb_atan_stream trigger at_once into output_atan_stb as select
sql create stream ctb_atan_stream trigger at_once into output_atan_ctb as select ts, atan(c1), atan(c2), c3 from scalar_ct1;
sql create stream tb_atan_stream trigger at_once into output_atan_tb as select ts, atan(c1), atan(c2), c3 from scalar_tb;
sql create stream stb_ceil_stream trigger at_once into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_ceil_stream trigger at_once into output_ceil_ctb as select ts, ceil(c1), ceil(c2), c3 from scalar_ct1;
sql create stream tb_ceil_stream trigger at_once into output_ceil_tb as select ts, ceil(c1), ceil(c2), c3 from scalar_tb;
sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_cos_stream trigger at_once into output_cos_ctb as select ts, cos(c1), cos(c2), c3 from scalar_ct1;
sql create stream tb_cos_stream trigger at_once into output_cos_tb as select ts, cos(c1), cos(c2), c3 from scalar_tb;
sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_floor_stream trigger at_once into output_floor_ctb as select ts, floor(c1), floor(c2), c3 from scalar_ct1;
sql create stream tb_floor_stream trigger at_once into output_floor_tb as select ts, floor(c1), floor(c2), c3 from scalar_tb;
sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb partition by tbname;
sql create stream ctb_log_stream trigger at_once into output_log_ctb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_ct1;
sql create stream tb_log_stream trigger at_once into output_log_tb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_tb;
sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb partition by tbname;
sql create stream ctb_pow_stream trigger at_once into output_pow_ctb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_ct1;
sql create stream tb_pow_stream trigger at_once into output_pow_tb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_tb;
sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_round_stream trigger at_once into output_round_ctb as select ts, round(c1), round(c2), c3 from scalar_ct1;
sql create stream tb_round_stream trigger at_once into output_round_tb as select ts, round(c1), round(c2), c3 from scalar_tb;
sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_sin_stream trigger at_once into output_sin_ctb as select ts, sin(c1), sin(c2), c3 from scalar_ct1;
sql create stream tb_sin_stream trigger at_once into output_sin_tb as select ts, sin(c1), sin(c2), c3 from scalar_tb;
sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_sqrt_stream trigger at_once into output_sqrt_ctb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_ct1;
sql create stream tb_sqrt_stream trigger at_once into output_sqrt_tb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_tb;
sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_tan_stream trigger at_once into output_tan_ctb as select ts, tan(c1), tan(c2), c3 from scalar_ct1;
sql create stream tb_tan_stream trigger at_once into output_tan_tb as select ts, tan(c1), tan(c2), c3 from scalar_tb;
sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb partition by tbname;
sql create stream ctb_char_length_stream into output_char_length_ctb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_ct1;
sql create stream tb_char_length_stream into output_char_length_tb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_tb;
sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb partition by tbname;
sql create stream ctb_concat_stream into output_concat_ctb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_stream into output_concat_tb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_tb;
sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb partition by tbname;
sql create stream ctb_concat_ws_stream into output_concat_ws_ctb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_ws_stream into output_concat_ws_tb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_tb;
sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb partition by tbname;
sql create stream ctb_length_stream into output_length_ctb as select ts, length(c3), length(c4), length(c5) from scalar_ct1;
sql create stream tb_length_stream into output_length_tb as select ts, length(c3), length(c4), length(c5) from scalar_tb;
sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb partition by tbname;
sql create stream ctb_lower_stream into output_lower_ctb as select ts, lower(c3), lower(c4), lower(c5) from scalar_ct1;
sql create stream tb_lower_stream into output_lower_tb as select ts, lower(c3), lower(c4), lower(c5) from scalar_tb;
sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb partition by tbname;
sql create stream ctb_ltrim_stream into output_ltrim_ctb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_ct1;
sql create stream tb_ltrim_stream into output_ltrim_tb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_tb;
sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb partition by tbname;
sql create stream ctb_rtrim_stream into output_rtrim_ctb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_ct1;
sql create stream tb_rtrim_stream into output_rtrim_tb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_tb;
sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb partition by tbname;
sql create stream ctb_substr_stream into output_substr_ctb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_ct1;
sql create stream tb_substr_stream into output_substr_tb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_tb;
sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb partition by tbname;
sql create stream ctb_upper_stream into output_upper_ctb as select ts, upper(c3), upper(c4), upper(c5) from scalar_ct1;
sql create stream tb_upper_stream into output_upper_tb as select ts, upper(c3), upper(c4), upper(c5) from scalar_tb;
# sql create stream ctb_ceil_stream trigger at_once into output_ceil_ctb as select ts, ceil(c1), ceil(c2), c3 from scalar_ct1;
# sql create stream tb_ceil_stream trigger at_once into output_ceil_tb as select ts, ceil(c1), ceil(c2), c3 from scalar_tb;
# sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_cos_stream trigger at_once into output_cos_ctb as select ts, cos(c1), cos(c2), c3 from scalar_ct1;
# sql create stream tb_cos_stream trigger at_once into output_cos_tb as select ts, cos(c1), cos(c2), c3 from scalar_tb;
# sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_floor_stream trigger at_once into output_floor_ctb as select ts, floor(c1), floor(c2), c3 from scalar_ct1;
# sql create stream tb_floor_stream trigger at_once into output_floor_tb as select ts, floor(c1), floor(c2), c3 from scalar_tb;
# sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_log_stream trigger at_once into output_log_ctb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_ct1;
# sql create stream tb_log_stream trigger at_once into output_log_tb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_tb;
# sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_pow_stream trigger at_once into output_pow_ctb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_ct1;
# sql create stream tb_pow_stream trigger at_once into output_pow_tb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_tb;
# sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_round_stream trigger at_once into output_round_ctb as select ts, round(c1), round(c2), c3 from scalar_ct1;
# sql create stream tb_round_stream trigger at_once into output_round_tb as select ts, round(c1), round(c2), c3 from scalar_tb;
# sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_sin_stream trigger at_once into output_sin_ctb as select ts, sin(c1), sin(c2), c3 from scalar_ct1;
# sql create stream tb_sin_stream trigger at_once into output_sin_tb as select ts, sin(c1), sin(c2), c3 from scalar_tb;
# sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_sqrt_stream trigger at_once into output_sqrt_ctb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_ct1;
# sql create stream tb_sqrt_stream trigger at_once into output_sqrt_tb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_tb;
# sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_tan_stream trigger at_once into output_tan_ctb as select ts, tan(c1), tan(c2), c3 from scalar_ct1;
# sql create stream tb_tan_stream trigger at_once into output_tan_tb as select ts, tan(c1), tan(c2), c3 from scalar_tb;
# sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb partition by tbname;
# sql create stream ctb_char_length_stream into output_char_length_ctb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_ct1;
# sql create stream tb_char_length_stream into output_char_length_tb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_tb;
# sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb partition by tbname;
# sql create stream ctb_concat_stream into output_concat_ctb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_ct1;
# sql create stream tb_concat_stream into output_concat_tb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_tb;
# sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb partition by tbname;
# sql create stream ctb_concat_ws_stream into output_concat_ws_ctb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_ct1;
# sql create stream tb_concat_ws_stream into output_concat_ws_tb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_tb;
# sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb partition by tbname;
# sql create stream ctb_length_stream into output_length_ctb as select ts, length(c3), length(c4), length(c5) from scalar_ct1;
# sql create stream tb_length_stream into output_length_tb as select ts, length(c3), length(c4), length(c5) from scalar_tb;
# sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb partition by tbname;
# sql create stream ctb_lower_stream into output_lower_ctb as select ts, lower(c3), lower(c4), lower(c5) from scalar_ct1;
# sql create stream tb_lower_stream into output_lower_tb as select ts, lower(c3), lower(c4), lower(c5) from scalar_tb;
# sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb partition by tbname;
# sql create stream ctb_ltrim_stream into output_ltrim_ctb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_ct1;
# sql create stream tb_ltrim_stream into output_ltrim_tb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_tb;
# sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb partition by tbname;
# sql create stream ctb_rtrim_stream into output_rtrim_ctb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_ct1;
# sql create stream tb_rtrim_stream into output_rtrim_tb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_tb;
# sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb partition by tbname;
# sql create stream ctb_substr_stream into output_substr_ctb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_ct1;
# sql create stream tb_substr_stream into output_substr_tb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_tb;
# sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb partition by tbname;
# sql create stream ctb_upper_stream into output_upper_ctb as select ts, upper(c3), upper(c4), upper(c5) from scalar_ct1;
# sql create stream tb_upper_stream into output_upper_tb as select ts, upper(c3), upper(c4), upper(c5) from scalar_tb;
sql insert into scalar_ct1 values (1656668180503, 100, 100.1, "beijing", "taos", "Taos");
sql insert into scalar_ct1 values (1656668180503+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");
sql insert into scalar_ct1 values (1656668180503+2s, 0, Null, "hebei", "TDengine", Null);
@ -146,62 +146,62 @@ sql create stream stb_asin_stream trigger at_once into output_asin_stb as select
sql create stream ctb_asin_stream trigger at_once into output_asin_ctb as select ts, asin(c1), asin(c2), c3 from scalar_ct1;
sql create stream tb_asin_stream trigger at_once into output_asin_tb as select ts, asin(c1), asin(c2), c3 from scalar_tb;
sql create stream stb_atan_stream trigger at_once into output_atan_stb as select ts, atan(c1), atan(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_atan_stream trigger at_once into output_atan_ctb as select ts, atan(c1), atan(c2), c3 from scalar_ct1;
sql create stream tb_atan_stream trigger at_once into output_atan_tb as select ts, atan(c1), atan(c2), c3 from scalar_tb;
sql create stream stb_ceil_stream trigger at_once into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_ceil_stream trigger at_once into output_ceil_ctb as select ts, ceil(c1), ceil(c2), c3 from scalar_ct1;
sql create stream tb_ceil_stream trigger at_once into output_ceil_tb as select ts, ceil(c1), ceil(c2), c3 from scalar_tb;
sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_cos_stream trigger at_once into output_cos_ctb as select ts, cos(c1), cos(c2), c3 from scalar_ct1;
sql create stream tb_cos_stream trigger at_once into output_cos_tb as select ts, cos(c1), cos(c2), c3 from scalar_tb;
sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_floor_stream trigger at_once into output_floor_ctb as select ts, floor(c1), floor(c2), c3 from scalar_ct1;
sql create stream tb_floor_stream trigger at_once into output_floor_tb as select ts, floor(c1), floor(c2), c3 from scalar_tb;
sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb partition by tbname;
sql create stream ctb_log_stream trigger at_once into output_log_ctb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_ct1;
sql create stream tb_log_stream trigger at_once into output_log_tb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_tb;
sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb partition by tbname;
sql create stream ctb_pow_stream trigger at_once into output_pow_ctb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_ct1;
sql create stream tb_pow_stream trigger at_once into output_pow_tb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_tb;
sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_round_stream trigger at_once into output_round_ctb as select ts, round(c1), round(c2), c3 from scalar_ct1;
sql create stream tb_round_stream trigger at_once into output_round_tb as select ts, round(c1), round(c2), c3 from scalar_tb;
sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_sin_stream trigger at_once into output_sin_ctb as select ts, sin(c1), sin(c2), c3 from scalar_ct1;
sql create stream tb_sin_stream trigger at_once into output_sin_tb as select ts, sin(c1), sin(c2), c3 from scalar_tb;
sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_sqrt_stream trigger at_once into output_sqrt_ctb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_ct1;
sql create stream tb_sqrt_stream trigger at_once into output_sqrt_tb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_tb;
sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb partition by tbname;
sql create stream ctb_tan_stream trigger at_once into output_tan_ctb as select ts, tan(c1), tan(c2), c3 from scalar_ct1;
sql create stream tb_tan_stream trigger at_once into output_tan_tb as select ts, tan(c1), tan(c2), c3 from scalar_tb;
sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb partition by tbname;
sql create stream ctb_char_length_stream into output_char_length_ctb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_ct1;
sql create stream tb_char_length_stream into output_char_length_tb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_tb;
sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb partition by tbname;
sql create stream ctb_concat_stream into output_concat_ctb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_stream into output_concat_tb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_tb;
sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb partition by tbname;
sql create stream ctb_concat_ws_stream into output_concat_ws_ctb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_ct1;
sql create stream tb_concat_ws_stream into output_concat_ws_tb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_tb;
sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb partition by tbname;
sql create stream ctb_length_stream into output_length_ctb as select ts, length(c3), length(c4), length(c5) from scalar_ct1;
sql create stream tb_length_stream into output_length_tb as select ts, length(c3), length(c4), length(c5) from scalar_tb;
sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb partition by tbname;
sql create stream ctb_lower_stream into output_lower_ctb as select ts, lower(c3), lower(c4), lower(c5) from scalar_ct1;
sql create stream tb_lower_stream into output_lower_tb as select ts, lower(c3), lower(c4), lower(c5) from scalar_tb;
sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb partition by tbname;
sql create stream ctb_ltrim_stream into output_ltrim_ctb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_ct1;
sql create stream tb_ltrim_stream into output_ltrim_tb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_tb;
sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb partition by tbname;
sql create stream ctb_rtrim_stream into output_rtrim_ctb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_ct1;
sql create stream tb_rtrim_stream into output_rtrim_tb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_tb;
sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb partition by tbname;
sql create stream ctb_substr_stream into output_substr_ctb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_ct1;
sql create stream tb_substr_stream into output_substr_tb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_tb;
sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb partition by tbname;
sql create stream ctb_upper_stream into output_upper_ctb as select ts, upper(c3), upper(c4), upper(c5) from scalar_ct1;
sql create stream tb_upper_stream into output_upper_tb as select ts, upper(c3), upper(c4), upper(c5) from scalar_tb;
# sql create stream ctb_atan_stream trigger at_once into output_atan_ctb as select ts, atan(c1), atan(c2), c3 from scalar_ct1;
# sql create stream tb_atan_stream trigger at_once into output_atan_tb as select ts, atan(c1), atan(c2), c3 from scalar_tb;
# sql create stream stb_ceil_stream trigger at_once into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_ceil_stream trigger at_once into output_ceil_ctb as select ts, ceil(c1), ceil(c2), c3 from scalar_ct1;
# sql create stream tb_ceil_stream trigger at_once into output_ceil_tb as select ts, ceil(c1), ceil(c2), c3 from scalar_tb;
# sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_cos_stream trigger at_once into output_cos_ctb as select ts, cos(c1), cos(c2), c3 from scalar_ct1;
# sql create stream tb_cos_stream trigger at_once into output_cos_tb as select ts, cos(c1), cos(c2), c3 from scalar_tb;
# sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_floor_stream trigger at_once into output_floor_ctb as select ts, floor(c1), floor(c2), c3 from scalar_ct1;
# sql create stream tb_floor_stream trigger at_once into output_floor_tb as select ts, floor(c1), floor(c2), c3 from scalar_tb;
# sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_log_stream trigger at_once into output_log_ctb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_ct1;
# sql create stream tb_log_stream trigger at_once into output_log_tb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_tb;
# sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_pow_stream trigger at_once into output_pow_ctb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_ct1;
# sql create stream tb_pow_stream trigger at_once into output_pow_tb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_tb;
# sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_round_stream trigger at_once into output_round_ctb as select ts, round(c1), round(c2), c3 from scalar_ct1;
# sql create stream tb_round_stream trigger at_once into output_round_tb as select ts, round(c1), round(c2), c3 from scalar_tb;
# sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_sin_stream trigger at_once into output_sin_ctb as select ts, sin(c1), sin(c2), c3 from scalar_ct1;
# sql create stream tb_sin_stream trigger at_once into output_sin_tb as select ts, sin(c1), sin(c2), c3 from scalar_tb;
# sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_sqrt_stream trigger at_once into output_sqrt_ctb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_ct1;
# sql create stream tb_sqrt_stream trigger at_once into output_sqrt_tb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_tb;
# sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb partition by tbname;
# sql create stream ctb_tan_stream trigger at_once into output_tan_ctb as select ts, tan(c1), tan(c2), c3 from scalar_ct1;
# sql create stream tb_tan_stream trigger at_once into output_tan_tb as select ts, tan(c1), tan(c2), c3 from scalar_tb;
# sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb partition by tbname;
# sql create stream ctb_char_length_stream into output_char_length_ctb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_ct1;
# sql create stream tb_char_length_stream into output_char_length_tb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_tb;
# sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb partition by tbname;
# sql create stream ctb_concat_stream into output_concat_ctb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_ct1;
# sql create stream tb_concat_stream into output_concat_tb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_tb;
# sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb partition by tbname;
# sql create stream ctb_concat_ws_stream into output_concat_ws_ctb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_ct1;
# sql create stream tb_concat_ws_stream into output_concat_ws_tb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_tb;
# sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb partition by tbname;
# sql create stream ctb_length_stream into output_length_ctb as select ts, length(c3), length(c4), length(c5) from scalar_ct1;
# sql create stream tb_length_stream into output_length_tb as select ts, length(c3), length(c4), length(c5) from scalar_tb;
# sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb partition by tbname;
# sql create stream ctb_lower_stream into output_lower_ctb as select ts, lower(c3), lower(c4), lower(c5) from scalar_ct1;
# sql create stream tb_lower_stream into output_lower_tb as select ts, lower(c3), lower(c4), lower(c5) from scalar_tb;
# sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb partition by tbname;
# sql create stream ctb_ltrim_stream into output_ltrim_ctb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_ct1;
# sql create stream tb_ltrim_stream into output_ltrim_tb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_tb;
# sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb partition by tbname;
# sql create stream ctb_rtrim_stream into output_rtrim_ctb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_ct1;
# sql create stream tb_rtrim_stream into output_rtrim_tb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_tb;
# sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb partition by tbname;
# sql create stream ctb_substr_stream into output_substr_ctb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_ct1;
# sql create stream tb_substr_stream into output_substr_tb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_tb;
# sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb partition by tbname;
# sql create stream ctb_upper_stream into output_upper_ctb as select ts, upper(c3), upper(c4), upper(c5) from scalar_ct1;
# sql create stream tb_upper_stream into output_upper_tb as select ts, upper(c3), upper(c4), upper(c5) from scalar_tb;
sql insert into scalar_ct1 values (1656668180503, 100, 100.1, "beijing", "taos", "Taos");
sql insert into scalar_ct1 values (1656668180503+1s, -50, -50.1, "tianjin", "taosdata", "Taosdata");
sql insert into scalar_ct1 values (1656668180503+2s, 0, Null, "hebei", "TDengine", Null);