diff --git a/.clang-format b/.clang-format index e58d518b3b..56ca83e724 100644 --- a/.clang-format +++ b/.clang-format @@ -88,4 +88,3 @@ Standard: Auto TabWidth: 8 UseTab: Never ... - diff --git a/.gitignore b/.gitignore index 76b581b182..5f1e24109d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ build/ compile_commands.json +CMakeSettings.json .cache .ycm_extra_conf.py .tasks diff --git a/CMakeSettings.json b/CMakeSettings.json deleted file mode 100644 index d3f2c27bf6..0000000000 --- a/CMakeSettings.json +++ /dev/null @@ -1,25 +0,0 @@ -{ - "configurations": [ - { - "name": "WSL-GCC-Debug", - "generator": "Unix Makefiles", - "configurationType": "Debug", - "buildRoot": "${projectDir}\\build\\", - "installRoot": "${projectDir}\\build\\", - "cmakeExecutable": "/usr/bin/cmake", - "cmakeCommandArgs": "", - "buildCommandArgs": "", - "ctestCommandArgs": "", - "inheritEnvironments": [ "linux_x64" ], - "wslPath": "${defaultWSLPath}", - "addressSanitizerRuntimeFlags": "detect_leaks=0", - "variables": [ - { - "name": "CMAKE_INSTALL_PREFIX", - "value": "/mnt/d/TDengine/TDengine/build", - "type": "PATH" - } - ] - } - ] -} \ No newline at end of file diff --git a/TDenginelogo.png b/TDenginelogo.png deleted file mode 100644 index 19a92592d7..0000000000 Binary files a/TDenginelogo.png and /dev/null differ diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx index fc95033baa..d92a93fd4f 100644 --- a/docs/en/14-reference/03-connector/07-python.mdx +++ b/docs/en/14-reference/03-connector/07-python.mdx @@ -7,7 +7,7 @@ description: "taospy is the official Python connector for TDengine. taospy provi import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; -`taospy is the official Python connector for TDengine. taospy provides a rich API that makes it easy for Python applications to use TDengine. `taospy` wraps both the [native interface](/reference/connector/cpp) and [REST interface](/reference/rest-api) of TDengine, which correspond to the `taos` and `taosrest` modules of the `taospy` package, respectively. +`taospy` is the official Python connector for TDengine. taospy provides a rich API that makes it easy for Python applications to use TDengine. `taospy` wraps both the [native interface](/reference/connector/cpp) and [REST interface](/reference/rest-api) of TDengine, which correspond to the `taos` and `taosrest` modules of the `taospy` package, respectively. In addition to wrapping the native and REST interfaces, `taospy` also provides a set of programming interfaces that conforms to the [Python Data Access Specification (PEP 249)](https://peps.python.org/pep-0249/). It is easy to integrate `taospy` with many third-party tools, such as [SQLAlchemy](https://www.sqlalchemy.org/) and [pandas](https://pandas.pydata.org/). The direct connection to the server using the native interface provided by the client driver is referred to hereinafter as a "native connection"; the connection to the server using the REST interface provided by taosAdapter is referred to hereinafter as a "REST connection". diff --git a/docs/en/14-reference/03-connector/_preparition.mdx b/docs/en/14-reference/03-connector/_preparition.mdx deleted file mode 100644 index 87538ebfd8..0000000000 --- a/docs/en/14-reference/03-connector/_preparition.mdx +++ /dev/null @@ -1,10 +0,0 @@ -- 已安装客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装) - -:::info - -由于 TDengine 的客户端驱动使用 C 语言编写,使用原生连接时需要加载系统对应安装在本地的客户端驱动共享库文件,通常包含在 TDengine 安装包。TDengine Linux 服务端安装包附带了 TDengine 客户端,也可以单独安装 [Linux 客户端](/get-started/) 。在 Windows 环境开发时需要安装 TDengine 对应的 [Windows 客户端](https://www.taosdata.com/cn/all-downloads/#TDengine-Windows-Client) 。 - -- libtaos.so: 在 Linux 系统中成功安装 TDengine 后,依赖的 Linux 版客户端驱动 libtaos.so 文件会被自动拷贝至 /usr/lib/libtaos.so,该目录包含在 Linux 自动扫描路径上,无需单独指定。 -- taos.dll: 在 Windows 系统中安装完客户端之后,依赖的 Windows 版客户端驱动 taos.dll 文件会自动拷贝到系统默认搜索路径 C:/Windows/System32 下,同样无需要单独指定。 - -::: diff --git a/docs/zh/02-intro.md b/docs/zh/02-intro.md index 012c49d2c3..9a0a6fb547 100644 --- a/docs/zh/02-intro.md +++ b/docs/zh/02-intro.md @@ -26,7 +26,7 @@ TDengine 的主要功能如下: - [Icinga2](../third-party/icinga2) - [TCollector](../third-party/tcollector) - [EMQX](../third-party/emq-broker) - - [HiveMQ](../third-party/hive-mq-broker) ; + - [HiveMQ](../third-party/hive-mq-broker) 2. 查询数据,支持 - [标准 SQL](../taos-sql),含嵌套查询 - [时序数据特色函数](../taos-sql/function/#time-series-extensions) @@ -85,14 +85,14 @@ TDengine 的主要功能如下: ![TDengine Database 技术生态图](eco_system.webp) -
图 1. TDengine技术生态图
+
图 1. TDengine 技术生态图
上图中,左侧是各种数据采集或消息队列,包括 OPC-UA、MQTT、Telegraf、也包括 Kafka,他们的数据将被源源不断的写入到 TDengine。右侧则是可视化、BI 工具、组态软件、应用程序。下侧则是 TDengine 自身提供的命令行程序(CLI)以及可视化管理工具。 ## 典型适用场景 -作为一个高性能、分布式、支持 SQL 的时序数据库(Database),TDengine 的典型适用场景包括但不限于 IoT、工业互联网、车联网、IT 运维、能源、金融证券等领域。需要指出的是,TDengine 是针对时序数据场景设计的专用数据库和专用大数据处理工具,因其充分利用了时序大数据的特点,它无法用来处理网络爬虫、微博、微信、电商、ERP、CRM 等通用型数据。下面本文将对适用场景做更多详细的分析。 +作为一个高性能、分布式、支持 SQL 的时序数据库(Database),TDengine 的典型适用场景包括但不限于 IoT、工业互联网、车联网、IT 运维、能源、金融证券等领域。需要指出的是,TDengine 是针对时序数据场景设计的专用数据库和专用大数据处理工具,因其充分利用了时序大数据的特点,它无法用来处理网络爬虫、微博、微信、电商、ERP、CRM 等通用型数据。下面本文将对适用场景做更多详细的分析。 ### 数据源特点和需求 diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d503592361..c8b90a9991 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2070,6 +2070,7 @@ int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatc // TDMT_VND_DROP_TABLE ================= typedef struct { char* name; + uint64_t suid; // for tmq in wal format int8_t igNotExists; } SVDropTbReq; @@ -2626,6 +2627,22 @@ typedef struct { }; } STqOffsetVal; +static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) { + pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA; + pOffsetVal->uid = uid; + pOffsetVal->ts = ts; +} + +static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) { + pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META; + pOffsetVal->uid = uid; +} + +static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) { + pOffsetVal->type = TMQ_OFFSET__LOG; + pOffsetVal->version = ver; +} + int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal); int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal); int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal); @@ -2957,6 +2974,7 @@ typedef struct { int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp); +void tDeleteSMqDataRsp(SMqDataRsp* pRsp); typedef struct { SMqRspHead head; diff --git a/packaging/MPtestJenkinsfile b/packaging/MPtestJenkinsfile index 45c8d8abf2..77f642180a 100644 --- a/packaging/MPtestJenkinsfile +++ b/packaging/MPtestJenkinsfile @@ -64,6 +64,11 @@ pipeline { defaultValue:'2.1.2', description: 'This number of baseVerison is generally not modified.Now it is 3.0.0.1' ) + string ( + name:'nasPassword', + defaultValue:'password', + description: 'the pasword of the NAS server which has installPackage-192.168.1.131' + ) } environment{ WORK_DIR = '/var/lib/jenkins/workspace' @@ -111,17 +116,17 @@ pipeline { sync_source("${BRANCH_NAME}") sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${nasPassword} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword} python3 checkPackageRuning.py ''' } @@ -134,17 +139,22 @@ pipeline { sync_source("${BRANCH_NAME}") sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${nasPassword} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_DEB} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword} + python3 checkPackageRuning.py + ''' + sh ''' + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh ${TD_CLIENT_TAR} ${version} ${BASE_TD_CLIENT_TAR} ${baseVersion} client ${nasPassword} python3 checkPackageRuning.py ''' } @@ -157,17 +167,17 @@ pipeline { sync_source("${BRANCH_NAME}") sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${nasPassword} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword} python3 checkPackageRuning.py ''' } @@ -179,18 +189,42 @@ pipeline { timeout(time: 30, unit: 'MINUTES'){ sync_source("${BRANCH_NAME}") sh ''' - cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${nasPassword} python3 checkPackageRuning.py ''' sh ''' cd ${TDENGINE_ROOT_DIR}/packaging - bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server + bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword} + python3 checkPackageRuning.py + ''' + sh ''' + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh ${TD_CLIENT_LITE_TAR} ${version} ${BASE_TD_CLIENT_LITE_TAR} ${baseVersion} client ${nasPassword} + python3 checkPackageRuning.py + ''' + } + } + } + + stage('arm64') { + agent{label 'linux_arm64'} + steps { + timeout(time: 30, unit: 'MINUTES'){ + sync_source("${BRANCH_NAME}") + sh ''' + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh ${TD_SERVER_ARM_TAR} ${version} ${BASE_TD_SERVER_ARM_TAR} ${baseVersion} server ${nasPassword} + python3 checkPackageRuning.py + ''' + sh ''' + cd ${TDENGINE_ROOT_DIR}/packaging + bash testpackage.sh ${TD_CLIENT_ARM_TAR} ${version} ${BASE_TD_CLIENT_ARM_TAR} ${baseVersion} client ${nasPassword} python3 checkPackageRuning.py ''' } diff --git a/packaging/debRpmAutoInstall.sh b/packaging/debRpmAutoInstall.sh new file mode 100755 index 0000000000..1f51378c91 --- /dev/null +++ b/packaging/debRpmAutoInstall.sh @@ -0,0 +1,13 @@ +#!/usr/bin/expect +set packgeName [lindex $argv 0] +set packageSuffix [lindex $argv 1] +set timeout 3 +if { ${packageSuffix} == "deb" } { + spawn dpkg -i ${packgeName} +} elseif { ${packageSuffix} == "rpm"} { + spawn rpm -ivh ${packgeName} +} +expect "*one:" +send "\r" +expect "*skip:" +send "\r" diff --git a/packaging/testpackage.sh b/packaging/testpackage.sh index 758c554178..56da9e59be 100755 --- a/packaging/testpackage.sh +++ b/packaging/testpackage.sh @@ -1,6 +1,6 @@ #!/bin/sh - - +#parameter +scriptDir=$(dirname $(readlink -f $0)) packgeName=$1 version=$2 originPackageName=$3 @@ -9,6 +9,17 @@ testFile=$5 subFile="taos.tar.gz" password=$6 +# Color setting +RED='\033[41;30m' +GREEN='\033[1;32m' +YELLOW='\033[1;33m' +BLUE='\033[1;34m' +GREEN_DARK='\033[0;32m' +YELLOW_DARK='\033[0;33m' +BLUE_DARK='\033[0;34m' +GREEN_UNDERLINE='\033[4;32m' +NC='\033[0m' + if [ ${testFile} = "server" ];then tdPath="TDengine-server-${version}" originTdpPath="TDengine-server-${originversion}" @@ -24,146 +35,211 @@ elif [ ${testFile} = "tools" ];then fi function cmdInstall { -comd=$1 -if command -v ${comd} ;then - echo "${comd} is already installed" +command=$1 +if command -v ${command} ;then + echoColor YD "${command} is already installed" else if command -v apt ;then - apt-get install ${comd} -y + apt-get install ${command} -y elif command -v yum ;then - yum -y install ${comd} - echo "you should install ${comd} manually" + yum -y install ${command} + echoColor YD "you should install ${command} manually" fi fi } +function echoColor { +color=$1 +command=$2 -echo "Uninstall all components of TDeingne" - -if command -v rmtaos ;then - echo "uninstall all components of TDeingne:rmtaos" - rmtaos -else - echo "os doesn't include TDengine " +if [ ${color} = 'Y' ];then + echo -e "${YELLOW}${command}${NC}" +elif [ ${color} = 'YD' ];then + echo -e "${YELLOW_DARK}${command}${NC}" +elif [ ${color} = 'R' ];then + echo -e "${RED}${command}${NC}" +elif [ ${color} = 'G' ];then + echo -e "${GREEN}${command}${NC}\r\n" +elif [ ${color} = 'B' ];then + echo -e "${BLUE}${command}${NC}" +elif [ ${color} = 'BD' ];then + echo -e "${BLUE_DARK}${command}${NC}" fi +} -if command -v rmtaostools ;then - echo "uninstall all components of TDeingne:rmtaostools" - rmtaostools -else - echo "os doesn't include rmtaostools " -fi +echoColor G "===== install basesoft =====" cmdInstall tree cmdInstall wget cmdInstall sshpass -echo "new workroom path" +echoColor G "===== Uninstall all components of TDeingne =====" + +if command -v rmtaos ;then + echoColor YD "uninstall all components of TDeingne:rmtaos" + rmtaos +else + echoColor YD "os doesn't include TDengine" +fi + +if command -v rmtaostools ;then + echoColor YD "uninstall all components of TDeingne:rmtaostools" + rmtaostools +else + echoColor YD "os doesn't include rmtaostools " +fi + + + + +echoColor G "===== new workroom path =====" installPath="/usr/local/src/packageTest" oriInstallPath="/usr/local/src/packageTest/3.1" if [ ! -d ${installPath} ] ;then + echoColor BD "mkdir -p ${installPath}" mkdir -p ${installPath} else - echo "${installPath} already exists" + echoColor YD "${installPath} already exists" fi +if [ -d ${installPath}/${tdPath} ] ;then + echoColor BD "rm -rf ${installPath}/${tdPath}/*" + rm -rf ${installPath}/${tdPath}/* +fi if [ ! -d ${oriInstallPath} ] ;then + echoColor BD "mkdir -p ${oriInstallPath}" mkdir -p ${oriInstallPath} else - echo "${oriInstallPath} already exists" + echoColor YD "${oriInstallPath} already exists" +fi + +if [ -d ${oriInstallPath}/${originTdpPath} ] ;then + echoColor BD "rm -rf ${oriInstallPath}/${originTdpPath}/*" + rm -rf ${oriInstallPath}/${originTdpPath}/* fi - - -echo "download installPackage" +echoColor G "===== download installPackage =====" # cd ${installPath} # wget https://www.taosdata.com/assets-download/3.0/${packgeName} # cd ${oriInstallPath} # wget https://www.taosdata.com/assets-download/3.0/${originPackageName} cd ${installPath} +cp -r ${scriptDir}/debRpmAutoInstall.sh . + if [ ! -f {packgeName} ];then - sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/${packgeName} . + echoColor BD "sshpass -p ${password} scp -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${version}/community/${packgeName} ." + sshpass -p ${password} scp -oStrictHostKeyChecking=no -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${version}/community/${packgeName} . fi -if [ ! -f debAuto.sh ];then - echo '#!/usr/bin/expect ' > debAuto.sh - echo 'set timeout 3 ' >> debAuto.sh - echo 'pset packgeName [lindex $argv 0]' >> debAuto.sh - echo 'spawn dpkg -i ${packgeName}' >> debAuto.sh - echo 'expect "*one:"' >> debAuto.sh - echo 'send "\r"' >> debAuto.sh - echo 'expect "*skip:"' >> debAuto.sh - echo 'send "\r" ' >> debAuto.sh + +packageSuffix=$(echo ${packgeName} | awk -F '.' '{print $NF}') + + +if [ ! -f debRpmAutoInstall.sh ];then + echo '#!/usr/bin/expect ' > debRpmAutoInstall.sh + echo 'set packgeName [lindex $argv 0]' >> debRpmAutoInstall.sh + echo 'set packageSuffix [lindex $argv 1]' >> debRpmAutoInstall.sh + echo 'set timeout 3 ' >> debRpmAutoInstall.sh + echo 'if { ${packageSuffix} == "deb" } {' >> debRpmAutoInstall.sh + echo ' spawn dpkg -i ${packgeName} ' >> debRpmAutoInstall.sh + echo '} elseif { ${packageSuffix} == "rpm"} {' >> debRpmAutoInstall.sh + echo ' spawn rpm -ivh ${packgeName}' >> debRpmAutoInstall.sh + echo '}' >> debRpmAutoInstall.sh + echo 'expect "*one:"' >> debRpmAutoInstall.sh + echo 'send "\r"' >> debRpmAutoInstall.sh + echo 'expect "*skip:"' >> debRpmAutoInstall.sh + echo 'send "\r" ' >> debRpmAutoInstall.sh fi + +echoColor G "===== instal Package =====" + if [[ ${packgeName} =~ "deb" ]];then cd ${installPath} dpkg -r taostools dpkg -r tdengine if [[ ${packgeName} =~ "TDengine" ]];then - echo "./debAuto.sh ${packgeName}" && chmod 755 debAuto.sh && ./debAuto.sh ${packgeName} + echoColor BD "./debRpmAutoInstall.sh ${packgeName} ${packageSuffix}" && chmod 755 debRpmAutoInstall.sh && ./debRpmAutoInstall.sh ${packgeName} ${packageSuffix} else - echo "dpkg -i ${packgeName}" && dpkg -i ${packgeName} + echoColor BD "dpkg -i ${packgeName}" && dpkg -i ${packgeName} fi elif [[ ${packgeName} =~ "rpm" ]];then cd ${installPath} - echo "rpm ${packgeName}" && rpm -ivh ${packgeName} --quiet + sudo rpm -e tdengine + sudo rpm -e taostools + if [[ ${packgeName} =~ "TDengine" ]];then + echoColor BD "./debRpmAutoInstall.sh ${packgeName} ${packageSuffix}" && chmod 755 debRpmAutoInstall.sh && ./debRpmAutoInstall.sh ${packgeName} ${packageSuffix} + else + echoColor BD "rpm -ivh ${packgeName}" && rpm -ivh ${packgeName} + fi elif [[ ${packgeName} =~ "tar" ]];then + echoColor G "===== check installPackage File of tar =====" cd ${oriInstallPath} if [ ! -f {originPackageName} ];then - sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${originversion}/community${originPackageName} . + echoColor YD "download base installPackage" + echoColor BD "sshpass -p ${password} scp -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${originversion}/community/${originPackageName} ." + sshpass -p ${password} scp -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${originversion}/community/${originPackageName} . fi - echo "tar -xvf ${originPackageName}" && tar -xvf ${originPackageName} - + echoColor YD "unzip the base installation package" + echoColor BD "tar -xf ${originPackageName}" && tar -xf ${originPackageName} cd ${installPath} - echo "tar -xvf ${packgeName}" && tar -xvf ${packgeName} - + echoColor YD "unzip the new installation package" + echoColor BD "tar -xf ${packgeName}" && tar -xf ${packgeName} if [ ${testFile} != "tools" ] ;then - cd ${installPath}/${tdPath} && tar vxf ${subFile} - cd ${oriInstallPath}/${originTdpPath} && tar vxf ${subFile} + cd ${installPath}/${tdPath} && tar xf ${subFile} + cd ${oriInstallPath}/${originTdpPath} && tar xf ${subFile} fi - echo "check installPackage File" - + cd ${oriInstallPath}/${originTdpPath} && tree > ${installPath}/base_${originversion}_checkfile + cd ${installPath}/${tdPath} && tree > ${installPath}/now_${version}_checkfile + cd ${installPath} - - tree ${oriInstallPath}/${originTdpPath} > ${oriInstallPath}/${originPackageName}_checkfile - tree ${installPath}/${tdPath} > ${installPath}/${packgeName}_checkfile - - diff ${installPath}/${packgeName}_checkfile ${oriInstallPath}/${originPackageName}_checkfile > ${installPath}/diffFile.log + diff ${installPath}/base_${originversion}_checkfile ${installPath}/now_${version}_checkfile > ${installPath}/diffFile.log diffNumbers=`cat ${installPath}/diffFile.log |wc -l ` - if [ ${diffNumbers} != 0 ];then - echo "The number and names of files have changed from the previous installation package" - echo `cat ${installPath}/diffFile.log` - exit -1 - fi + if [ ${diffNumbers} != 0 ];then + echoColor R "The number and names of files is different from the previous installation package" + echoColor Y `cat ${installPath}/diffFile.log` + exit -1 + else + echoColor G "The number and names of files are the same as previous installation packages" + fi + echoColor YD "===== install Package of tar =====" cd ${installPath}/${tdPath} if [ ${testFile} = "server" ];then + echoColor BD "bash ${installCmd} -e no " bash ${installCmd} -e no else + echoColor BD "bash ${installCmd} " bash ${installCmd} fi - if [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "tar" ]] ;then - cd ${installPath} - sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.tar.gz . - # wget https://www.taosdata.com/assets-download/3.0/taosTools-2.1.2-Linux-x64.tar.gz - tar xvf taosTools-2.1.2-Linux-x64.tar.gz - cd taosTools-2.1.2 && bash install-taostools.sh - elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "deb" ]] ;then - cd ${installPath} - sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.deb . - dpkg -i taosTools-2.1.2-Linux-x64.deb - elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "rpm" ]] ;then - cd ${installPath} - sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.rpm . - rpm -ivh taosTools-2.1.2-Linux-x64.rpm --quiet - fi - fi +cd ${installPath} + +if ([[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "tar" ]]) || [[ ${packgeName} =~ "client" ]] ;then + echoColor G "===== install taos-tools when package is lite or client =====" + cd ${installPath} + sshpass -p ${password} scp -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.tar.gz . + # wget https://www.taosdata.com/assets-download/3.0/taosTools-2.1.2-Linux-x64.tar.gz + tar xf taosTools-2.1.2-Linux-x64.tar.gz + cd taosTools-2.1.2 && bash install-taostools.sh +elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "deb" ]] ;then + echoColor G "===== install taos-tools when package is lite or client =====" + cd ${installPath} + sshpass -p ${password} scp -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.tar.gz . + tar xf taosTools-2.1.2-Linux-x64.tar.gz + cd taosTools-2.1.2 && bash install-taostools.sh +elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "rpm" ]] ;then + echoColor G "===== install taos-tools when package is lite or client =====" + cd ${installPath} + sshpass -p ${password} scp -oStrictHostKeyChecking=no -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.tar.gz . + tar xf taosTools-2.1.2-Linux-x64.tar.gz + cd taosTools-2.1.2 && bash install-taostools.sh +fi + diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index c5ce2f632c..23dc0c0864 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -356,6 +356,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray* pColumns, goto end; } + pRequest->syncQuery = true; if (!pRequest->pDb) { code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; goto end; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 29d509c27c..f08f54ef4b 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -811,8 +811,19 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { } int32_t tmq_unsubscribe(tmq_t* tmq) { + int32_t rsp; + int32_t retryCnt = 0; tmq_list_t* lst = tmq_list_new(); - int32_t rsp = tmq_subscribe(tmq, lst); + while (1) { + rsp = tmq_subscribe(tmq, lst); + if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) { + break; + } else { + retryCnt++; + taosMsleep(500); + } + } + tmq_list_destroy(lst); return rsp; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2fc93cc9b5..1921415239 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5141,6 +5141,7 @@ static int32_t tEncodeSVDropTbReq(SEncoder *pCoder, const SVDropTbReq *pReq) { if (tStartEncode(pCoder) < 0) return -1; if (tEncodeCStr(pCoder, pReq->name) < 0) return -1; + if (tEncodeU64(pCoder, pReq->suid) < 0) return -1; if (tEncodeI8(pCoder, pReq->igNotExists) < 0) return -1; tEndEncode(pCoder); @@ -5151,6 +5152,7 @@ static int32_t tDecodeSVDropTbReq(SDecoder *pCoder, SVDropTbReq *pReq) { if (tStartDecode(pCoder) < 0) return -1; if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1; + if (tDecodeU64(pCoder, &pReq->suid) < 0) return -1; if (tDecodeI8(pCoder, &pReq->igNotExists) < 0) return -1; tEndDecode(pCoder); @@ -5889,6 +5891,13 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { return 0; } +void tDeleteSMqDataRsp(SMqDataRsp *pRsp) { + taosArrayDestroy(pRsp->blockDataLen); + taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); + taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper); + taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); +} + int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index a059db6b00..b91b82b72e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -301,7 +301,7 @@ int32_t dmInitServer(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; - strncpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn)); + strncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN); rpcInit.localPort = tsServerPort; rpcInit.label = "DND-S"; rpcInit.numOfThreads = tsNumOfRpcThreads; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index ff208eae60..eb072d013d 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -763,8 +763,9 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl int32_t cols = 0; char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; - tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB); - tNameGetDbName(&n, varDataVal(topicName)); + strcpy(varDataVal(topicName), mndGetDbStr(pTopic->name)); + /*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/ + /*tNameGetDbName(&n, varDataVal(topicName));*/ varDataSetLen(topicName, strlen(varDataVal(topicName))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)topicName, false); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index a97c8ff132..19dd321814 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -67,21 +67,21 @@ typedef struct { // tqExec typedef struct { - char* qmsg; + char* qmsg; } STqExecCol; typedef struct { - int64_t suid; + int64_t suid; } STqExecTb; typedef struct { - SHashObj* pFilterOutTbUid; + SHashObj* pFilterOutTbUid; } STqExecDb; typedef struct { int8_t subType; - STqReader* pExecReader; + STqReader* pExecReader; qTaskInfo_t task; union { STqExecCol execCol; @@ -144,7 +144,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum); // tqExec -int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp); +int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); // tqMeta @@ -175,22 +175,6 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); char* tqOffsetBuildFName(const char* path, int32_t ver); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); -static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) { - pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA; - pOffsetVal->uid = uid; - pOffsetVal->ts = ts; -} - -static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) { - pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META; - pOffsetVal->uid = uid; -} - -static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) { - pOffsetVal->type = TMQ_OFFSET__LOG; - pOffsetVal->version = ver; -} - // tqStream int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask); diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 235fb1f941..3c3097bb2f 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -16,17 +16,17 @@ #include "sma.h" #include "tsdb.h" -static int32_t smaEvalDays(SRetention *r, int8_t precision); -static int32_t smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type); +static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration); +static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type); static int32_t rsmaRestore(SSma *pSma); -#define SMA_SET_KEEP_CFG(l) \ +#define SMA_SET_KEEP_CFG(v, l) \ do { \ SRetention *r = &pCfg->retentions[l]; \ pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \ pKeepCfg->keep0 = pKeepCfg->keep2; \ pKeepCfg->keep1 = pKeepCfg->keep2; \ - pKeepCfg->days = smaEvalDays(r, pCfg->precision); \ + pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \ } while (0) #define SMA_OPEN_RSMA_IMPL(v, l) \ @@ -38,51 +38,78 @@ static int32_t rsmaRestore(SSma *pSma); } \ break; \ } \ - smaSetKeepCfg(&keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ + smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg) < 0) { \ goto _err; \ } \ } while (0) -#define RETENTION_DAYS_SPLIT_RATIO 10 -#define RETENTION_DAYS_SPLIT_MIN 1 -#define RETENTION_DAYS_SPLIT_MAX 30 +/** + * @brief Evaluate days(duration) for rsma level 1/2/3. + * 1) level 1: duration from "create database" + * 2) level 2/3: duration * (freq/freqL1) + * @param pVnode + * @param r + * @param level + * @param precision + * @param duration + * @return int32_t + */ +static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration) { + int32_t freqDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->freq, precision, TIME_UNIT_MINUTE); + int32_t keepDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->keep, precision, TIME_UNIT_MINUTE); + int32_t days = duration; // min -static int32_t smaEvalDays(SRetention *r, int8_t precision) { - int32_t keepDays = convertTimeFromPrecisionToUnit(r->keep, precision, TIME_UNIT_DAY); - int32_t freqDays = convertTimeFromPrecisionToUnit(r->freq, precision, TIME_UNIT_DAY); - - int32_t days = keepDays / RETENTION_DAYS_SPLIT_RATIO; - if (days <= RETENTION_DAYS_SPLIT_MIN) { - days = RETENTION_DAYS_SPLIT_MIN; - if (days < freqDays) { - days = freqDays + 1; - } - } else { - if (days > RETENTION_DAYS_SPLIT_MAX) { - days = RETENTION_DAYS_SPLIT_MAX; - } - if (days < freqDays) { - days = freqDays + 1; - } + if (days < freqDuration) { + days = freqDuration; } - return days * 1440; + + if (days > keepDuration) { + days = keepDuration; + } + + if (level == TSDB_RETENTION_L0) { + goto end; + } + + ASSERT(level >= TSDB_RETENTION_L1 && level <= TSDB_RETENTION_L2); + + freqDuration = convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE); + keepDuration = convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE); + + int32_t nFreqTimes = (r + level)->freq / (r + TSDB_RETENTION_L0)->freq; + days *= (nFreqTimes > 1 ? nFreqTimes : 1); + + if (days > keepDuration) { + days = keepDuration; + } + + if (days > TSDB_MAX_DURATION_PER_FILE) { + days = TSDB_MAX_DURATION_PER_FILE; + } + + if (days < freqDuration) { + days = freqDuration; + } +end: + smaInfo("vgId:%d, evaluated duration for level %" PRIi8 " is %d, raw val:%d", TD_VID(pVnode), level + 1, days, duration); + return days; } -int smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) { +int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) { pKeepCfg->precision = pCfg->precision; switch (type) { case TSDB_TYPE_TSMA: ASSERT(0); break; case TSDB_TYPE_RSMA_L0: - SMA_SET_KEEP_CFG(0); + SMA_SET_KEEP_CFG(pVnode, 0); break; case TSDB_TYPE_RSMA_L1: - SMA_SET_KEEP_CFG(1); + SMA_SET_KEEP_CFG(pVnode, 1); break; case TSDB_TYPE_RSMA_L2: - SMA_SET_KEEP_CFG(2); + SMA_SET_KEEP_CFG(pVnode, 2); break; default: ASSERT(0); @@ -148,11 +175,11 @@ int32_t smaClose(SSma *pSma) { /** * @brief rsma env restore - * - * @param pSma - * @param type - * @param committedVer - * @return int32_t + * + * @param pSma + * @param type + * @param committedVer + * @return int32_t */ int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer) { ASSERT(VND_IS_RSMA(pSma->pVnode)); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index f33d8dc2d0..f2063e3067 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1078,9 +1078,6 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTa goto _err; } - SSmaEnv *pRSmaEnv = pSma->pRSmaEnv; - SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pRSmaEnv); - SRSmaQTaskInfoIter fIter = {0}; if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { tdRSmaQTaskInfoIterDestroy(&fIter); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 54f764c6b3..eed997b486 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -357,8 +357,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { TD_VID(pTq->pVnode), formatBuf); } else { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { - if (pReq->useSnapshot){ - if (pHandle->fetchMeta){ + if (pReq->useSnapshot) { + if (pHandle->fetchMeta) { tqOffsetResetToMeta(&fetchOffsetNew, 0); } else { tqOffsetResetToData(&fetchOffsetNew, 0, 0); @@ -373,43 +373,47 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } - goto OVER; + tDeleteSMqDataRsp(&dataRsp); + return code; } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64 " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; code = -1; - goto OVER; + tDeleteSMqDataRsp(&dataRsp); + return code; } } } - if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG){ + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG) { SMqMetaRsp metaRsp = {0}; tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew); - if(metaRsp.metaRspLen > 0){ + if (metaRsp.metaRspLen > 0) { if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { code = -1; } - tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey, - TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version); + tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId, + pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, + metaRsp.rspOffset.version); taosMemoryFree(metaRsp.metaRsp); goto OVER; } - if (dataRsp.blockNum > 0){ + if (dataRsp.blockNum > 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } goto OVER; - }else{ + } else { fetchOffsetNew = dataRsp.rspOffset; } - tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey, - TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.version); + tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld", + consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, + dataRsp.rspOffset.uid, dataRsp.rspOffset.version); } if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) { @@ -426,7 +430,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { consumerEpoch = atomic_load_32(&pHandle->epoch); if (consumerEpoch > reqEpoch) { tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64 - ", found new consumer epoch %d, discard req epoch %d", + ", found new consumer epoch %d, discard req epoch %d", consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); break; } @@ -449,7 +453,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) { + if (tqLogScanExec(pTq, pHandle, pCont, &dataRsp) < 0) { /*ASSERT(0);*/ } // TODO batch optimization: @@ -490,18 +494,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { OVER: if (pCkHead) taosMemoryFree(pCkHead); - // TODO wrap in destroy func - taosArrayDestroy(dataRsp.blockDataLen); - taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree); - - if (dataRsp.withSchema) { - taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); - } - - if (dataRsp.withTbName) { - taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree); - } - + tDeleteSMqDataRsp(&dataRsp); return code; } @@ -629,9 +622,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); taosArrayDestroy(tbUidList); - buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext)); - pHandle->execHandle.task = - qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); + buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, + (SSnapContext**)(&handle.sContext)); + pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index a0b8141cfb..da596d07f9 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -60,6 +60,46 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { return 0; } +int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { + const STqExecHandle* pExec = &pHandle->execHandle; + ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); + + qTaskInfo_t task = pExec->task; + + if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { + tqDebug("prepare scan failed, return"); + if (pOffset->type == TMQ_OFFSET__LOG) { + pRsp->rspOffset = *pOffset; + return 0; + } else { + tqOffsetResetToLog(pOffset, pHandle->snapshotVer); + if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { + tqDebug("prepare scan failed, return"); + pRsp->rspOffset = *pOffset; + return 0; + } + } + } + + int32_t rowCnt = 0; + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts = 0; + tqDebug("tmq task start to execute"); + if (qExecTask(task, &pDataBlock, &ts) < 0) { + ASSERT(0); + } + tqDebug("tmq task execute end, get %p", pDataBlock); + + if (pDataBlock) { + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + pRsp->blockNum++; + } + } + + return 0; +} + int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) { const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; @@ -102,18 +142,18 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* taosArrayPush(pRsp->blockTbName, &tbName); } } - if(pRsp->withSchema){ + if (pRsp->withSchema) { if (pOffset->type == TMQ_OFFSET__LOG) { tqAddBlockSchemaToRsp(pExec, pRsp); - }else{ + } else { SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); taosArrayPush(pRsp->blockSchema, &pSW); } } - if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){ + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); - }else{ + } else { tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); } pRsp->blockNum++; @@ -125,17 +165,9 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* } } - if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){ - if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), - pHandle->snapshotVer + 1); - tqOffsetResetToLog(pOffset, pHandle->snapshotVer); - qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); - continue; - } - }else{ - if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ - if(qStreamExtractPrepareUid(task) != 0){ + if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { + if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + if (qStreamExtractPrepareUid(task) != 0) { continue; } tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), @@ -143,13 +175,13 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* break; } - if (pRsp->blockNum > 0){ + if (pRsp->blockNum > 0) { tqDebug("tmqsnap task exec exited, get data"); break; } SMqMetaRsp* tmp = qStreamExtractMetaMsg(task); - if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){ + if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts); qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META; @@ -173,57 +205,8 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* return 0; } -#if 0 -int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) { - ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); - qTaskInfo_t task = pExec->execCol.task[workerId]; - - if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) { - ASSERT(0); - } - - int32_t rowCnt = 0; - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(0); - } - if (pDataBlock == NULL) break; - - ASSERT(pDataBlock->info.rows != 0); - ASSERT(taosArrayGetSize(pDataBlock->pDataBlock) != 0); - - tqAddBlockDataToRsp(pDataBlock, pRsp); - - if (pRsp->withTbName) { - pRsp->withTbName = 0; -#if 0 - int64_t uid; - int64_t ts; - if (qGetStreamScanStatus(task, &uid, &ts) < 0) { - ASSERT(0); - } - tqAddTbNameToRsp(pTq, uid, pRsp); -#endif - } - pRsp->blockNum++; - - rowCnt += pDataBlock->info.rows; - if (rowCnt >= 4096) break; - } - int64_t uid; - int64_t ts; - if (qGetStreamScanStatus(task, &uid, &ts) < 0) { - ASSERT(0); - } - tqOffsetResetToData(&pRsp->rspOffset, uid, ts); - - return 0; -} -#endif - -int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) { +int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp) { + STqExecHandle* pExec = &pHandle->execHandle; ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN); if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { @@ -268,6 +251,28 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR tqAddBlockSchemaToRsp(pExec, pRsp); pRsp->blockNum++; } +#if 0 + if (pHandle->fetchMeta && pRsp->blockNum) { + SSubmitMsgIter iter = {0}; + tInitSubmitMsgIter(pReq, &iter); + STaosxRsp* pXrsp = (STaosxRsp*)pRsp; + while (1) { + SSubmitBlk* pBlk = NULL; + if (tGetSubmitMsgNext(&iter, &pBlk) < 0) return -1; + if (pBlk->schemaLen > 0) { + if (pXrsp->createTableNum == 0) { + pXrsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); + pXrsp->createTableReq = taosArrayInit(0, sizeof(void*)); + } + void* createReq = taosMemoryCalloc(1, pBlk->schemaLen); + memcpy(createReq, pBlk->data, pBlk->schemaLen); + taosArrayPush(pXrsp->createTableLen, &pBlk->schemaLen); + taosArrayPush(pXrsp->createTableReq, &createReq); + pXrsp->createTableNum++; + } + } + } +#endif } if (pRsp->blockNum == 0) { diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index a192d1f863..62f8debccb 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -18,12 +18,25 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1; + if (tEncodeI8(pEncoder, pHandle->fetchMeta) < 0) return -1; if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1; if (tEncodeI64(pEncoder, pHandle->snapshotVer) < 0) return -1; if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1; if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1; + } else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){ + int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid); + if (tEncodeI32(pEncoder, size) < 0) return -1; + void *pIter = NULL; + pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter); + while(pIter){ + int64_t *tbUid = (int64_t *)taosHashGetKey(pIter, NULL); + if (tEncodeI64(pEncoder, *tbUid) < 0) return -1; + pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter); + } + } else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){ + if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1; } tEndEncode(pEncoder); return pEncoder->pos; @@ -32,12 +45,25 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1; + if (tDecodeI8(pDecoder, &pHandle->fetchMeta) < 0) return -1; if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1; if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1; if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1; if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1; + }else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){ + pHandle->execHandle.execDb.pFilterOutTbUid = + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + int32_t size = 0; + if (tDecodeI32(pDecoder, &size) < 0) return -1; + for(int32_t i = 0; i < size; i++){ + int64_t tbUid = 0; + if (tDecodeI64(pDecoder, &tbUid) < 0) return -1; + taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); + } + } else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){ + if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1; } tEndDecode(pDecoder); return 0; @@ -267,14 +293,28 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ASSERT(scanner); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(handle.execHandle.pExecReader); - } else { - + } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - handle.execHandle.execDb.pFilterOutTbUid = - taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); -// handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); - buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); + handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); + buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); + handle.execHandle.task = + qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); + } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + + SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); + vnodeGetCtbIdList(pTq->pVnode, handle.execHandle.execTb.suid, tbUidList); + tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid); + for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); + tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); + } + handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); + tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList); + taosArrayDestroy(tbUidList); + + buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext)); handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 6e2a6fdb71..375130fa2c 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -15,6 +15,162 @@ #include "tq.h" + +bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){ + if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE){ + return true; + } + + int16_t msgType = pHead->msgType; + char* body = pHead->body; + int32_t bodyLen = pHead->bodyLen; + + int64_t tbSuid = pHandle->execHandle.execTb.suid; + int64_t realTbSuid = 0; + SDecoder coder; + void* data = POINTER_SHIFT(body, sizeof(SMsgHead)); + int32_t len = bodyLen - sizeof(SMsgHead); + tDecoderInit(&coder, data, len); + + if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) { + SVCreateStbReq req = {0}; + if (tDecodeSVCreateStbReq(&coder, &req) < 0) { + goto end; + } + realTbSuid = req.suid; + } else if (msgType == TDMT_VND_DROP_STB) { + SVDropStbReq req = {0}; + if (tDecodeSVDropStbReq(&coder, &req) < 0) { + goto end; + } + realTbSuid = req.suid; + } else if (msgType == TDMT_VND_CREATE_TABLE) { + SVCreateTbBatchReq req = {0}; + if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) { + goto end; + } + + int32_t needRebuild = 0; + SVCreateTbReq* pCreateReq = NULL; + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + pCreateReq = req.pReqs + iReq; + if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){ + needRebuild++; + } + } + if(needRebuild == 0){ + // do nothing + }else if(needRebuild == req.nReqs){ + realTbSuid = tbSuid; + }else{ + realTbSuid = tbSuid; + SVCreateTbBatchReq reqNew = {0}; + reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq)); + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + pCreateReq = req.pReqs + iReq; + if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){ + reqNew.nReqs++; + taosArrayPush(reqNew.pArray, pCreateReq); + } + } + + int tlen; + int32_t ret = 0; + tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret); + void* buf = taosMemoryMalloc(tlen); + if (NULL == buf) { + taosArrayDestroy(reqNew.pArray); + goto end; + } + SEncoder coderNew = {0}; + tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead)); + tEncodeSVCreateTbBatchReq(&coderNew, &reqNew); + tEncoderClear(&coderNew); + memcpy(pHead->body + sizeof(SMsgHead), buf, tlen); + pHead->bodyLen = tlen + sizeof(SMsgHead); + taosMemoryFree(buf); + taosArrayDestroy(reqNew.pArray); + } + } else if (msgType == TDMT_VND_ALTER_TABLE) { + SVAlterTbReq req = {0}; + + if (tDecodeSVAlterTbReq(&coder, &req) < 0) { + goto end; + } + + SMetaReader mr = {0}; + metaReaderInit(&mr, pHandle->execHandle.pExecReader->pVnodeMeta, 0); + + if (metaGetTableEntryByName(&mr, req.tbName) < 0) { + metaReaderClear(&mr); + goto end; + } + realTbSuid = mr.me.ctbEntry.suid; + metaReaderClear(&mr); + } else if (msgType == TDMT_VND_DROP_TABLE) { + SVDropTbBatchReq req = {0}; + + if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) { + goto end; + } + + int32_t needRebuild = 0; + SVDropTbReq* pDropReq = NULL; + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + pDropReq = req.pReqs + iReq; + + if(pDropReq->suid == tbSuid){ + needRebuild++; + } + } + if(needRebuild == 0){ + // do nothing + }else if(needRebuild == req.nReqs){ + realTbSuid = tbSuid; + }else{ + realTbSuid = tbSuid; + SVDropTbBatchReq reqNew = {0}; + reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq)); + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + pDropReq = req.pReqs + iReq; + if(pDropReq->suid == tbSuid){ + reqNew.nReqs++; + taosArrayPush(reqNew.pArray, pDropReq); + } + } + + int tlen; + int32_t ret = 0; + tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret); + void* buf = taosMemoryMalloc(tlen); + if (NULL == buf) { + taosArrayDestroy(reqNew.pArray); + goto end; + } + SEncoder coderNew = {0}; + tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead)); + tEncodeSVDropTbBatchReq(&coderNew, &reqNew); + tEncoderClear(&coderNew); + memcpy(pHead->body + sizeof(SMsgHead), buf, tlen); + pHead->bodyLen = tlen + sizeof(SMsgHead); + taosMemoryFree(buf); + taosArrayDestroy(reqNew.pArray); + } + } else if (msgType == TDMT_VND_DELETE) { + SDeleteRes req = {0}; + if (tDecodeDeleteRes(&coder, &req) < 0) { + goto end; + } + realTbSuid = req.suid; + } else{ + ASSERT(0); + } + + end: + tDecoderClear(&coder); + return tbSuid == realTbSuid; +} + int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) { int32_t code = 0; taosThreadMutexLock(&pHandle->pWalReader->mutex); @@ -53,9 +209,11 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea code = -1; goto END; } - *fetchOffset = offset; - code = 0; - goto END; + if(isValValidForTable(pHandle, pHead)){ + *fetchOffset = offset; + code = 0; + goto END; + } } } code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f0518a72ab..b4e2840330 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -143,6 +143,8 @@ typedef struct { STqOffsetVal prepareStatus; // for tmq STqOffsetVal lastStatus; // for tmq SMqMetaRsp metaRsp; // for tmq fetching meta + int64_t snapshotVer; + SSchemaWrapper *schema; char tbName[TSDB_TABLE_NAME_LEN]; SSDataBlock* pullOverBlk; // for streaming @@ -486,24 +488,23 @@ typedef struct SStreamScanInfo { STimeWindowAggSupp twAggSup; SSDataBlock* pUpdateDataRes; // status for tmq - // SSchemaWrapper schema; - SNodeList* pGroupTags; - SNode* pTagCond; - SNode* pTagIndexCond; + SNodeList* pGroupTags; + SNode* pTagCond; + SNode* pTagIndexCond; } SStreamScanInfo; -typedef struct SStreamRawScanInfo{ -// int8_t subType; -// bool withMeta; -// int64_t suid; -// int64_t snapVersion; -// void *metaInfo; -// void *dataInfo; - SVnode* vnode; - SSDataBlock pRes; // result SSDataBlock - STsdbReader* dataReader; - SSnapContext* sContext; -}SStreamRawScanInfo; +typedef struct { + // int8_t subType; + // bool withMeta; + // int64_t suid; + // int64_t snapVersion; + // void *metaInfo; + // void *dataInfo; + SVnode* vnode; + SSDataBlock pRes; // result SSDataBlock + STsdbReader* dataReader; + SSnapContext* sContext; +} SStreamRawScanInfo; typedef struct SSysTableScanInfo { SRetrieveMetaTableRsp* pRsp; @@ -528,14 +529,14 @@ typedef struct SBlockDistInfo { SSDataBlock* pResBlock; void* pHandle; SReadHandle readHandle; - uint64_t uid; // table uid + uint64_t uid; // table uid } SBlockDistInfo; // todo remove this typedef struct SOptrBasicInfo { - SResultRowInfo resultRowInfo; - SSDataBlock* pRes; - bool mergeResultBlock; + SResultRowInfo resultRowInfo; + SSDataBlock* pRes; + bool mergeResultBlock; } SOptrBasicInfo; typedef struct SIntervalAggOperatorInfo { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 80c1494f8d..70180d6dc0 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1190,7 +1190,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); } else { char* udfName = pExpr->pExpr->_function.pFunctNode->functionName; - strncpy(pCtx->udfName, udfName, strlen(udfName)); + strncpy(pCtx->udfName, udfName, TSDB_FUNC_NAME_LEN); fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet); } pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 124f4b44b0..f1ac9ef8b1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -139,7 +139,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) { if (msg == NULL) { - // TODO create raw scan + // create raw scan SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); if (NULL == pTaskInfo) { @@ -151,7 +151,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE; pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo); - if(NULL == pTaskInfo->pRoot){ + if (NULL == pTaskInfo->pRoot) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pTaskInfo); return NULL; @@ -834,11 +834,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } else { ASSERT(0); } - }else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){ + } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { SStreamRawScanInfo* pInfo = pOperator->info; - SSnapContext* sContext = pInfo->sContext; - if(setForSnapShot(sContext, pOffset->uid) != 0) { - qError("setDataForSnapShot error. uid:%"PRIi64, pOffset->uid); + SSnapContext* sContext = pInfo->sContext; + if (setForSnapShot(sContext, pOffset->uid) != 0) { + qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid); return -1; } @@ -847,27 +847,29 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pInfo->dataReader = NULL; cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList); - if(mtInfo.uid == 0) return 0; // no data + if (mtInfo.uid == 0) return 0; // no data initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, mtInfo); pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); - tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); + tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, + &pInfo->dataReader, NULL); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); pTaskInfo->streamInfo.schema = mtInfo.schema; + qDebug("tmqsnap qStreamPrepareScan snapshot data uid %ld ts %ld", mtInfo.uid, pOffset->ts); - }else if(pOffset->type == TMQ_OFFSET__SNAPSHOT_META){ + } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { SStreamRawScanInfo* pInfo = pOperator->info; - SSnapContext* sContext = pInfo->sContext; - if(setForSnapShot(sContext, pOffset->uid) != 0) { - qError("setForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, pOffset->uid); + SSnapContext* sContext = pInfo->sContext; + if (setForSnapShot(sContext, pOffset->uid) != 0) { + qError("setForSnapShot error. uid:%" PRIi64 " ,version:%" PRIi64, pOffset->uid); return -1; } qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %ld ts %ld", pOffset->uid); - }else if (pOffset->type == TMQ_OFFSET__LOG) { + } else if (pOffset->type == TMQ_OFFSET__LOG) { SStreamRawScanInfo* pInfo = pOperator->info; tsdbReaderClose(pInfo->dataReader); pInfo->dataReader = NULL; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4ffa80d468..b53d35a1a1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -268,7 +268,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // add a new result set for a new group SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, - sizeof(SResultRowPosition)); + sizeof(SResultRowPosition)); } // 2. set the new time window to be the new active time window @@ -2815,92 +2815,6 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan } } } -#if 0 -int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { - uint8_t type = pOperator->operatorType; - - pOperator->status = OP_OPENED; - - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamScanInfo* pScanInfo = pOperator->info; - pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN; - - pScanInfo->pTableScanOp->status = OP_OPENED; - - STableScanInfo* pInfo = pScanInfo->pTableScanOp->info; - ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER); - - if (uid == 0) { - pInfo->noTable = 1; - return TSDB_CODE_SUCCESS; - } - - /*if (pSnapShotScanInfo->dataReader == NULL) {*/ - /*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/ - /*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/ - /*}*/ - - pInfo->noTable = 0; - - if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); - bool found = false; - for (int32_t i = 0; i < tableSz; i++) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); - if (pTableInfo->uid == uid) { - found = true; - pInfo->currentTable = i; - } - } - // TODO after processing drop, found can be false - ASSERT(found); - - tsdbSetTableId(pInfo->dataReader, uid); - int64_t oldSkey = pInfo->cond.twindows.skey; - pInfo->cond.twindows.skey = ts + 1; - tsdbReaderReset(pInfo->dataReader, &pInfo->cond); - pInfo->cond.twindows.skey = oldSkey; - pInfo->scanTimes = 0; - - qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts, - pInfo->currentTable, tableSz); - } - - return TSDB_CODE_SUCCESS; - - } else { - if (pOperator->numOfDownstream == 1) { - return doPrepareScan(pOperator->pDownstream[0], uid, ts); - } else if (pOperator->numOfDownstream == 0) { - qError("failed to find stream scan operator to set the input data block"); - return TSDB_CODE_QRY_APP_ERROR; - } else { - qError("join not supported for stream block scan"); - return TSDB_CODE_QRY_APP_ERROR; - } - } -} - -int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) { - int32_t type = pOperator->operatorType; - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamScanInfo* pScanInfo = pOperator->info; - STableScanInfo* pSnapShotScanInfo = pScanInfo->pTableScanOp->info; - *uid = pSnapShotScanInfo->lastStatus.uid; - *ts = pSnapShotScanInfo->lastStatus.ts; - } else { - if (pOperator->pDownstream[0] == NULL) { - return TSDB_CODE_INVALID_PARA; - } else { - doGetScanStatus(pOperator->pDownstream[0], uid, ts); - } - } - - return TSDB_CODE_SUCCESS; -} -#endif // this is a blocking operator static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { @@ -3024,7 +2938,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset); setBufPageDirty(pPage, true); releaseBufPage(pSup->pResultBuf, pPage); - + int32_t iter = 0; void* pIter = NULL; while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) { @@ -3434,7 +3348,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey) { - int32_t code = 0; + int32_t code = 0; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pAggSup->currentPageId = -1; @@ -4068,8 +3982,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i); qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid); } - } #endif + } pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); @@ -4294,42 +4208,6 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { return pList; } -#if 0 -STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - STableListInfo* pTableListInfo, const char* idstr) { - int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { - code = 0; - qDebug("no table qualified for query, %s", idstr); - goto _error; - } - - SQueryTableDataCond cond = {0}; - code = initQueryTableDataCond(&cond, pTableScanNode); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - STsdbReader* pReader; - code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - cleanupQueryTableDataCond(&cond); - - return pReader; - -_error: - terrno = code; - return NULL; -} -#endif - static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -4571,7 +4449,7 @@ _complete: return code; } -static void doDestroyTableList(STableListInfo* pTableqinfoList) { +void doDestroyTableList(STableListInfo* pTableqinfoList) { taosArrayDestroy(pTableqinfoList->pTableList); taosHashCleanup(pTableqinfoList->map); if (pTableqinfoList->needSortTableByGroupId) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d4c98adb7c..de6768b83a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1219,7 +1219,7 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32 static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SOperatorInfo* pOperator = pInfo->pStreamScanOp; - SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); @@ -1228,7 +1228,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.version = pBlock->info.version; - uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); if (groupIdPre) { pInfo->pRes->info.groupId = *groupIdPre; } else { @@ -1276,11 +1276,80 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock return 0; } +static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamScanInfo* pInfo = pOperator->info; + + qDebug("queue scan called"); + if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { + SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + if (pResult && pResult->info.rows > 0) { + qDebug("queue scan tsdb return %d rows", pResult->info.rows); + return pResult; + } else { + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + tsdbReaderClose(pTSInfo->dataReader); + pTSInfo->dataReader = NULL; + tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); + qDebug("queue scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1); + if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { + return NULL; + } + ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1); + } + } + + if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { + while (1) { + SFetchRet ret = {0}; + tqNextBlock(pInfo->tqReader, &ret); + if (ret.fetchType == FETCH_TYPE__DATA) { + blockDataCleanup(pInfo->pRes); + if (setBlockIntoRes(pInfo, &ret.data) < 0) { + ASSERT(0); + } + // TODO clean data block + if (pInfo->pRes->info.rows > 0) { + qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); + return pInfo->pRes; + } + } else if (ret.fetchType == FETCH_TYPE__META) { + ASSERT(0); + // pTaskInfo->streamInfo.lastStatus = ret.offset; + // pTaskInfo->streamInfo.metaBlk = ret.meta; + // return NULL; + } else if (ret.fetchType == FETCH_TYPE__NONE) { + pTaskInfo->streamInfo.lastStatus = ret.offset; + ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); + ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion); + char formatBuf[80]; + tFormatOffset(formatBuf, 80, &ret.offset); + qDebug("queue scan log return null, offset %s", formatBuf); + return NULL; + } else { + ASSERT(0); + } + } + } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { + SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + if (pResult && pResult->info.rows > 0) { + qDebug("stream scan tsdb return %d rows", pResult->info.rows); + return pResult; + } + qDebug("stream scan tsdb return null"); + return NULL; + } else { + ASSERT(0); + return NULL; + } +} + static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; + qDebug("stream scan called"); #if 0 SStreamState* pState = pTaskInfo->streamInfo.pState; if (pState) { @@ -1317,48 +1386,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } #endif - qDebug("stream scan called"); - if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { - while (1) { - SFetchRet ret = {0}; - tqNextBlock(pInfo->tqReader, &ret); - if (ret.fetchType == FETCH_TYPE__DATA) { - blockDataCleanup(pInfo->pRes); - if (setBlockIntoRes(pInfo, &ret.data) < 0) { - ASSERT(0); - } - // TODO clean data block - if (pInfo->pRes->info.rows > 0) { - qDebug("stream scan log return %d rows", pInfo->pRes->info.rows); - return pInfo->pRes; - } - } else if (ret.fetchType == FETCH_TYPE__META) { - ASSERT(0); -// pTaskInfo->streamInfo.lastStatus = ret.offset; -// pTaskInfo->streamInfo.metaBlk = ret.meta; -// return NULL; - } else if (ret.fetchType == FETCH_TYPE__NONE) { - pTaskInfo->streamInfo.lastStatus = ret.offset; - ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version); - ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion); - char formatBuf[80]; - tFormatOffset(formatBuf, 80, &ret.offset); - qDebug("stream scan log return null, offset %s", formatBuf); - return NULL; - } else { - ASSERT(0); - } - } - } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { - SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); - if (pResult && pResult->info.rows > 0) { - qDebug("stream scan tsdb return %d rows", pResult->info.rows); - return pResult; - } - qDebug("stream scan tsdb return null"); - return NULL; - } - if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); @@ -1554,14 +1581,14 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { } static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { -// NOTE: this operator does never check if current status is done or not - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + // NOTE: this operator does never check if current status is done or not + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamRawScanInfo* pInfo = pOperator->info; - pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta + pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta pTaskInfo->streamInfo.metaRsp.metaRsp = NULL; qDebug("tmqsnap doRawScan called"); - if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){ + if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pBlock = &pInfo->pRes; if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) { @@ -1585,42 +1612,38 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { } SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext); - if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal + if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal qDebug("tmqsnap read snapshot done, change to get data from wal"); pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion; - tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); - }else{ + } else { pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid; pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN; qDebug("tmqsnap change get data uid:%ld", mtInfo.uid); qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType); - strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); - tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); - pTaskInfo->streamInfo.schema = mtInfo.schema; } qDebug("tmqsnap stream scan tsdb return null"); return NULL; - }else if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META){ - SSnapContext *sContext = pInfo->sContext; - void* data = NULL; - int32_t dataLen = 0; - int16_t type = 0; - int64_t uid = 0; - if(getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0){ + } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) { + SSnapContext* sContext = pInfo->sContext; + void* data = NULL; + int32_t dataLen = 0; + int16_t type = 0; + int64_t uid = 0; + if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) { qError("tmqsnap getMetafromSnapShot error"); taosMemoryFreeClear(data); return NULL; } - if(!sContext->queryMetaOrData){ // change to get data next poll request + if (!sContext->queryMetaOrData) { // change to get data next poll request pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META; pTaskInfo->streamInfo.lastStatus.uid = uid; pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA; pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0; pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN; - }else{ + } else { pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META; pTaskInfo->streamInfo.lastStatus.uid = uid; pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus; @@ -1631,44 +1654,44 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { return NULL; } -// else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { -// int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1; -// -// while(1){ -// if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { -// qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer); -// pTaskInfo->streamInfo.lastStatus.version = fetchVer; -// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; -// return NULL; -// } -// SWalCont* pHead = &pInfo->pCkHead->head; -// qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); -// -// if (pHead->msgType == TDMT_VND_SUBMIT) { -// SSubmitReq* pCont = (SSubmitReq*)&pHead->body; -// tqReaderSetDataMsg(pInfo->tqReader, pCont, 0); -// SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes); -// if(block){ -// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; -// pTaskInfo->streamInfo.lastStatus.version = fetchVer; -// qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); -// return block; -// }else{ -// fetchVer++; -// } -// } else{ -// ASSERT(pInfo->sContext->withMeta); -// ASSERT(IS_META_MSG(pHead->msgType)); -// qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); -// pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; -// pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; -// pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType; -// pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen; -// pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen); -// memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen); -// return NULL; -// } -// } + // else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { + // int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1; + // + // while(1){ + // if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) { + // qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer); + // pTaskInfo->streamInfo.lastStatus.version = fetchVer; + // pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; + // return NULL; + // } + // SWalCont* pHead = &pInfo->pCkHead->head; + // qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType); + // + // if (pHead->msgType == TDMT_VND_SUBMIT) { + // SSubmitReq* pCont = (SSubmitReq*)&pHead->body; + // tqReaderSetDataMsg(pInfo->tqReader, pCont, 0); + // SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, + // &pInfo->pRes); if(block){ + // pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG; + // pTaskInfo->streamInfo.lastStatus.version = fetchVer; + // qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); + // return block; + // }else{ + // fetchVer++; + // } + // } else{ + // ASSERT(pInfo->sContext->withMeta); + // ASSERT(IS_META_MSG(pHead->msgType)); + // qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); + // pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer; + // pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG; + // pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType; + // pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen; + // pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen); + // memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen); + // return NULL; + // } + // } return NULL; } @@ -1689,7 +1712,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT // create tq reader SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return NULL; @@ -1699,13 +1722,12 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pInfo->sContext = pHandle->sContext; pOperator->name = "RawStreamScanOperator"; -// pOperator->blocking = false; -// pOperator->status = OP_NOT_OPENED; + // pOperator->blocking = false; + // pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, - NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL); return pOperator; } @@ -1724,7 +1746,7 @@ static void destroyStreamScanOperatorInfo(void* param) { } if (pStreamScan->pPseudoExpr) { destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr); - taosMemoryFreeClear(pStreamScan->pPseudoExpr); + taosMemoryFree(pStreamScan->pPseudoExpr); } updateInfoDestroy(pStreamScan->pUpdateInfo); @@ -1815,6 +1837,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->readHandle = *pHandle; pInfo->tableUid = pScanPhyNode->uid; + pTaskInfo->streamInfo.snapshotVer = pHandle->version; // set the extract column id to streamHandle tqReaderSetColIdList(pInfo->tqReader, pColIds); @@ -1858,8 +1881,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, destroyStreamScanOperatorInfo, - NULL, NULL, NULL); + __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL, NULL, NULL); return pOperator; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index b71d06231e..84d3a04807 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1160,6 +1160,7 @@ static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, c } ASSERT(0); + return 0; } int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index d5a3e91eea..5de9c52cc1 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -81,7 +81,7 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) { taosDirName(path); #endif } else { - strncpy(path, tsProcPath, strlen(tsProcPath)); + strncpy(path, tsProcPath, PATH_MAX); taosDirName(path); } #ifdef WINDOWS diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 5b27e030b9..a45e4585e8 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -453,7 +453,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { goto _return; } taosCloseFile(&file); - strncpy(udf->path, path, strlen(path)); + strncpy(udf->path, path, PATH_MAX); tFreeSFuncInfo(pFuncInfo); taosArrayDestroy(retrieveRsp.pFuncInfos); msgInfo->code = 0; @@ -566,17 +566,17 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc)); char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char *startSuffix = "_start"; - strncpy(startFuncName, processFuncName, strlen(processFuncName)); + strncpy(startFuncName, processFuncName, sizeof(startFuncName)); strncat(startFuncName, startSuffix, strlen(startSuffix)); uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc)); char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; char *finishSuffix = "_finish"; - strncpy(finishFuncName, processFuncName, strlen(processFuncName)); + strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc)); char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char *mergeSuffix = "_merge"; - strncpy(finishFuncName, processFuncName, strlen(processFuncName)); + strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); strncat(finishFuncName, mergeSuffix, strlen(mergeSuffix)); uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggMergeFunc)); } diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index 4f33d98f9e..cd52d122f7 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -171,6 +171,7 @@ TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t d return tDoCompare(func, cmptype, &va, &vb); } assert(0); + return BREAK; #endif } TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index a857d575f1..a88b7b417f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6408,8 +6408,8 @@ typedef struct SVgroupDropTableBatch { char dbName[TSDB_DB_NAME_LEN]; } SVgroupDropTableBatch; -static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo) { - SVDropTbReq req = {.name = pClause->tableName, .igNotExists = pClause->ignoreNotExists}; +static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo, uint64_t suid) { + SVDropTbReq req = {.name = pClause->tableName, .suid = suid, .igNotExists = pClause->ignoreNotExists}; SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId)); if (NULL == pTableBatch) { SVgroupDropTableBatch tBatch = {0}; @@ -6450,7 +6450,7 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info); } if (TSDB_CODE_SUCCESS == code) { - addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info); + addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info, pTableMeta->suid); } over: diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 07c4fa8429..3bda9bcd51 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -163,6 +163,7 @@ int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pR } } ASSERT(0); + return -1; } void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime) { @@ -190,6 +191,7 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRa } } ASSERT(0); + return -1; } // for debug ------------------- @@ -245,4 +247,5 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI } } ASSERT(0); + return -1; } \ No newline at end of file diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 0a0dcef378..9e0a8f2a10 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -43,7 +43,7 @@ void* rpcOpen(const SRpcInit* pInit) { return NULL; } if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1); + tstrncpy(pRpc->label, pInit->label, TSDB_LABEL_LEN); } // register callback handle pRpc->cfp = pInit->cfp; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index a5b5a2b7b4..9db7d6c455 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -372,7 +372,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { int64_t code; - ASSERT(pRead->curVersion == pHead->head.version); +// ASSERT(pRead->curVersion == pHead->head.version); code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR); if (code < 0) { diff --git a/source/util/src/talgo.c b/source/util/src/talgo.c index 5353cd9bfe..699f0db7a1 100644 --- a/source/util/src/talgo.c +++ b/source/util/src/talgo.c @@ -201,6 +201,7 @@ void *taosbsearch(const void *key, const void *base, int32_t nmemb, int32_t size return (c > 0) ? p : (midx > 0 ? p - size : NULL); } else { ASSERT(0); + return NULL; } } diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 7032f39744..cbda4e4655 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -244,6 +244,7 @@ int32_t compareJsonVal(const void *pLeft, const void *pRight) { return 0; }else{ assert(0); + return 0; } } diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 06ebbf27fb..46203658f1 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -446,7 +446,10 @@ static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *b } if (dflag & DEBUG_SCREEN) { +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-result" write(1, buffer, (uint32_t)len); +#pragma GCC diagnostic pop } } diff --git a/tests/test/c/sdbDump.c b/tests/test/c/sdbDump.c index aef5056031..b90b781e44 100644 --- a/tests/test/c/sdbDump.c +++ b/tests/test/c/sdbDump.c @@ -20,6 +20,9 @@ #include "tconfig.h" #include "tjson.h" +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-result" + #define TMP_DNODE_DIR TD_TMP_DIR_PATH "dumpsdb" #define TMP_MNODE_DIR TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" #define TMP_SDB_DATA_DIR TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "data" @@ -429,6 +432,7 @@ int32_t parseArgs(int32_t argc, char *argv[]) { char cmd[PATH_MAX * 2] = {0}; snprintf(cmd, sizeof(cmd), "rm -rf %s", TMP_DNODE_DIR); + system(cmd); #ifdef WINDOWS taosMulMkDir(TMP_SDB_DATA_DIR); @@ -467,3 +471,5 @@ int32_t main(int32_t argc, char *argv[]) { return dumpSdb(); } + +#pragma GCC diagnostic pop \ No newline at end of file diff --git a/tests/test/c/tmq_taosx_ci.c b/tests/test/c/tmq_taosx_ci.c index 2afa05b012..f917b9159e 100644 --- a/tests/test/c/tmq_taosx_ci.c +++ b/tests/test/c/tmq_taosx_ci.c @@ -26,6 +26,7 @@ TdFilePtr g_fp = NULL; typedef struct{ bool snapShot; bool dropTable; + bool subTable; int srcVgroups; int dstVgroups; char dir[64]; @@ -74,57 +75,7 @@ static void msg_process(TAOS_RES* msg) { taos_close(pConn); } -int32_t init_env(Config *conf) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { - return -1; - } - - TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx"); - if (taos_errno(pRes) != 0) { - printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - char sql[128] = {0}; - snprintf(sql, 128, "create database if not exists db_taosx vgroups %d", conf->dstVgroups); - pRes = taos_query(pConn, sql); - if (taos_errno(pRes) != 0) { - printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop topic if exists topic_db"); - if (taos_errno(pRes) != 0) { - printf("error in drop topic, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "drop database if exists abc1"); - if (taos_errno(pRes) != 0) { - printf("error in drop db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - snprintf(sql, 128, "create database if not exists abc1 vgroups %d", conf->srcVgroups); - pRes = taos_query(pConn, sql); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - +int buildDatabase(TAOS* pConn, TAOS_RES* pRes){ pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " "nchar(8), t4 bool)"); @@ -232,7 +183,7 @@ int32_t init_env(Config *conf) { } taos_free_result(pRes); - if(conf->dropTable){ + if(g_conf.dropTable){ pRes = taos_query(pConn, "drop table ct3 ct1"); if (taos_errno(pRes) != 0) { printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); @@ -297,7 +248,7 @@ int32_t init_env(Config *conf) { } taos_free_result(pRes); - if(conf->dropTable){ + if(g_conf.dropTable){ pRes = taos_query(pConn, "drop table n1"); if (taos_errno(pRes) != 0) { printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); @@ -341,7 +292,7 @@ int32_t init_env(Config *conf) { } taos_free_result(pRes); - if(conf->dropTable){ + if(g_conf.dropTable){ pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " "nchar(8), t4 bool)"); @@ -358,6 +309,112 @@ int32_t init_env(Config *conf) { } taos_free_result(pRes); } + return 0; +} + +int buildStable(TAOS* pConn, TAOS_RES* pRes){ + pRes = taos_query(pConn, "CREATE STABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` VARCHAR(16))"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d0 using meters tags(1, 'San Francisco')"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table d0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1 using meters tags(2, 'Beijing')"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stream meters_summary_s into meters_summary as select _wstart, max(current) as current, groupid, location from meters partition by groupid, location interval(10m)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table meters_summary, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into d0 (ts, current) values (now, 120)"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into table d0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + return 0; +} + +int32_t init_env() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx"); + if (taos_errno(pRes) != 0) { + printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + char sql[128] = {0}; + snprintf(sql, 128, "create database if not exists db_taosx vgroups %d", g_conf.dstVgroups); + pRes = taos_query(pConn, sql); + if (taos_errno(pRes) != 0) { + printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop topic if exists topic_db"); + if (taos_errno(pRes) != 0) { + printf("error in drop topic, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop topic if exists meters_summary_t1"); + if (taos_errno(pRes) != 0) { + printf("error in drop topic, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists abc1"); + if (taos_errno(pRes) != 0) { + printf("error in drop db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + snprintf(sql, 128, "create database if not exists abc1 vgroups %d", g_conf.srcVgroups); + pRes = taos_query(pConn, sql); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + if(g_conf.subTable){ + buildStable(pConn, pRes); + }else{ + buildDatabase(pConn, pRes); + } + taos_close(pConn); return 0; } @@ -377,12 +434,21 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_db with meta as database abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes)); - return -1; + if(g_conf.subTable){ + pRes = taos_query(pConn, "create topic meters_summary_t1 with meta as stable meters_summary"); + if (taos_errno(pRes) != 0) { + printf("failed to create topic meters_summary_t1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + }else{ + pRes = taos_query(pConn, "create topic topic_db with meta as database abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); } - taos_free_result(pRes); taos_close(pConn); return 0; @@ -392,7 +458,7 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { printf("commit %d tmq %p param %p\n", code, tmq, param); } -tmq_t* build_consumer(Config *config) { +tmq_t* build_consumer() { tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "group.id", "tg2"); tmq_conf_set(conf, "client.id", "my app 1"); @@ -402,7 +468,7 @@ tmq_t* build_consumer(Config *config) { tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "enable.heartbeat.background", "true"); - if(config->snapShot){ + if(g_conf.snapShot){ tmq_conf_set(conf, "experimental.snapshot.enable", "true"); } @@ -415,7 +481,11 @@ tmq_t* build_consumer(Config *config) { tmq_list_t* build_topic_list() { tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "topic_db"); + if(g_conf.subTable){ + tmq_list_append(topic_list, "meters_summary_t1"); + }else{ + tmq_list_append(topic_list, "topic_db"); + } return topic_list; } @@ -446,16 +516,16 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { fprintf(stderr, "%% Consumer closed\n"); } -void initLogFile(Config *conf) { +void initLogFile() { char f1[256] = {0}; char f2[256] = {0}; - if(conf->snapShot){ - sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", conf->dir); - sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", conf->dir); + if(g_conf.snapShot){ + sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", g_conf.dir); + sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", g_conf.dir); }else{ - sprintf(f1, "%s/../log/tmq_taosx_tmp.source", conf->dir); - sprintf(f2, "%s/../log/tmq_taosx_tmp.result", conf->dir); + sprintf(f1, "%s/../log/tmq_taosx_tmp.source", g_conf.dir); + sprintf(f2, "%s/../log/tmq_taosx_tmp.result", g_conf.dir); } TdFilePtr pFile = taosOpenFile(f1, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM); @@ -471,7 +541,7 @@ void initLogFile(Config *conf) { exit(-1); } - if(conf->snapShot){ + if(g_conf.snapShot){ char *result[] = { "{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}", "{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}", @@ -531,20 +601,22 @@ int main(int argc, char* argv[]) { g_conf.srcVgroups = atol(argv[++i]); }else if(strcmp(argv[i], "-dv") == 0){ g_conf.dstVgroups = atol(argv[++i]); + }else if(strcmp(argv[i], "-t") == 0){ + g_conf.subTable = true; } } printf("env init\n"); if(strlen(g_conf.dir) != 0){ - initLogFile(&g_conf); + initLogFile(); } - if (init_env(&g_conf) < 0) { + if (init_env() < 0) { return -1; } create_topic(); - tmq_t* tmq = build_consumer(&g_conf); + tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); basic_consume_loop(tmq, topic_list); taosCloseFile(&g_fp); diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c index b993a8dbf1..16732ff9a1 100644 --- a/tests/tsim/src/simExe.c +++ b/tests/tsim/src/simExe.c @@ -464,7 +464,10 @@ void simStoreSystemContentResult(SScript *script, char *filename) { taosCloseFile(&pFile); char rmCmd[MAX_FILE_NAME_LEN] = {0}; sprintf(rmCmd, "rm -f %s", filename); +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-result" system(rmCmd); +#pragma GCC diagnostic pop } } diff --git a/tools/shell/src/shellCommand.c b/tools/shell/src/shellCommand.c index d87e10fd08..b73317e991 100644 --- a/tools/shell/src/shellCommand.c +++ b/tools/shell/src/shellCommand.c @@ -510,7 +510,10 @@ int32_t shellReadCommand(char *command) { shellClearLineAfter(&cmd); break; case 12: // Ctrl + L; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-result" system("clear"); +#pragma GCC diagnostic pop shellShowOnScreen(&cmd); break; case 21: // Ctrl + U; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index e54b98a0a6..45d5489803 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -62,7 +62,10 @@ int32_t shellRunSingleCommand(char *command) { } if (shellRegexMatch(command, "^[\t ]*clear[ \t;]*$", REG_EXTENDED | REG_ICASE)) { - system("clear"); +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-result" + system("clear"); +#pragma GCC diagnostic pop return 0; }