diff --git a/cmake/cmake.install b/cmake/cmake.install index 27edd8e27a..d9f759217a 100644 --- a/cmake/cmake.install +++ b/cmake/cmake.install @@ -10,7 +10,9 @@ ELSEIF (TD_WINDOWS) # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/python DESTINATION connector) # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/C\# DESTINATION connector) # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/examples DESTINATION .) - INSTALL(FILES ${TD_SOURCE_DIR}/packaging/cfg/taos.cfg DESTINATION cfg) + INSTALL(CODE "IF (NOT EXISTS ${CMAKE_INSTALL_PREFIX}/cfg/taos.cfg) + execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${TD_SOURCE_DIR}/packaging/cfg/taos.cfg ${CMAKE_INSTALL_PREFIX}/cfg/taos.cfg) + ENDIF ()") INSTALL(FILES ${TD_SOURCE_DIR}/include/client/taos.h DESTINATION include) INSTALL(FILES ${TD_SOURCE_DIR}/include/util/taoserror.h DESTINATION include) INSTALL(FILES ${TD_SOURCE_DIR}/include/libs/function/taosudf.h DESTINATION include) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index b4e8825431..de7b75a245 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -27,6 +27,10 @@ else () cat("${TD_SUPPORT_DIR}/taosadapter_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif() +if(TD_LINUX_64 AND JEMALLOC_ENABLED) + cat("${TD_SUPPORT_DIR}/jemalloc_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) +endif() + # pthread if(${BUILD_PTHREAD}) cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -392,6 +396,19 @@ if(${BUILD_WITH_SQLITE}) endif(NOT TD_WINDOWS) endif(${BUILD_WITH_SQLITE}) +# jemalloc +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + include(ExternalProject) + ExternalProject_Add(jemalloc + PREFIX "jemalloc" + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc + BUILD_IN_SOURCE 1 + CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/ + BUILD_COMMAND ${MAKE} + ) + INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/build/include) +ENDIF () + # addr2line if(${BUILD_ADDR2LINE}) if(NOT ${TD_WINDOWS}) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 6dbf74f8bc..1cdfe20c8d 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -11,7 +11,7 @@ import TabItem from "@theme/TabItem"; ::: -TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包。 +TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包。也支持通过 `apt-get` 工具从线上进行安装。 ## 安装 @@ -293,4 +293,4 @@ taos> select avg(current), max(voltage), min(phase) from test.meters where group ```sql taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s); -``` \ No newline at end of file +``` diff --git a/docs/zh/07-develop/04-query-data/index.mdx b/docs/zh/07-develop/04-query-data/index.mdx index 824f36ef2f..eecda92744 100644 --- a/docs/zh/07-develop/04-query-data/index.mdx +++ b/docs/zh/07-develop/04-query-data/index.mdx @@ -25,6 +25,7 @@ TDengine 采用 SQL 作为查询语言。应用程序可以通过 REST API 或 - 单列、多列数据查询 - 标签和数值的多种过滤条件:>, <, =, <\>, like 等 - 聚合结果的分组(Group by)、排序(Order by)、约束输出(Limit/Offset) +- 时间窗口(Interval)、会话窗口(Session)和状态窗口(State_window)等窗口切分聚合查询 - 数值列及聚合结果的四则运算 - 时间戳对齐的连接查询(Join Query: 隐式连接)操作 - 多种聚合/计算函数: count, max, min, avg, sum, twa, stddev, leastsquares, top, bottom, first, last, percentile, apercentile, last_row, spread, diff 等 @@ -40,7 +41,7 @@ taos> select * from d1001 where voltage > 215 order by ts desc limit 2; Query OK, 2 row(s) in set (0.001100s) ``` -为满足物联网场景的需求,TDengine 支持几个特殊的函数,比如 twa(时间加权平均),spread (最大值与最小值的差),last_row(最后一条记录)等,更多与物联网场景相关的函数将添加进来。TDengine 还支持连续查询。 +为满足物联网场景的需求,TDengine 支持几个特殊的函数,比如 twa(时间加权平均),spread (最大值与最小值的差),last_row(最后一条记录)等,更多与物联网场景相关的函数将添加进来。 具体的查询语法请看 [TAOS SQL 的数据查询](/taos-sql/select) 章节。 @@ -73,7 +74,7 @@ taos> SELECT count(*), max(current) FROM meters where groupId = 2 and ts > now - Query OK, 1 row(s) in set (0.002136s) ``` -TDengine 仅容许对属于同一个超级表的表之间进行聚合查询,不同超级表之间的聚合查询不支持。在 [TAOS SQL 的数据查询](/taos-sql/select) 一章,查询类操作都会注明是否支持超级表。 +在 [TAOS SQL 的数据查询](/taos-sql/select) 一章,查询类操作都会注明是否支持超级表。 ## 降采样查询、插值 diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.md index 7faccdcec1..459b88085f 100644 --- a/docs/zh/07-develop/07-tmq.md +++ b/docs/zh/07-develop/07-tmq.md @@ -6,11 +6,11 @@ title: 数据订阅 为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 -与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 可以是一张超级表,或一张子表。不仅如此,你可以通过标签、表名、列、表达式等多种方法过滤所需数据,并且支持对数据进行函数变换、预处理(包括标量udf计算)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤交给 TDengine,而不是应用完成,有效的减少传输的数据量。 +与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度。 -消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程分布式的消费数据,提高数据通吐率。但不同消费者组即使消费同一个topic, 并不共享消费进度。一个消费者组可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的vnode上,也就是多个shard上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保at least once消费。 +消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保 at least once 消费。 -为了实现上述功能,TDengine 采用了灵活的 WAL (Write-Ahead-Log) 文件切换与保留机制:可以按照时间或文件大小来保留WAL文件(详见create database语句)。在消费时,TDengine 从 WAL 中获取数据,并经过过滤、变换等操作,将数据推送给消费者。 +为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。 本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。 diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 7d78c2dc2f..be18ef1fc0 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -104,6 +104,7 @@ typedef struct SDataBlockInfo { uint32_t capacity; // TODO: optimize and remove following int64_t version; // used for stream, and need serialization + int64_t ts; // used for stream, and need serialization int32_t childId; // used for stream, do not serialize EStreamType type; // used for stream, do not serialize STimeWindow calWin; // used for stream, do not serialize diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b32129bfd7..d26e99f423 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3100,7 +3100,7 @@ typedef struct { void* msg; } SBatchRsp; -static FORCE_INLINE void tFreeSBatchRsp(void *p) { +static FORCE_INLINE void tFreeSBatchRsp(void* p) { if (NULL == p) { return; } diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 103ca6a4f0..842e656b9d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -17,6 +17,7 @@ #include "os.h" #include "query.h" #include "tdatablock.h" +#include "tdbInt.h" #include "tmsg.h" #include "tmsgcb.h" #include "tqueue.h" @@ -85,6 +86,12 @@ enum { TASK_OUTPUT__FETCH, }; +enum { + STREAM_QUEUE__SUCESS = 1, + STREAM_QUEUE__FAILED, + STREAM_QUEUE__PROCESSING, +}; + typedef struct { int8_t type; } SStreamQueueItem; @@ -123,12 +130,6 @@ typedef struct { SSDataBlock* pBlock; } SStreamTrigger; -enum { - STREAM_QUEUE__SUCESS = 1, - STREAM_QUEUE__FAILED, - STREAM_QUEUE__PROCESSING, -}; - typedef struct { STaosQueue* queue; STaosQall* qall; @@ -233,6 +234,7 @@ typedef struct { typedef struct SStreamTask { int64_t streamId; int32_t taskId; + int32_t totalLevel; int8_t taskLevel; int8_t outputType; int16_t dispatchMsgType; @@ -458,13 +460,26 @@ int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); -typedef struct SStreamMeta SStreamMeta; +typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask); -SStreamMeta* streamMetaOpen(); +typedef struct SStreamMeta { + char* path; + TDB* db; + TTB* pTaskDb; + TTB* pStateDb; + SHashObj* pTasks; + void* ahandle; + TXN txn; + FTaskExpand* expandFunc; +} SStreamMeta; + +SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc); void streamMetaClose(SStreamMeta* streamMeta); -int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); +int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen); +int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); +SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/include/os/osDef.h b/include/os/osDef.h index 14f38eb7ff..be8689398d 100644 --- a/include/os/osDef.h +++ b/include/os/osDef.h @@ -57,7 +57,7 @@ extern "C" { #if defined(WINDOWS) char *stpcpy (char *dest, const char *src); - char *stpncpy (char *dest, const char *src, size_t n); + char *stpncpy (char *dest, const char *src, int n); // specific #ifndef __COMPAR_FN_T @@ -77,7 +77,7 @@ extern "C" { char * strsep(char **stringp, const char *delim); char * getpass(const char *prefix); - char * strndup(const char *s, size_t n); + char * strndup(const char *s, int n); // for send function in tsocket.c #define MSG_NOSIGNAL 0 diff --git a/packaging/release.bat b/packaging/release.bat index c1cf7875a5..d58e19cece 100644 --- a/packaging/release.bat +++ b/packaging/release.bat @@ -2,61 +2,78 @@ set internal_dir=%~dp0\..\..\ set community_dir=%~dp0\.. -cd %community_dir% -git checkout -- . -cd %community_dir%\packaging +set package_dir=%cd% :: %1 name %2 version if !%1==! GOTO USAGE if !%2==! GOTO USAGE -if %1 == taos GOTO TAOS -if %1 == power GOTO POWER -if %1 == tq GOTO TQ -if %1 == pro GOTO PRO -if %1 == kh GOTO KH -if %1 == jh GOTO JH -GOTO USAGE -:TAOS -goto RELEASE - -:POWER -call sed_power.bat %community_dir% -goto RELEASE - -:TQ -call sed_tq.bat %community_dir% -goto RELEASE - -:PRO -call sed_pro.bat %community_dir% -goto RELEASE - -:KH -call sed_kh.bat %community_dir% -goto RELEASE - -:JH -call sed_jh.bat %community_dir% -goto RELEASE - -:RELEASE -echo release windows-client-64 for %1, version: %2 -if not exist %internal_dir%\debug\ver-%2-64bit-%1 ( - md %internal_dir%\debug\ver-%2-64bit-%1 +if "%1" == "cluster" ( + set work_dir=%internal_dir% + set packagServerName_x64=TDengine-enterprise-server-%2-beta-Windows-x64 + set packagServerName_x86=TDengine-enterprise-server-%2-beta-Windows-x86 + set packagClientName_x64=TDengine-enterprise-client-%2-beta-Windows-x64 + set packagClientName_x86=TDengine-enterprise-client-%2-beta-Windows-x86 ) else ( - rd /S /Q %internal_dir%\debug\ver-%2-64bit-%1 - md %internal_dir%\debug\ver-%2-64bit-%1 + set work_dir=%community_dir% + set packagServerName_x64=TDengine-server-%2-Windows-x64 + set packagServerName_x86=TDengine-server-%2-Windows-x86 + set packagClientName_x64=TDengine-client-%2-Windows-x64 + set packagClientName_x86=TDengine-client-%2-Windows-x86 ) -cd %internal_dir%\debug\ver-%2-64bit-%1 -call "C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\vcvarsall.bat" amd64 -cmake ../../ -G "NMake Makefiles" -DVERNUMBER=%2 -DCPUTYPE=x64 -set CL=/MP4 -nmake install + +echo release windows-client for %1, version: %2 +if not exist %work_dir%\debug ( + md %work_dir%\debug +) +if not exist %work_dir%\debug\ver-%2-x64 ( + md %work_dir%\debug\ver-%2-x64 +) else ( + rd /S /Q %work_dir%\debug\ver-%2-x64 + md %work_dir%\debug\ver-%2-x64 +) +if not exist %work_dir%\debug\ver-%2-x86 ( + md %work_dir%\debug\ver-%2-x86 +) else ( + rd /S /Q %work_dir%\debug\ver-%2-x86 + md %work_dir%\debug\ver-%2-x86 +) +cd %work_dir%\debug\ver-%2-x64 +call vcvarsall.bat x64 +cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=true -DBUILD_HTTP=false -DVERNUMBER=%2 -DCPUTYPE=x64 +cmake --build . +rd /s /Q C:\TDengine +cmake --install . +if not %errorlevel% == 0 ( call :RUNFAILED build x64 failed & exit /b 1) +cd %package_dir% +iscc /DMyAppInstallName="%packagServerName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="" tools\tdengine.iss /O..\release +if not %errorlevel% == 0 ( call :RUNFAILED package %packagServerName_x64% failed & exit /b 1) +iscc /DMyAppInstallName="%packagClientName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="taosd.exe" tools\tdengine.iss /O..\release +if not %errorlevel% == 0 ( call :RUNFAILED package %packagClientName_x64% failed & exit /b 1) + +cd %work_dir%\debug\ver-%2-x86 +call vcvarsall.bat x86 +cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=true -DBUILD_HTTP=false -DVERNUMBER=%2 -DCPUTYPE=x86 +cmake --build . +rd /s /Q C:\TDengine +cmake --install . +if not %errorlevel% == 0 ( call :RUNFAILED build x86 failed & exit /b 1) +cd %package_dir% +iscc /DMyAppInstallName="%packagServerName_x86%" /DMyAppVersion="%2" /DMyAppExcludeSource="" tools\tdengine.iss /O..\release +if not %errorlevel% == 0 ( call :RUNFAILED package %packagServerName_x86% failed & exit /b 1) +iscc /DMyAppInstallName="%packagClientName_x86%" /DMyAppVersion="%2" /DMyAppExcludeSource="taosd.exe" tools\tdengine.iss /O..\release +if not %errorlevel% == 0 ( call :RUNFAILED package %packagClientName_x86% failed & exit /b 1) + goto EXIT0 :USAGE -echo Usage: release.bat $productName $version +echo Usage: release.bat $verMode $version goto EXIT0 -:EXIT0 \ No newline at end of file +:EXIT0 +exit /b + +:RUNFAILED +echo %* +cd %package_dir% +goto :eof \ No newline at end of file diff --git a/packaging/release.sh b/packaging/release.sh index 3426c2856d..2452ee1813 100755 --- a/packaging/release.sh +++ b/packaging/release.sh @@ -26,7 +26,7 @@ soMode=dynamic # [static | dynamic] dbName=taos # [taos | ...] allocator=glibc # [glibc | jemalloc] verNumber="" -verNumberComp="2.0.0.0" +verNumberComp="3.0.0.0" httpdBuild=false while getopts "hv:V:c:o:l:s:d:a:n:m:H:" arg; do @@ -216,7 +216,7 @@ else fi # check support cpu type -if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "mips64" ]]; then +if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "arm64" ]] || [[ "$cpuType" == "arm32" ]] || [[ "$cpuType" == "mips64" ]]; then if [ "$verMode" != "cluster" ]; then # community-version compile cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro} diff --git a/packaging/tools/favicon.ico b/packaging/tools/favicon.ico new file mode 100644 index 0000000000..20b8026d1d Binary files /dev/null and b/packaging/tools/favicon.ico differ diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 2c3b70f5ed..beb0718987 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -30,7 +30,6 @@ configDir="/etc/taos" installDir="/usr/local/taos" adapterName="taosadapter" benchmarkName="taosBenchmark" -tmqName="tmq_sim" dumpName="taosdump" demoName="taosdemo" diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index a3f4c35842..6103ce170c 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -60,7 +60,7 @@ if [ "$pagMode" == "lite" ]; then strip ${build_dir}/bin/${serverName} strip ${build_dir}/bin/${clientName} # lite version doesn't include taosadapter, which will lead to no restful interface - bin_files="${build_dir}/bin/${serverName} ${build_dir}/bin/${clientName} ${script_dir}/remove.sh ${script_dir}/startPre.sh ${build_dir}/bin/taosBenchmark ${build_dir}/bin/tmq_sim" + bin_files="${build_dir}/bin/${serverName} ${build_dir}/bin/${clientName} ${script_dir}/remove.sh ${script_dir}/startPre.sh ${build_dir}/bin/taosBenchmark " taostools_bin_files="" else @@ -78,7 +78,6 @@ else taostools_bin_files=" ${build_dir}/bin/taosdump \ ${build_dir}/bin/taosBenchmark \ - ${build_dir}/bin/tmq_sim \ ${build_dir}/bin/TDinsight.sh \ $tdinsight_caches" diff --git a/packaging/tools/taos.bat b/packaging/tools/taos.bat new file mode 100644 index 0000000000..c3fe625e16 --- /dev/null +++ b/packaging/tools/taos.bat @@ -0,0 +1,6 @@ +@echo off +cd C:\TDengine +if not "%1" == "" ( + %1 --help + @cmd /k +) \ No newline at end of file diff --git a/packaging/tools/tdengine.iss b/packaging/tools/tdengine.iss new file mode 100644 index 0000000000..7310201815 --- /dev/null +++ b/packaging/tools/tdengine.iss @@ -0,0 +1,81 @@ +#define MyAppName "TDengine" +#define MyAppPublisher "taosdata" +#define MyAppURL "http://www.taosdata.com/" +#define MyAppBeforeInstallTxt "windows_before_install.txt" +#define MyAppIco "favicon.ico" +#define MyAppInstallDir "C:\TDengine" +#define MyAppOutputDir "./" +#define MyAppSourceDir "C:\TDengine" +;#define MyAppAllFile "\*" +#define MyAppCfgName "\cfg\*" +#define MyAppDriverName "\driver\*" +#define MyAppConnectorName "\connector\*" +#define MyAppExamplesName "\examples\*" +#define MyAppIncludeName "\include\*" +#define MyAppExeName "\*.exe" +#define MyAppTaosExeName "\taos.bat" +#define MyAppTaosdemoExeName "\taosBenchmark.exe" +#define MyAppDLLName "\driver\taos.dll" +;#define MyAppVersion "3.0" +;#define MyAppInstallName "TDengine" + +[Setup] +VersionInfoVersion={#MyAppVersion} +AppId={{A0F7A93C-79C4-485D-B2B8-F0D03DF42FAB} +AppName={#MyAppName} +AppVersion={#MyAppVersion} +;AppVerName={#MyAppName} {#MyAppVersion} +AppPublisher={#MyAppPublisher} +AppPublisherURL={#MyAppURL} +AppSupportURL={#MyAppURL} +AppUpdatesURL={#MyAppURL} +DefaultDirName={#MyAppInstallDir} +DefaultGroupName={#MyAppName} +DisableProgramGroupPage=yes +InfoBeforeFile={#MyAppBeforeInstallTxt} +OutputDir={#MyAppOutputDir} +OutputBaseFilename={#MyAppInstallName} +SetupIconFile={#MyAppIco} +Compression=lzma +SolidCompression=yes +DisableDirPage=yes +Uninstallable=yes + +[Languages] +Name: "chinesesimp"; MessagesFile: "compiler:Default.isl" +;Name: "english"; MessagesFile: "compiler:Languages\English.isl" + +[Files] +;Source: {#MyAppSourceDir}{#MyAppAllFile}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs +Source: taos.bat; DestDir: "{app}\include"; Flags: igNoreversion; +;Source: taosdemo.png; DestDir: "{app}\include"; Flags: igNoreversion; +;Source: taosShell.png; DestDir: "{app}\include"; Flags: igNoreversion; +Source: favicon.ico; DestDir: "{app}\include"; Flags: igNoreversion; +Source: {#MyAppSourceDir}{#MyAppDLLName}; DestDir: "{win}\System32"; Flags: igNoreversion; +Source: {#MyAppSourceDir}{#MyAppCfgName}; DestDir: "{app}\cfg"; Flags: igNoreversion recursesubdirs createallsubdirs onlyifdoesntexist uninsneveruninstall +Source: {#MyAppSourceDir}{#MyAppDriverName}; DestDir: "{app}\driver"; Flags: igNoreversion recursesubdirs createallsubdirs +;Source: {#MyAppSourceDir}{#MyAppConnectorName}; DestDir: "{app}\connector"; Flags: igNoreversion recursesubdirs createallsubdirs +;Source: {#MyAppSourceDir}{#MyAppExamplesName}; DestDir: "{app}\examples"; Flags: igNoreversion recursesubdirs createallsubdirs +Source: {#MyAppSourceDir}{#MyAppIncludeName}; DestDir: "{app}\include"; Flags: igNoreversion recursesubdirs createallsubdirs +Source: {#MyAppSourceDir}{#MyAppExeName}; DestDir: "{app}"; Excludes: {#MyAppExcludeSource} ; Flags: igNoreversion recursesubdirs createallsubdirs +Source: {#MyAppSourceDir}{#MyAppTaosdemoExeName}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs + +[UninstallDelete] +Name: {app}\driver; Type: filesandordirs +Name: {app}\connector; Type: filesandordirs +Name: {app}\examples; Type: filesandordirs +Name: {app}\include; Type: filesandordirs + +[Tasks] +Name: "desktopicon";Description: "{cm:CreateDesktopIcon}"; GroupDescription:"{cm:AdditionalIcons}"; Flags: checkablealone + +[Icons] +Name:"{group}\Taos Shell"; Filename: "{app}\include\{#MyAppTaosExeName}" ; Parameters: "taos.exe" ; IconFilename: "{app}\include\{#MyAppIco}" +Name:"{group}\Open TDengine Directory"; Filename: "{app}\" +Name:"{group}\Taosdemo"; Filename: "{app}\include\{#MyAppTaosExeName}" ; Parameters: "taosdemo.exe" ; IconFilename: "{app}\include\{#MyAppIco}" +Name: "{group}\{cm:UninstallProgram,{#MyAppName}}"; Filename: "{uninstallexe}" ; IconFilename: "{app}\include\{#MyAppIco}" +Name:"{commondesktop}\Taos Shell"; Filename: "{app}\include\{#MyAppTaosExeName}" ; Parameters: "taos.exe" ; Tasks: desktopicon; WorkingDir: "{app}" ; IconFilename: "{app}\include\{#MyAppIco}" + + +[Messages] +ConfirmUninstall=Do you really want to uninstall TDengine from your computer?%n%nPress [Y] to completely delete %1 and all its components;%nPress [N] to keep the software on your computer. diff --git a/packaging/tools/windows_before_install.txt b/packaging/tools/windows_before_install.txt new file mode 100644 index 0000000000..b793a3e801 --- /dev/null +++ b/packaging/tools/windows_before_install.txt @@ -0,0 +1,3 @@ +TDengine is a high-efficient, scalable, high-available distributed time-series database, which makes a lot of optimizations on inserting and querying data, which is far more efficient than normal regular databases. So TDengine can meet the high requirements of IOT and other areas on storing and querying a large amount of data. + +TDengine will be installed under C:\TDengine, users can modify configuration file C:\TDengine\cfg\taos.cfg, set the log file path or other parameters. \ No newline at end of file diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 06bd3f3887..7031a1ebca 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -286,6 +286,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { if (pInst == NULL || NULL == *pInst) { taosThreadMutexUnlock(&appInfo.mutex); tscError("cluster not exist, key:%s", key); + taosMemoryFree(pMsg->pData); tFreeClientHbBatchRsp(&pRsp); return -1; } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f7d45dc6ff..30b0820713 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1007,7 +1007,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pParam); if (code != 0) { tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code); - if (pMsg->pData) taosMemoryFree(pMsg->pData); + if (pMsg->pData) taosMemoryFreeClear(pMsg->pData); if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); if (pRspWrapper == NULL) { @@ -1699,7 +1699,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) { tmq_list_destroy(lst); - return rsp; + /*return rsp;*/ + return 0; } // TODO: free resources return 0; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index a79082ab23..11faaaad97 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -16,15 +16,15 @@ #include "systable.h" #include "taos.h" #include "tdef.h" -#include "types.h" #include "tgrant.h" +#include "types.h" #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE) static const SSysDbTableSchema dnodesSchema[] = { - {.name = "id", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, + {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_SMALLINT}, {.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, {.name = "support_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, @@ -66,7 +66,7 @@ static const SSysDbTableSchema bnodesSchema[] = { }; static const SSysDbTableSchema clusterSchema[] = { - {.name = "id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, + {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, }; @@ -97,7 +97,7 @@ static const SSysDbTableSchema userDBSchema[] = { {.name = "wal_retention_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "wal_retention_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "wal_seg_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, + {.name = "wal_segment_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, }; static const SSysDbTableSchema userFuncSchema[] = { @@ -243,8 +243,8 @@ static const SSysTableMeta infosMeta[] = { {TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)}, {TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)}, {TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)}, -// {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)}, -// {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)}, + // {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)}, + // {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)}, {TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)}, {TSDB_INS_TABLE_DATABASES, userDBSchema, tListLen(userDBSchema)}, {TSDB_INS_TABLE_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)}, diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 06c64dcea6..c4da9b5c3d 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -604,11 +604,11 @@ typedef struct { int64_t createTime; int64_t updateTime; int32_t version; + int32_t totalLevel; int64_t smaId; // 0 for unused // info int64_t uid; int8_t status; - int8_t isDistributed; // config int8_t igExpired; int8_t trigger; @@ -647,7 +647,6 @@ typedef struct { typedef struct { int64_t uid; int64_t streamId; - int8_t isDistributed; int8_t status; int8_t stage; } SStreamRecoverObj; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index abac0573da..08ce161409 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -23,11 +23,11 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->totalLevel) < 0) return -1; if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; - if (tEncodeI8(pEncoder, pObj->isDistributed) < 0) return -1; if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1; if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; @@ -69,11 +69,11 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; - if (tDecodeI8(pDecoder, &pObj->isDistributed) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 218f82df18..a24b7ef459 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -307,10 +307,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); - ASSERT(totLevel <= 2); - pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); - pStream->isDistributed = totLevel == 2; + int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans); + ASSERT(planTotLevel <= 2); + pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*)); bool hasExtraSink = false; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; @@ -320,7 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { bool multiTarget = pDbObj->cfg.numOfVgroups > 1; - if (totLevel == 2 || externalTargetDB || multiTarget) { + if (planTotLevel == 2 || externalTargetDB || multiTarget) { /*if (true) {*/ SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); taosArrayPush(pStream->tasks, &taskOneLevel); @@ -338,8 +337,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } } } + pStream->totalLevel = planTotLevel + hasExtraSink; - if (totLevel > 1) { + if (planTotLevel > 1) { SStreamTask* pInnerTask; // inner level { @@ -371,13 +371,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } -#if 0 - SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb); - ASSERT(pDbObj != NULL); - sdbRelease(pSdb, pSourceDb); - pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups; -#endif - if (tsSchedStreamToSnode) { SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); if (pSnode == NULL) { @@ -464,7 +457,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { } } - if (totLevel == 1) { + if (planTotLevel == 1) { SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); taosArrayPush(pStream->tasks, &taskOneLevel); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 902879701c..ba9bb2982f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -36,7 +36,7 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); -static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq); +/*static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);*/ static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -55,7 +55,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); - mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq); + /*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/ mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); @@ -540,6 +540,7 @@ static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) { return 0; } +#if 0 int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { if (pStream->isDistributed) { int32_t lv = taosArrayGetSize(pStream->tasks); @@ -573,6 +574,7 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea } return 0; } +#endif int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t lv = taosArrayGetSize(pStream->tasks); @@ -755,6 +757,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } +#if 0 static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; @@ -817,6 +820,7 @@ static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } +#endif int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c093b2cd5d..a1dba41c94 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -117,10 +117,9 @@ typedef struct { struct STQ { SVnode* pVnode; char* path; - SHashObj* pushMgr; // consumerId -> STqHandle* - SHashObj* handles; // subKey -> STqHandle - SHashObj* pStreamTasks; // taksId -> SStreamTask - SHashObj* pAlterInfo; // topic -> SAlterCheckInfo + SHashObj* pushMgr; // consumerId -> STqHandle* + SHashObj* handles; // subKey -> STqHandle + SHashObj* pAlterInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; @@ -129,9 +128,7 @@ struct STQ { TTB* pAlterInfoStore; - TDB* pStreamStore; - TTB* pTaskDb; - TTB* pTaskState; + SStreamMeta* pStreamMeta; }; typedef struct { @@ -188,6 +185,9 @@ static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ve pOffsetVal->version = ver; } +// tqStream +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 62e37f048e..c1c680fc56 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -62,8 +62,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); @@ -76,6 +74,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ASSERT(0); } + pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask); + if (pTq->pStreamMeta == NULL) { + ASSERT(0); + } + return pTq; } @@ -83,18 +86,11 @@ void tqClose(STQ* pTq) { if (pTq) { tqOffsetClose(pTq->pOffsetStore); taosHashCleanup(pTq->handles); - void* pIter = NULL; - while (1) { - pIter = taosHashIterate(pTq->pStreamTasks, pIter); - if (pIter == NULL) break; - SStreamTask* pTask = *(SStreamTask**)pIter; - tFreeSStreamTask(pTask); - } - taosHashCleanup(pTq->pStreamTasks); taosHashCleanup(pTq->pushMgr); taosHashCleanup(pTq->pAlterInfo); taosMemoryFree(pTq->path); tqMetaClose(pTq); + streamMetaClose(pTq->pStreamMeta); taosMemoryFree(pTq); } } @@ -672,6 +668,9 @@ FAIL: } int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { + // + return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen); +#if 0 SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { return -1; @@ -695,6 +694,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { FAIL: if (pTask) taosMemoryFree(pTask); return -1; +#endif } int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { @@ -710,7 +710,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { } while (1) { - pIter = taosHashIterate(pTq->pStreamTasks, pIter); + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; @@ -744,9 +744,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { // SStreamTaskRunReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { - streamProcessRunReq(*ppTask); + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { + streamProcessRunReq(pTask); return 0; } else { return -1; @@ -762,14 +762,15 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); - int32_t taskId = req.taskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { + int32_t taskId = req.taskId; + + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(*ppTask, &req, &rsp, exec); + streamProcessDispatchReq(pTask, &req, &rsp, exec); return 0; } else { return -1; @@ -779,9 +780,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverReq* pReq = pMsg->pCont; int32_t taskId = pReq->taskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { - streamProcessRecoverReq(*ppTask, pReq, pMsg); + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { + streamProcessRecoverReq(pTask, pReq, pMsg); return 0; } else { return -1; @@ -791,9 +792,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { - streamProcessDispatchRsp(*ppTask, pRsp); + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { + streamProcessDispatchRsp(pTask, pRsp); return 0; } else { return -1; @@ -803,9 +804,10 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRecoverRsp* pRsp = pMsg->pCont; int32_t taskId = pRsp->rspTaskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { - streamProcessRecoverRsp(*ppTask, pRsp); + + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { + streamProcessRecoverRsp(pTask, pRsp); return 0; } else { return -1; @@ -815,18 +817,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); - if (ppTask) { - SStreamTask* pTask = *ppTask; - taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); - atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); - } - // todo - // clear queue - // push drop req into queue - // launch exec to free memory - // remove from hash - return 0; + return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); } int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { @@ -837,18 +828,18 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { SDecoder decoder; tDecoderInit(&decoder, msgBody, msgLen); tDecodeStreamRetrieveReq(&decoder, &req); - int32_t taskId = req.dstTaskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { + int32_t taskId = req.dstTaskId; + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; - streamProcessRetrieveReq(*ppTask, &req, &rsp); + streamProcessRetrieveReq(pTask, &req, &rsp); + return 0; } else { return -1; } - return 0; } int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { @@ -871,16 +862,18 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { goto FAIL; } - int32_t taskId = req.taskId; - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - if (ppTask) { + int32_t taskId = req.taskId; + + SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); + if (pTask) { SRpcMsg rsp = { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(*ppTask, &req, &rsp, false); + streamProcessDispatchReq(pTask, &req, &rsp, false); return; } + FAIL: if (pMsg->info.handle == NULL) return; SRpcMsg rsp = { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 1c48ef7535..ae3fef9b4b 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -215,7 +215,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) walApplyVer(pTq->pVnode->pWal, ver); if (msgType == TDMT_VND_SUBMIT) { - if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0; + if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; void* data = taosMemoryMalloc(msgLen); if (data == NULL) { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5017893853..6ce8dbe5d9 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -413,7 +413,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } while (1) { - pIter = taosHashIterate(pTq->pStreamTasks, pIter); + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index a9f07cbf24..c40fb98d62 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -115,12 +115,19 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow); int64_t version = TSDBROW_VERSION(&row); + tsdbTrace("vgId:%d, vnode snapshot tsdb read for %s, %" PRId64 "(%" PRId64 " , %" PRId64 ")", + TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path, version, pReader->sver, pReader->ever); + if (version < pReader->sver || version > pReader->ever) continue; code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL); if (code) goto _err; } + if (pReader->nBlockData.nRow <= 0) { + continue; + } + // org data // compress data (todo) int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData); @@ -808,7 +815,8 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { if (code) goto _err; _exit: - tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); + tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), + pWriter->pTsdb->path); return code; _err: diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index a269f81ddd..50ca81e58b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -722,10 +722,12 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } bool vnodeIsLeader(SVnode *pVnode) { if (!syncIsReady(pVnode->sync)) { + vDebug("vgId:%d, vnode not ready", pVnode->config.vgId); return false; } if (!pVnode->restored) { + vDebug("vgId:%d, vnode not restored", pVnode->config.vgId); terrno = TSDB_CODE_APP_NOT_READY; return false; } diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index a9f2d426bc..45f97865ce 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -467,6 +467,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT if (NULL == taosArrayPush(newBatch.pMsgs, &req)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + msg = NULL; if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } @@ -517,6 +518,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT if (NULL == taosArrayPush(pBatch->pMsgs, &req)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + msg = NULL; if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } @@ -545,7 +547,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT CTG_ERR_JRET(TSDB_CODE_APP_ERROR); } - tNameGetFullDbName(pName, newBatch.dbFName); + tNameGetFullDbName(pName, pBatch->dbFName); } ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 8e5fb90f1a..1ca60c89cd 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -438,6 +438,14 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) { } } +void ctgFreeTbMetasMsgCtx(SCtgMsgCtx* pCtx) { + ctgFreeMsgCtx(pCtx); + if (pCtx->lastOut) { + ctgFreeSTableMetaOutput((STableMetaOutput*)pCtx->lastOut); + pCtx->lastOut = NULL; + } +} + void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput) { if (NULL == pOutput) { return; @@ -641,7 +649,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { taosArrayDestroy(taskCtx->pFetchs); // NO NEED TO FREE pNames - taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx); + taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeTbMetasMsgCtx); if (pTask->msgCtx.lastOut) { ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 9971f20d3d..5c49a6e0ab 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2515,7 +2515,7 @@ static bool isPartitionByTbname(SNodeList* pPartitionByList) { return false; } SNode* pPartKey = nodesListGetNode(pPartitionByList, 0); - return QUERY_NODE_FUNCTION != nodeType(pPartKey) || FUNCTION_TYPE_TBNAME != ((SFunctionNode*)pPartKey)->funcType; + return QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType; } static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) { @@ -2566,7 +2566,6 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { if (NULL == pSelect->pWindow) { return TSDB_CODE_SUCCESS; } - pSelect->isTimeLineResult = true; pCxt->currClause = SQL_CLAUSE_WINDOW; int32_t code = translateExpr(pCxt, &pSelect->pWindow); if (TSDB_CODE_SUCCESS == code) { @@ -2637,7 +2636,6 @@ static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelec if (NULL == pSelect->pPartitionByList) { return TSDB_CODE_SUCCESS; } - pSelect->isTimeLineResult = false; pCxt->currClause = SQL_CLAUSE_PARTITION_BY; return translateExprList(pCxt, pSelect->pPartitionByList); } @@ -4708,6 +4706,12 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm return buildCmdMsg(pCxt, TDMT_MND_KILL_TRANS, (FSerializeFunc)tSerializeSKillTransReq, &killReq); } +static bool crossTableWithoutAggOper(SSelectStmt* pSelect) { + return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && + !pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && + !isPartitionByTbname(pSelect->pPartitionByList); +} + static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { if (NULL != pStmt->pOptions->pWatermark && (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) { @@ -4723,14 +4727,12 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt return TSDB_CODE_SUCCESS; } - if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) { - SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; - if (QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable)) { - return TSDB_CODE_SUCCESS; - } + if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) || + QUERY_NODE_REAL_TABLE != nodeType(((SSelectStmt*)pStmt->pQuery)->pFromTable)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + return TSDB_CODE_SUCCESS; } static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) { @@ -4759,12 +4761,23 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) { return code; } +static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || + !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + } + return TSDB_CODE_SUCCESS; +} + static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SCMCreateStreamReq* pReq) { pCxt->createStream = true; int32_t code = addWstartTsToCreateStreamQuery(pStmt); if (TSDB_CODE_SUCCESS == code) { code = translateQuery(pCxt, pStmt); } + if (TSDB_CODE_SUCCESS == code) { + code = checkStreamQuery(pCxt, (SSelectStmt*)pStmt); + } if (TSDB_CODE_SUCCESS == code) { getSourceDatabase(pStmt, pCxt->pParseCxt->acctId, pReq->sourceDB); code = nodesNodeToString(pStmt, false, &pReq->ast, NULL); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index e10f0586ca..97977878ad 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -268,7 +268,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { case QUERY_NODE_LOGIC_PLAN_JOIN: return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode); case QUERY_NODE_LOGIC_PLAN_PARTITION: - return stbSplIsMultiTbScanChild(streamQuery, pNode); + return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_AGG: return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index 33e864158a..ceddf4f215 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -8,7 +8,8 @@ target_include_directories( target_link_libraries( stream - PRIVATE os util transport qcom executor tdb + PUBLIC tdb + PRIVATE os util transport qcom executor ) if(${BUILD_TEST}) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7dfeefb261..8faa22d643 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -14,22 +14,8 @@ */ #include "executor.h" -#include "tdbInt.h" #include "tstream.h" -typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask); - -typedef struct SStreamMeta { - char* path; - TDB* db; - TTB* pTaskDb; - TTB* pStateDb; - SHashObj* pTasks; - void* ahandle; - TXN txn; - FTaskExpand* expandFunc; -} SStreamMeta; - SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { @@ -50,8 +36,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } + pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + if (pMeta->pTasks == NULL) { + goto _err; + } + + if (streamMetaBegin(pMeta) < 0) { + goto _err; + } + pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; + return pMeta; _err: return NULL; @@ -62,6 +58,48 @@ void streamMetaClose(SStreamMeta* pMeta) { tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pStateDb); tdbClose(pMeta->db); + + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pMeta->pTasks, pIter); + if (pIter == NULL) break; + SStreamTask* pTask = *(SStreamTask**)pIter; + tFreeSStreamTask(pTask); + } + taosHashCleanup(pMeta->pTasks); + taosMemoryFree(pMeta->path); + taosMemoryFree(pMeta); +} + +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) { + SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return -1; + } + SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + if (tDecodeSStreamTask(&decoder, pTask) < 0) { + ASSERT(0); + goto FAIL; + } + tDecoderClear(&decoder); + + if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { + ASSERT(0); + goto FAIL; + } + + taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); + + if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) { + ASSERT(0); + return -1; + } + return 0; + +FAIL: + if (pTask) taosMemoryFree(pTask); + return -1; } int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { @@ -94,6 +132,16 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } +SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) { + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); + if (ppTask) { + ASSERT((*ppTask)->taskId == taskId); + return *ppTask; + } else { + return NULL; + } +} + int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { @@ -150,7 +198,7 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) { return 0; } -int32_t streamRestoreTask(SStreamMeta* pMeta) { +int32_t streamLoadTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { ASSERT(0); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 3530c05688..2a77ce8a91 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -87,53 +87,80 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp return 0; } -int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) { -#if 0 - if (pTask->taskStatus != TASK_STATUS__FAIL) { - return 0; - } +typedef struct { + int32_t vgId; + int32_t childId; + int64_t ver; +} SStreamVgVerCheckpoint; - if (pTask->isStreamDistributed) { - if (pTask->taskType == TASK_TYPE__SOURCE) { - pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER; - } else if (pTask->taskType != TASK_TYPE__SINK) { - pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER; - bool hasCheckpoint = false; - int32_t childSz = taosArrayGetSize(pTask->childEpInfo); - for (int32_t i = 0; i < childSz; i++) { - SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); - if (pEpInfo->checkpointVer == -1) { - hasCheckpoint = true; - break; - } - } - if (hasCheckpoint) { - // load from checkpoint - } else { - // recover child - } - } - } else { - if (pTask->taskType == TASK_TYPE__SOURCE) { - if (pTask->checkpointVer != -1) { - // load from checkpoint - } else { - // reset stream query task info - // TODO get snapshot ver - pTask->recoverSnapVer = -1; - qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer); - pTask->taskStatus = TASK_STATUS__RECOVERING; - } - } - } - - if (pTask->taskStatus == TASK_STATUS__RECOVERING) { - if (streamPipelineExec(pTask, 100) < 0) { - // set fail - return -1; - } - } - -#endif +int32_t tEncodeSStreamVgVerCheckpoint(SEncoder* pEncoder, const SStreamVgVerCheckpoint* pCheckpoint) { + if (tEncodeI32(pEncoder, pCheckpoint->vgId) < 0) return -1; + if (tEncodeI32(pEncoder, pCheckpoint->childId) < 0) return -1; + if (tEncodeI64(pEncoder, pCheckpoint->ver) < 0) return -1; + return 0; +} + +int32_t tDecodeSStreamVgVerCheckpoint(SDecoder* pDecoder, SStreamVgVerCheckpoint* pCheckpoint) { + if (tDecodeI32(pDecoder, &pCheckpoint->vgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pCheckpoint->childId) < 0) return -1; + if (tDecodeI64(pDecoder, &pCheckpoint->ver) < 0) return -1; + return 0; +} + +typedef struct { + int64_t streamId; + int64_t checkTs; + int64_t checkpointId; + int32_t taskId; + SArray* checkpointVer; // SArray +} SStreamAggVerCheckpoint; + +int32_t tEncodeSStreamAggVerCheckpoint(SEncoder* pEncoder, const SStreamAggVerCheckpoint* pCheckpoint) { + if (tEncodeI64(pEncoder, pCheckpoint->streamId) < 0) return -1; + if (tEncodeI64(pEncoder, pCheckpoint->checkTs) < 0) return -1; + if (tEncodeI64(pEncoder, pCheckpoint->checkpointId) < 0) return -1; + if (tEncodeI32(pEncoder, pCheckpoint->taskId) < 0) return -1; + int32_t sz = taosArrayGetSize(pCheckpoint->checkpointVer); + if (tEncodeI32(pEncoder, sz) < 0) return -1; + for (int32_t i = 0; i < sz; i++) { + SStreamVgVerCheckpoint* pOneVgCkpoint = taosArrayGet(pCheckpoint->checkpointVer, i); + if (tEncodeSStreamVgVerCheckpoint(pEncoder, pOneVgCkpoint) < 0) return -1; + } + return 0; +} + +int32_t tDecodeSStreamAggVerCheckpoint(SDecoder* pDecoder, SStreamAggVerCheckpoint* pCheckpoint) { + if (tDecodeI64(pDecoder, &pCheckpoint->streamId) < 0) return -1; + if (tDecodeI64(pDecoder, &pCheckpoint->checkTs) < 0) return -1; + if (tDecodeI64(pDecoder, &pCheckpoint->checkpointId) < 0) return -1; + if (tDecodeI32(pDecoder, &pCheckpoint->taskId) < 0) return -1; + int32_t sz; + if (tDecodeI32(pDecoder, &sz) < 0) return -1; + for (int32_t i = 0; i < sz; i++) { + SStreamVgVerCheckpoint oneVgCheckpoint; + if (tDecodeSStreamVgVerCheckpoint(pDecoder, &oneVgCheckpoint) < 0) return -1; + taosArrayPush(pCheckpoint->checkpointVer, &oneVgCheckpoint); + } + return 0; +} + +int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) { + ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); + // load status + void* pVal = NULL; + int32_t vLen = 0; + if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) { + return -1; + } + SDecoder decoder; + tDecoderInit(&decoder, pVal, vLen); + SStreamAggVerCheckpoint aggCheckpoint; + tDecodeSStreamAggVerCheckpoint(&decoder, &aggCheckpoint); + /*pTask->*/ + return 0; +} + +int32_t streamRecoverTask(SStreamTask* pTask) { + // return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 3a54981989..8b5bd849f6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -52,6 +52,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { /*if (tStartEncode(pEncoder) < 0) return -1;*/ if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1; if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1; if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; @@ -62,7 +63,6 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; - /*if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1;*/ int32_t epSz = taosArrayGetSize(pTask->childEpInfo); if (tEncodeI32(pEncoder, epSz) < 0) return -1; @@ -101,6 +101,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { /*if (tStartDecode(pDecoder) < 0) return -1;*/ if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; @@ -111,7 +112,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; - /*if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1;*/ int32_t epSz; if (tDecodeI32(pDecoder, &epSz) < 0) return -1; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c17d91182e..3f9aa6c080 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2668,7 +2668,22 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p syncNodeEventLog(ths, "I am not follower, can not do leader transfer"); return 0; } - syncNodeEventLog(ths, "do leader transfer"); + + if (!ths->restoreFinish) { + syncNodeEventLog(ths, "restore not finish, can not do leader transfer"); + return 0; + } + + if (ths->vgId > 1) { + syncNodeEventLog(ths, "I am vnode, can not do leader transfer"); + return 0; + } + + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "do leader transfer, index:%ld", pEntry->index); + syncNodeEventLog(ths, logBuf); + } while (0); bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId)); bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && diff --git a/source/libs/transport/src/.transSvr.c.swo b/source/libs/transport/src/.transSvr.c.swo deleted file mode 100644 index c9486c5086..0000000000 Binary files a/source/libs/transport/src/.transSvr.c.swo and /dev/null differ diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dc7ffa9b13..e1df181329 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -482,6 +482,7 @@ void cliReadTimeoutCb(uv_timer_t* handle) { // set up timeout cb SCliConn* conn = handle->data; tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); + uv_read_stop(conn->stream); cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT); } @@ -993,6 +994,8 @@ static void cliAsyncCb(uv_async_t* handle) { if (count >= 2) { tTrace("cli process batch size:%d", count); } + // if (!uv_is_active((uv_handle_t*)pThrd->prepare)) uv_prepare_start(pThrd->prepare, cliPrepareCb); + if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); } static void cliPrepareCb(uv_prepare_t* handle) { @@ -1088,7 +1091,7 @@ static SCliThrd* createThrdObj() { pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); uv_prepare_init(pThrd->loop, pThrd->prepare); pThrd->prepare->data = pThrd; - uv_prepare_start(pThrd->prepare, cliPrepareCb); + // uv_prepare_start(pThrd->prepare, cliPrepareCb); int32_t timerSize = 512; pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); @@ -1125,7 +1128,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(timer); } taosArrayDestroy(pThrd->timerList); - taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); diff --git a/source/os/src/osAtomic.c b/source/os/src/osAtomic.c index d1d768fbcc..a9eb23abba 100644 --- a/source/os/src/osAtomic.c +++ b/source/os/src/osAtomic.c @@ -193,7 +193,13 @@ void* interlocked_sub_fetch_ptr(void* volatile* ptr, void* val) { } int32_t interlocked_fetch_sub_32(int32_t volatile* ptr, int32_t val) { return _InterlockedExchangeAdd(ptr, -val); } -int64_t interlocked_fetch_sub_64(int64_t volatile* ptr, int64_t val) { return _InterlockedExchangeAdd64(ptr, -val); } +int64_t interlocked_fetch_sub_64(int64_t volatile* ptr, int64_t val) { +#ifdef _TD_WINDOWS_32 + return _InterlockedExchangeAdd((int32_t volatile*)ptr, -(int32_t)val); +#else + return _InterlockedExchangeAdd64(ptr, -val); +#endif +} void* interlocked_fetch_sub_ptr(void* volatile* ptr, void* val) { #ifdef WINDOWS @@ -375,7 +381,11 @@ int32_t atomic_exchange_32(int32_t volatile* ptr, int32_t val) { int64_t atomic_exchange_64(int64_t volatile* ptr, int64_t val) { #ifdef WINDOWS +#ifdef _TD_WINDOWS_32 + return _InterlockedExchange((int32_t volatile*)(ptr), (int32_t)(val)); +#else return _InterlockedExchange64((int64_t volatile*)(ptr), (int64_t)(val)); +#endif #elif defined(_TD_NINGSI_60) return atomic_exchange_64_impl((int64_t*)ptr, (int64_t)val); #else @@ -529,7 +539,11 @@ int32_t atomic_fetch_add_32(int32_t volatile* ptr, int32_t val) { int64_t atomic_fetch_add_64(int64_t volatile* ptr, int64_t val) { #ifdef WINDOWS +#ifdef _TD_WINDOWS_32 + return _InterlockedExchangeAdd((int32_t volatile*)(ptr), (int32_t)(val)); +#else return _InterlockedExchangeAdd64((int64_t volatile*)(ptr), (int64_t)(val)); +#endif #elif defined(_TD_NINGSI_60) return __sync_fetch_and_add((ptr), (val)); #else @@ -631,7 +645,11 @@ int32_t atomic_fetch_sub_32(int32_t volatile* ptr, int32_t val) { int64_t atomic_fetch_sub_64(int64_t volatile* ptr, int64_t val) { #ifdef WINDOWS +#ifdef _TD_WINDOWS_32 + return _InterlockedExchangeAdd((int32_t volatile*)(ptr), -(int32_t)(val)); +#else return _InterlockedExchangeAdd64((int64_t volatile*)(ptr), -(int64_t)(val)); +#endif #elif defined(_TD_NINGSI_60) return __sync_fetch_and_sub((ptr), (val)); #else diff --git a/source/os/src/osString.c b/source/os/src/osString.c index 26aafd743f..db3aaa49a6 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -48,7 +48,7 @@ char *strsep(char **stringp, const char *delim) { /* NOTREACHED */ } /* Duplicate a string, up to at most size characters */ -char *strndup(const char *s, size_t size) { +char *strndup(const char *s, int size) { size_t l; char * s2; l = strlen(s); @@ -62,7 +62,7 @@ char *strndup(const char *s, size_t size) { } /* Copy no more than N characters of SRC to DEST, returning the address of the terminating '\0' in DEST, if any, or else DEST + N. */ -char *stpncpy(char *dest, const char *src, size_t n) { +char *stpncpy(char *dest, const char *src, int n) { size_t size = strnlen(src, n); memcpy(dest, src, size); dest += size; diff --git a/tests/script/tsim/stream/drop_stream.sim b/tests/script/tsim/stream/drop_stream.sim index bdd88bf780..747f59fe85 100644 --- a/tests/script/tsim/stream/drop_stream.sim +++ b/tests/script/tsim/stream/drop_stream.sim @@ -45,70 +45,70 @@ sql create table test.scalar_function_tb1 (ts timestamp, c1 tinyint, c2 smallint sql create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20)) tags (t1 int); sql create table scalar_ct1 using scalar_stb tags(10); sql create table if not exists scalar_tb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20)); -sql create stream stb_abs_stream trigger at_once into output_abs_stb as select ts, abs(c1), abs(c2), c3 from scalar_stb; +sql create stream stb_abs_stream trigger at_once into output_abs_stb as select ts, abs(c1), abs(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_abs_stream trigger at_once into output_abs_ctb as select ts, abs(c1), abs(c2), c3 from scalar_ct1; sql create stream tb_abs_stream trigger at_once into output_abs_tb as select ts, abs(c1), abs(c2), c3 from scalar_tb; -sql create stream stb_acos_stream trigger at_once into output_acos_stb as select ts, acos(c1), acos(c2), c3 from scalar_stb; +sql create stream stb_acos_stream trigger at_once into output_acos_stb as select ts, acos(c1), acos(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_acos_stream trigger at_once into output_acos_ctb as select ts, acos(c1), acos(c2), c3 from scalar_ct1; sql create stream tb_acos_stream trigger at_once into output_acos_tb as select ts, acos(c1), acos(c2), c3 from scalar_tb; -sql create stream stb_asin_stream trigger at_once into output_asin_stb as select ts, asin(c1), asin(c2), c3 from scalar_stb; +sql create stream stb_asin_stream trigger at_once into output_asin_stb as select ts, asin(c1), asin(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_asin_stream trigger at_once into output_asin_ctb as select ts, asin(c1), asin(c2), c3 from scalar_ct1; sql create stream tb_asin_stream trigger at_once into output_asin_tb as select ts, asin(c1), asin(c2), c3 from scalar_tb; -sql create stream stb_atan_stream trigger at_once into output_atan_stb as select ts, atan(c1), atan(c2), c3 from scalar_stb; +sql create stream stb_atan_stream trigger at_once into output_atan_stb as select ts, atan(c1), atan(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_atan_stream trigger at_once into output_atan_ctb as select ts, atan(c1), atan(c2), c3 from scalar_ct1; sql create stream tb_atan_stream trigger at_once into output_atan_tb as select ts, atan(c1), atan(c2), c3 from scalar_tb; -sql create stream stb_ceil_stream trigger at_once into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb; +sql create stream stb_ceil_stream trigger at_once into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_ceil_stream trigger at_once into output_ceil_ctb as select ts, ceil(c1), ceil(c2), c3 from scalar_ct1; sql create stream tb_ceil_stream trigger at_once into output_ceil_tb as select ts, ceil(c1), ceil(c2), c3 from scalar_tb; -sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb; +sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_cos_stream trigger at_once into output_cos_ctb as select ts, cos(c1), cos(c2), c3 from scalar_ct1; sql create stream tb_cos_stream trigger at_once into output_cos_tb as select ts, cos(c1), cos(c2), c3 from scalar_tb; -sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb; +sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_floor_stream trigger at_once into output_floor_ctb as select ts, floor(c1), floor(c2), c3 from scalar_ct1; sql create stream tb_floor_stream trigger at_once into output_floor_tb as select ts, floor(c1), floor(c2), c3 from scalar_tb; -sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb; +sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb partition by tbname; sql create stream ctb_log_stream trigger at_once into output_log_ctb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_ct1; sql create stream tb_log_stream trigger at_once into output_log_tb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_tb; -sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb; +sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb partition by tbname; sql create stream ctb_pow_stream trigger at_once into output_pow_ctb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_ct1; sql create stream tb_pow_stream trigger at_once into output_pow_tb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_tb; -sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb; +sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_round_stream trigger at_once into output_round_ctb as select ts, round(c1), round(c2), c3 from scalar_ct1; sql create stream tb_round_stream trigger at_once into output_round_tb as select ts, round(c1), round(c2), c3 from scalar_tb; -sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb; +sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_sin_stream trigger at_once into output_sin_ctb as select ts, sin(c1), sin(c2), c3 from scalar_ct1; sql create stream tb_sin_stream trigger at_once into output_sin_tb as select ts, sin(c1), sin(c2), c3 from scalar_tb; -sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb; +sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_sqrt_stream trigger at_once into output_sqrt_ctb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_ct1; sql create stream tb_sqrt_stream trigger at_once into output_sqrt_tb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_tb; -sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb; +sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_tan_stream trigger at_once into output_tan_ctb as select ts, tan(c1), tan(c2), c3 from scalar_ct1; sql create stream tb_tan_stream trigger at_once into output_tan_tb as select ts, tan(c1), tan(c2), c3 from scalar_tb; -sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb; +sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb partition by tbname; sql create stream ctb_char_length_stream into output_char_length_ctb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_ct1; sql create stream tb_char_length_stream into output_char_length_tb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_tb; -sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb; +sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb partition by tbname; sql create stream ctb_concat_stream into output_concat_ctb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_ct1; sql create stream tb_concat_stream into output_concat_tb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_tb; -sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb; +sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb partition by tbname; sql create stream ctb_concat_ws_stream into output_concat_ws_ctb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_ct1; sql create stream tb_concat_ws_stream into output_concat_ws_tb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_tb; -sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb; +sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb partition by tbname; sql create stream ctb_length_stream into output_length_ctb as select ts, length(c3), length(c4), length(c5) from scalar_ct1; sql create stream tb_length_stream into output_length_tb as select ts, length(c3), length(c4), length(c5) from scalar_tb; -sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb; +sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb partition by tbname; sql create stream ctb_lower_stream into output_lower_ctb as select ts, lower(c3), lower(c4), lower(c5) from scalar_ct1; sql create stream tb_lower_stream into output_lower_tb as select ts, lower(c3), lower(c4), lower(c5) from scalar_tb; -sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb; +sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb partition by tbname; sql create stream ctb_ltrim_stream into output_ltrim_ctb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_ct1; sql create stream tb_ltrim_stream into output_ltrim_tb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_tb; -sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb; +sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb partition by tbname; sql create stream ctb_rtrim_stream into output_rtrim_ctb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_ct1; sql create stream tb_rtrim_stream into output_rtrim_tb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_tb; -sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb; +sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb partition by tbname; sql create stream ctb_substr_stream into output_substr_ctb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_ct1; sql create stream tb_substr_stream into output_substr_tb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_tb; -sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb; +sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb partition by tbname; sql create stream ctb_upper_stream into output_upper_ctb as select ts, upper(c3), upper(c4), upper(c5) from scalar_ct1; sql create stream tb_upper_stream into output_upper_tb as select ts, upper(c3), upper(c4), upper(c5) from scalar_tb; sql insert into scalar_ct1 values (1656668180503, 100, 100.1, "beijing", "taos", "Taos"); @@ -136,70 +136,70 @@ sql create table test.scalar_function_tb1 (ts timestamp, c1 tinyint, c2 smallint sql create table if not exists scalar_stb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20)) tags (t1 int); sql create table scalar_ct1 using scalar_stb tags(10); sql create table if not exists scalar_tb (ts timestamp, c1 int, c2 double, c3 binary(20), c4 binary(20), c5 nchar(20)); -sql create stream stb_abs_stream trigger at_once into output_abs_stb as select ts, abs(c1), abs(c2), c3 from scalar_stb; +sql create stream stb_abs_stream trigger at_once into output_abs_stb as select ts, abs(c1), abs(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_abs_stream trigger at_once into output_abs_ctb as select ts, abs(c1), abs(c2), c3 from scalar_ct1; sql create stream tb_abs_stream trigger at_once into output_abs_tb as select ts, abs(c1), abs(c2), c3 from scalar_tb; -sql create stream stb_acos_stream trigger at_once into output_acos_stb as select ts, acos(c1), acos(c2), c3 from scalar_stb; +sql create stream stb_acos_stream trigger at_once into output_acos_stb as select ts, acos(c1), acos(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_acos_stream trigger at_once into output_acos_ctb as select ts, acos(c1), acos(c2), c3 from scalar_ct1; sql create stream tb_acos_stream trigger at_once into output_acos_tb as select ts, acos(c1), acos(c2), c3 from scalar_tb; -sql create stream stb_asin_stream trigger at_once into output_asin_stb as select ts, asin(c1), asin(c2), c3 from scalar_stb; +sql create stream stb_asin_stream trigger at_once into output_asin_stb as select ts, asin(c1), asin(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_asin_stream trigger at_once into output_asin_ctb as select ts, asin(c1), asin(c2), c3 from scalar_ct1; sql create stream tb_asin_stream trigger at_once into output_asin_tb as select ts, asin(c1), asin(c2), c3 from scalar_tb; -sql create stream stb_atan_stream trigger at_once into output_atan_stb as select ts, atan(c1), atan(c2), c3 from scalar_stb; +sql create stream stb_atan_stream trigger at_once into output_atan_stb as select ts, atan(c1), atan(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_atan_stream trigger at_once into output_atan_ctb as select ts, atan(c1), atan(c2), c3 from scalar_ct1; sql create stream tb_atan_stream trigger at_once into output_atan_tb as select ts, atan(c1), atan(c2), c3 from scalar_tb; -sql create stream stb_ceil_stream trigger at_once into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb; +sql create stream stb_ceil_stream trigger at_once into output_ceil_stb as select ts, ceil(c1), ceil(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_ceil_stream trigger at_once into output_ceil_ctb as select ts, ceil(c1), ceil(c2), c3 from scalar_ct1; sql create stream tb_ceil_stream trigger at_once into output_ceil_tb as select ts, ceil(c1), ceil(c2), c3 from scalar_tb; -sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb; +sql create stream stb_cos_stream trigger at_once into output_cos_stb as select ts, cos(c1), cos(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_cos_stream trigger at_once into output_cos_ctb as select ts, cos(c1), cos(c2), c3 from scalar_ct1; sql create stream tb_cos_stream trigger at_once into output_cos_tb as select ts, cos(c1), cos(c2), c3 from scalar_tb; -sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb; +sql create stream stb_floor_stream trigger at_once into output_floor_stb as select ts, floor(c1), floor(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_floor_stream trigger at_once into output_floor_ctb as select ts, floor(c1), floor(c2), c3 from scalar_ct1; sql create stream tb_floor_stream trigger at_once into output_floor_tb as select ts, floor(c1), floor(c2), c3 from scalar_tb; -sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb; +sql create stream stb_log_stream trigger at_once into output_log_stb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_stb partition by tbname; sql create stream ctb_log_stream trigger at_once into output_log_ctb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_ct1; sql create stream tb_log_stream trigger at_once into output_log_tb as select ts, log(c1, 2), log(c2, 2), c3 from scalar_tb; -sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb; +sql create stream stb_pow_stream trigger at_once into output_pow_stb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_stb partition by tbname; sql create stream ctb_pow_stream trigger at_once into output_pow_ctb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_ct1; sql create stream tb_pow_stream trigger at_once into output_pow_tb as select ts, pow(c1, 2), pow(c2, 2), c3 from scalar_tb; -sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb; +sql create stream stb_round_stream trigger at_once into output_round_stb as select ts, round(c1), round(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_round_stream trigger at_once into output_round_ctb as select ts, round(c1), round(c2), c3 from scalar_ct1; sql create stream tb_round_stream trigger at_once into output_round_tb as select ts, round(c1), round(c2), c3 from scalar_tb; -sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb; +sql create stream stb_sin_stream trigger at_once into output_sin_stb as select ts, sin(c1), sin(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_sin_stream trigger at_once into output_sin_ctb as select ts, sin(c1), sin(c2), c3 from scalar_ct1; sql create stream tb_sin_stream trigger at_once into output_sin_tb as select ts, sin(c1), sin(c2), c3 from scalar_tb; -sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb; +sql create stream stb_sqrt_stream trigger at_once into output_sqrt_stb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_sqrt_stream trigger at_once into output_sqrt_ctb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_ct1; sql create stream tb_sqrt_stream trigger at_once into output_sqrt_tb as select ts, sqrt(c1), sqrt(c2), c3 from scalar_tb; -sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb; +sql create stream stb_tan_stream trigger at_once into output_tan_stb as select ts, tan(c1), tan(c2), c3 from scalar_stb partition by tbname; sql create stream ctb_tan_stream trigger at_once into output_tan_ctb as select ts, tan(c1), tan(c2), c3 from scalar_ct1; sql create stream tb_tan_stream trigger at_once into output_tan_tb as select ts, tan(c1), tan(c2), c3 from scalar_tb; -sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb; +sql create stream stb_char_length_stream into output_char_length_stb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_stb partition by tbname; sql create stream ctb_char_length_stream into output_char_length_ctb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_ct1; sql create stream tb_char_length_stream into output_char_length_tb as select ts, char_length(c3), char_length(c4), char_length(c5) from scalar_tb; -sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb; +sql create stream stb_concat_stream into output_concat_stb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_stb partition by tbname; sql create stream ctb_concat_stream into output_concat_ctb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_ct1; sql create stream tb_concat_stream into output_concat_tb as select ts, concat(c3, c4), concat(c3, c5), concat(c4, c5), concat(c3, c4, c5) from scalar_tb; -sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb; +sql create stream stb_concat_ws_stream into output_concat_ws_stb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_stb partition by tbname; sql create stream ctb_concat_ws_stream into output_concat_ws_ctb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_ct1; sql create stream tb_concat_ws_stream into output_concat_ws_tb as select ts, concat_ws("aND", c3, c4), concat_ws("and", c3, c5), concat_ws("And", c4, c5), concat_ws("AND", c3, c4, c5) from scalar_tb; -sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb; +sql create stream stb_length_stream into output_length_stb as select ts, length(c3), length(c4), length(c5) from scalar_stb partition by tbname; sql create stream ctb_length_stream into output_length_ctb as select ts, length(c3), length(c4), length(c5) from scalar_ct1; sql create stream tb_length_stream into output_length_tb as select ts, length(c3), length(c4), length(c5) from scalar_tb; -sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb; +sql create stream stb_lower_stream into output_lower_stb as select ts, lower(c3), lower(c4), lower(c5) from scalar_stb partition by tbname; sql create stream ctb_lower_stream into output_lower_ctb as select ts, lower(c3), lower(c4), lower(c5) from scalar_ct1; sql create stream tb_lower_stream into output_lower_tb as select ts, lower(c3), lower(c4), lower(c5) from scalar_tb; -sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb; +sql create stream stb_ltrim_stream into output_ltrim_stb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_stb partition by tbname; sql create stream ctb_ltrim_stream into output_ltrim_ctb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_ct1; sql create stream tb_ltrim_stream into output_ltrim_tb as select ts, ltrim(c3), ltrim(c4), ltrim(c5) from scalar_tb; -sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb; +sql create stream stb_rtrim_stream into output_rtrim_stb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_stb partition by tbname; sql create stream ctb_rtrim_stream into output_rtrim_ctb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_ct1; sql create stream tb_rtrim_stream into output_rtrim_tb as select ts, rtrim(c3), rtrim(c4), rtrim(c5) from scalar_tb; -sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb; +sql create stream stb_substr_stream into output_substr_stb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_stb partition by tbname; sql create stream ctb_substr_stream into output_substr_ctb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_ct1; sql create stream tb_substr_stream into output_substr_tb as select ts, substr(c3, 2), substr(c3, 2, 2), substr(c4, 5, 1), substr(c5, 3, 4) from scalar_tb; -sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb; +sql create stream stb_upper_stream into output_upper_stb as select ts, upper(c3), upper(c4), upper(c5) from scalar_stb partition by tbname; sql create stream ctb_upper_stream into output_upper_ctb as select ts, upper(c3), upper(c4), upper(c5) from scalar_ct1; sql create stream tb_upper_stream into output_upper_tb as select ts, upper(c3), upper(c4), upper(c5) from scalar_tb; sql insert into scalar_ct1 values (1656668180503, 100, 100.1, "beijing", "taos", "Taos");