This commit is contained in:
yihaoDeng 2022-09-01 10:26:44 +08:00
commit 9572a30d24
44 changed files with 984 additions and 650 deletions

View File

@ -88,4 +88,3 @@ Standard: Auto
TabWidth: 8
UseTab: Never
...

1
.gitignore vendored
View File

@ -1,5 +1,6 @@
build/
compile_commands.json
CMakeSettings.json
.cache
.ycm_extra_conf.py
.tasks

View File

@ -1,25 +0,0 @@
{
"configurations": [
{
"name": "WSL-GCC-Debug",
"generator": "Unix Makefiles",
"configurationType": "Debug",
"buildRoot": "${projectDir}\\build\\",
"installRoot": "${projectDir}\\build\\",
"cmakeExecutable": "/usr/bin/cmake",
"cmakeCommandArgs": "",
"buildCommandArgs": "",
"ctestCommandArgs": "",
"inheritEnvironments": [ "linux_x64" ],
"wslPath": "${defaultWSLPath}",
"addressSanitizerRuntimeFlags": "detect_leaks=0",
"variables": [
{
"name": "CMAKE_INSTALL_PREFIX",
"value": "/mnt/d/TDengine/TDengine/build",
"type": "PATH"
}
]
}
]
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 19 KiB

View File

@ -7,7 +7,7 @@ description: "taospy is the official Python connector for TDengine. taospy provi
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
`taospy is the official Python connector for TDengine. taospy provides a rich API that makes it easy for Python applications to use TDengine. `taospy` wraps both the [native interface](/reference/connector/cpp) and [REST interface](/reference/rest-api) of TDengine, which correspond to the `taos` and `taosrest` modules of the `taospy` package, respectively.
`taospy` is the official Python connector for TDengine. taospy provides a rich API that makes it easy for Python applications to use TDengine. `taospy` wraps both the [native interface](/reference/connector/cpp) and [REST interface](/reference/rest-api) of TDengine, which correspond to the `taos` and `taosrest` modules of the `taospy` package, respectively.
In addition to wrapping the native and REST interfaces, `taospy` also provides a set of programming interfaces that conforms to the [Python Data Access Specification (PEP 249)](https://peps.python.org/pep-0249/). It is easy to integrate `taospy` with many third-party tools, such as [SQLAlchemy](https://www.sqlalchemy.org/) and [pandas](https://pandas.pydata.org/).
The direct connection to the server using the native interface provided by the client driver is referred to hereinafter as a "native connection"; the connection to the server using the REST interface provided by taosAdapter is referred to hereinafter as a "REST connection".

View File

@ -1,10 +0,0 @@
- 已安装客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装)
:::info
由于 TDengine 的客户端驱动使用 C 语言编写,使用原生连接时需要加载系统对应安装在本地的客户端驱动共享库文件,通常包含在 TDengine 安装包。TDengine Linux 服务端安装包附带了 TDengine 客户端,也可以单独安装 [Linux 客户端](/get-started/) 。在 Windows 环境开发时需要安装 TDengine 对应的 [Windows 客户端](https://www.taosdata.com/cn/all-downloads/#TDengine-Windows-Client) 。
- libtaos.so: 在 Linux 系统中成功安装 TDengine 后,依赖的 Linux 版客户端驱动 libtaos.so 文件会被自动拷贝至 /usr/lib/libtaos.so该目录包含在 Linux 自动扫描路径上,无需单独指定。
- taos.dll: 在 Windows 系统中安装完客户端之后,依赖的 Windows 版客户端驱动 taos.dll 文件会自动拷贝到系统默认搜索路径 C:/Windows/System32 下,同样无需要单独指定。
:::

View File

@ -26,7 +26,7 @@ TDengine 的主要功能如下:
- [Icinga2](../third-party/icinga2)
- [TCollector](../third-party/tcollector)
- [EMQX](../third-party/emq-broker)
- [HiveMQ](../third-party/hive-mq-broker)
- [HiveMQ](../third-party/hive-mq-broker)
2. 查询数据,支持
- [标准 SQL](../taos-sql),含嵌套查询
- [时序数据特色函数](../taos-sql/function/#time-series-extensions)
@ -85,14 +85,14 @@ TDengine 的主要功能如下:
![TDengine Database 技术生态图](eco_system.webp)
<center><figcaption>图 1. TDengine技术生态图</figcaption></center>
<center><figcaption>图 1. TDengine 技术生态图</figcaption></center>
</figure>
上图中,左侧是各种数据采集或消息队列,包括 OPC-UA、MQTT、Telegraf、也包括 Kafka他们的数据将被源源不断的写入到 TDengine。右侧则是可视化、BI 工具、组态软件、应用程序。下侧则是 TDengine 自身提供的命令行程序CLI以及可视化管理工具。
## 典型适用场景
作为一个高性能、分布式、支持 SQL 的时序数据库Database)TDengine 的典型适用场景包括但不限于 IoT、工业互联网、车联网、IT 运维、能源、金融证券等领域。需要指出的是TDengine 是针对时序数据场景设计的专用数据库和专用大数据处理工具因其充分利用了时序大数据的特点它无法用来处理网络爬虫、微博、微信、电商、ERP、CRM 等通用型数据。下面本文将对适用场景做更多详细的分析。
作为一个高性能、分布式、支持 SQL 的时序数据库DatabaseTDengine 的典型适用场景包括但不限于 IoT、工业互联网、车联网、IT 运维、能源、金融证券等领域。需要指出的是TDengine 是针对时序数据场景设计的专用数据库和专用大数据处理工具因其充分利用了时序大数据的特点它无法用来处理网络爬虫、微博、微信、电商、ERP、CRM 等通用型数据。下面本文将对适用场景做更多详细的分析。
### 数据源特点和需求

View File

@ -2070,6 +2070,7 @@ int32_t tDeserializeSVCreateTbBatchRsp(void* buf, int32_t bufLen, SVCreateTbBatc
// TDMT_VND_DROP_TABLE =================
typedef struct {
char* name;
uint64_t suid; // for tmq in wal format
int8_t igNotExists;
} SVDropTbReq;
@ -2626,6 +2627,22 @@ typedef struct {
};
} STqOffsetVal;
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
pOffsetVal->uid = uid;
pOffsetVal->ts = ts;
}
static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META;
pOffsetVal->uid = uid;
}
static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
pOffsetVal->type = TMQ_OFFSET__LOG;
pOffsetVal->version = ver;
}
int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal);
int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal);
int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal);
@ -2957,6 +2974,7 @@ typedef struct {
int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
void tDeleteSMqDataRsp(SMqDataRsp* pRsp);
typedef struct {
SMqRspHead head;

View File

@ -64,6 +64,11 @@ pipeline {
defaultValue:'2.1.2',
description: 'This number of baseVerison is generally not modified.Now it is 3.0.0.1'
)
string (
name:'nasPassword',
defaultValue:'password',
description: 'the pasword of the NAS server which has installPackage-192.168.1.131'
)
}
environment{
WORK_DIR = '/var/lib/jenkins/workspace'
@ -111,17 +116,17 @@ pipeline {
sync_source("${BRANCH_NAME}")
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
}
@ -134,17 +139,22 @@ pipeline {
sync_source("${BRANCH_NAME}")
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_DEB} ${baseVersion} server
bash testpackage.sh ${TD_SERVER_DEB} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_CLIENT_TAR} ${version} ${BASE_TD_CLIENT_TAR} ${baseVersion} client ${nasPassword}
python3 checkPackageRuning.py
'''
}
@ -157,17 +167,17 @@ pipeline {
sync_source("${BRANCH_NAME}")
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
}
@ -179,18 +189,42 @@ pipeline {
timeout(time: 30, unit: 'MINUTES'){
sync_source("${BRANCH_NAME}")
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_TAR} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server
bash testpackage.sh ${TD_SERVER_LITE_TAR} ${version} ${BASE_TD_SERVER_LITE_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server
bash testpackage.sh ${TD_SERVER_RPM} ${version} ${BASE_TD_SERVER_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_CLIENT_LITE_TAR} ${version} ${BASE_TD_CLIENT_LITE_TAR} ${baseVersion} client ${nasPassword}
python3 checkPackageRuning.py
'''
}
}
}
stage('arm64') {
agent{label 'linux_arm64'}
steps {
timeout(time: 30, unit: 'MINUTES'){
sync_source("${BRANCH_NAME}")
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_SERVER_ARM_TAR} ${version} ${BASE_TD_SERVER_ARM_TAR} ${baseVersion} server ${nasPassword}
python3 checkPackageRuning.py
'''
sh '''
cd ${TDENGINE_ROOT_DIR}/packaging
bash testpackage.sh ${TD_CLIENT_ARM_TAR} ${version} ${BASE_TD_CLIENT_ARM_TAR} ${baseVersion} client ${nasPassword}
python3 checkPackageRuning.py
'''
}

13
packaging/debRpmAutoInstall.sh Executable file
View File

@ -0,0 +1,13 @@
#!/usr/bin/expect
set packgeName [lindex $argv 0]
set packageSuffix [lindex $argv 1]
set timeout 3
if { ${packageSuffix} == "deb" } {
spawn dpkg -i ${packgeName}
} elseif { ${packageSuffix} == "rpm"} {
spawn rpm -ivh ${packgeName}
}
expect "*one:"
send "\r"
expect "*skip:"
send "\r"

View File

@ -1,6 +1,6 @@
#!/bin/sh
#parameter
scriptDir=$(dirname $(readlink -f $0))
packgeName=$1
version=$2
originPackageName=$3
@ -9,6 +9,17 @@ testFile=$5
subFile="taos.tar.gz"
password=$6
# Color setting
RED='\033[41;30m'
GREEN='\033[1;32m'
YELLOW='\033[1;33m'
BLUE='\033[1;34m'
GREEN_DARK='\033[0;32m'
YELLOW_DARK='\033[0;33m'
BLUE_DARK='\033[0;34m'
GREEN_UNDERLINE='\033[4;32m'
NC='\033[0m'
if [ ${testFile} = "server" ];then
tdPath="TDengine-server-${version}"
originTdpPath="TDengine-server-${originversion}"
@ -24,146 +35,211 @@ elif [ ${testFile} = "tools" ];then
fi
function cmdInstall {
comd=$1
if command -v ${comd} ;then
echo "${comd} is already installed"
command=$1
if command -v ${command} ;then
echoColor YD "${command} is already installed"
else
if command -v apt ;then
apt-get install ${comd} -y
apt-get install ${command} -y
elif command -v yum ;then
yum -y install ${comd}
echo "you should install ${comd} manually"
yum -y install ${command}
echoColor YD "you should install ${command} manually"
fi
fi
}
function echoColor {
color=$1
command=$2
echo "Uninstall all components of TDeingne"
if command -v rmtaos ;then
echo "uninstall all components of TDeingne:rmtaos"
rmtaos
else
echo "os doesn't include TDengine "
if [ ${color} = 'Y' ];then
echo -e "${YELLOW}${command}${NC}"
elif [ ${color} = 'YD' ];then
echo -e "${YELLOW_DARK}${command}${NC}"
elif [ ${color} = 'R' ];then
echo -e "${RED}${command}${NC}"
elif [ ${color} = 'G' ];then
echo -e "${GREEN}${command}${NC}\r\n"
elif [ ${color} = 'B' ];then
echo -e "${BLUE}${command}${NC}"
elif [ ${color} = 'BD' ];then
echo -e "${BLUE_DARK}${command}${NC}"
fi
}
if command -v rmtaostools ;then
echo "uninstall all components of TDeingne:rmtaostools"
rmtaostools
else
echo "os doesn't include rmtaostools "
fi
echoColor G "===== install basesoft ====="
cmdInstall tree
cmdInstall wget
cmdInstall sshpass
echo "new workroom path"
echoColor G "===== Uninstall all components of TDeingne ====="
if command -v rmtaos ;then
echoColor YD "uninstall all components of TDeingne:rmtaos"
rmtaos
else
echoColor YD "os doesn't include TDengine"
fi
if command -v rmtaostools ;then
echoColor YD "uninstall all components of TDeingne:rmtaostools"
rmtaostools
else
echoColor YD "os doesn't include rmtaostools "
fi
echoColor G "===== new workroom path ====="
installPath="/usr/local/src/packageTest"
oriInstallPath="/usr/local/src/packageTest/3.1"
if [ ! -d ${installPath} ] ;then
echoColor BD "mkdir -p ${installPath}"
mkdir -p ${installPath}
else
echo "${installPath} already exists"
echoColor YD "${installPath} already exists"
fi
if [ -d ${installPath}/${tdPath} ] ;then
echoColor BD "rm -rf ${installPath}/${tdPath}/*"
rm -rf ${installPath}/${tdPath}/*
fi
if [ ! -d ${oriInstallPath} ] ;then
echoColor BD "mkdir -p ${oriInstallPath}"
mkdir -p ${oriInstallPath}
else
echo "${oriInstallPath} already exists"
echoColor YD "${oriInstallPath} already exists"
fi
if [ -d ${oriInstallPath}/${originTdpPath} ] ;then
echoColor BD "rm -rf ${oriInstallPath}/${originTdpPath}/*"
rm -rf ${oriInstallPath}/${originTdpPath}/*
fi
echo "download installPackage"
echoColor G "===== download installPackage ====="
# cd ${installPath}
# wget https://www.taosdata.com/assets-download/3.0/${packgeName}
# cd ${oriInstallPath}
# wget https://www.taosdata.com/assets-download/3.0/${originPackageName}
cd ${installPath}
cp -r ${scriptDir}/debRpmAutoInstall.sh .
if [ ! -f {packgeName} ];then
sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/${packgeName} .
echoColor BD "sshpass -p ${password} scp -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${version}/community/${packgeName} ."
sshpass -p ${password} scp -oStrictHostKeyChecking=no -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${version}/community/${packgeName} .
fi
if [ ! -f debAuto.sh ];then
echo '#!/usr/bin/expect ' > debAuto.sh
echo 'set timeout 3 ' >> debAuto.sh
echo 'pset packgeName [lindex $argv 0]' >> debAuto.sh
echo 'spawn dpkg -i ${packgeName}' >> debAuto.sh
echo 'expect "*one:"' >> debAuto.sh
echo 'send "\r"' >> debAuto.sh
echo 'expect "*skip:"' >> debAuto.sh
echo 'send "\r" ' >> debAuto.sh
packageSuffix=$(echo ${packgeName} | awk -F '.' '{print $NF}')
if [ ! -f debRpmAutoInstall.sh ];then
echo '#!/usr/bin/expect ' > debRpmAutoInstall.sh
echo 'set packgeName [lindex $argv 0]' >> debRpmAutoInstall.sh
echo 'set packageSuffix [lindex $argv 1]' >> debRpmAutoInstall.sh
echo 'set timeout 3 ' >> debRpmAutoInstall.sh
echo 'if { ${packageSuffix} == "deb" } {' >> debRpmAutoInstall.sh
echo ' spawn dpkg -i ${packgeName} ' >> debRpmAutoInstall.sh
echo '} elseif { ${packageSuffix} == "rpm"} {' >> debRpmAutoInstall.sh
echo ' spawn rpm -ivh ${packgeName}' >> debRpmAutoInstall.sh
echo '}' >> debRpmAutoInstall.sh
echo 'expect "*one:"' >> debRpmAutoInstall.sh
echo 'send "\r"' >> debRpmAutoInstall.sh
echo 'expect "*skip:"' >> debRpmAutoInstall.sh
echo 'send "\r" ' >> debRpmAutoInstall.sh
fi
echoColor G "===== instal Package ====="
if [[ ${packgeName} =~ "deb" ]];then
cd ${installPath}
dpkg -r taostools
dpkg -r tdengine
if [[ ${packgeName} =~ "TDengine" ]];then
echo "./debAuto.sh ${packgeName}" && chmod 755 debAuto.sh && ./debAuto.sh ${packgeName}
echoColor BD "./debRpmAutoInstall.sh ${packgeName} ${packageSuffix}" && chmod 755 debRpmAutoInstall.sh && ./debRpmAutoInstall.sh ${packgeName} ${packageSuffix}
else
echo "dpkg -i ${packgeName}" && dpkg -i ${packgeName}
echoColor BD "dpkg -i ${packgeName}" && dpkg -i ${packgeName}
fi
elif [[ ${packgeName} =~ "rpm" ]];then
cd ${installPath}
echo "rpm ${packgeName}" && rpm -ivh ${packgeName} --quiet
sudo rpm -e tdengine
sudo rpm -e taostools
if [[ ${packgeName} =~ "TDengine" ]];then
echoColor BD "./debRpmAutoInstall.sh ${packgeName} ${packageSuffix}" && chmod 755 debRpmAutoInstall.sh && ./debRpmAutoInstall.sh ${packgeName} ${packageSuffix}
else
echoColor BD "rpm -ivh ${packgeName}" && rpm -ivh ${packgeName}
fi
elif [[ ${packgeName} =~ "tar" ]];then
echoColor G "===== check installPackage File of tar ====="
cd ${oriInstallPath}
if [ ! -f {originPackageName} ];then
sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${originversion}/community${originPackageName} .
echoColor YD "download base installPackage"
echoColor BD "sshpass -p ${password} scp -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${originversion}/community/${originPackageName} ."
sshpass -p ${password} scp -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${originversion}/community/${originPackageName} .
fi
echo "tar -xvf ${originPackageName}" && tar -xvf ${originPackageName}
echoColor YD "unzip the base installation package"
echoColor BD "tar -xf ${originPackageName}" && tar -xf ${originPackageName}
cd ${installPath}
echo "tar -xvf ${packgeName}" && tar -xvf ${packgeName}
echoColor YD "unzip the new installation package"
echoColor BD "tar -xf ${packgeName}" && tar -xf ${packgeName}
if [ ${testFile} != "tools" ] ;then
cd ${installPath}/${tdPath} && tar vxf ${subFile}
cd ${oriInstallPath}/${originTdpPath} && tar vxf ${subFile}
cd ${installPath}/${tdPath} && tar xf ${subFile}
cd ${oriInstallPath}/${originTdpPath} && tar xf ${subFile}
fi
echo "check installPackage File"
cd ${oriInstallPath}/${originTdpPath} && tree > ${installPath}/base_${originversion}_checkfile
cd ${installPath}/${tdPath} && tree > ${installPath}/now_${version}_checkfile
cd ${installPath}
tree ${oriInstallPath}/${originTdpPath} > ${oriInstallPath}/${originPackageName}_checkfile
tree ${installPath}/${tdPath} > ${installPath}/${packgeName}_checkfile
diff ${installPath}/${packgeName}_checkfile ${oriInstallPath}/${originPackageName}_checkfile > ${installPath}/diffFile.log
diff ${installPath}/base_${originversion}_checkfile ${installPath}/now_${version}_checkfile > ${installPath}/diffFile.log
diffNumbers=`cat ${installPath}/diffFile.log |wc -l `
if [ ${diffNumbers} != 0 ];then
echo "The number and names of files have changed from the previous installation package"
echo `cat ${installPath}/diffFile.log`
exit -1
fi
if [ ${diffNumbers} != 0 ];then
echoColor R "The number and names of files is different from the previous installation package"
echoColor Y `cat ${installPath}/diffFile.log`
exit -1
else
echoColor G "The number and names of files are the same as previous installation packages"
fi
echoColor YD "===== install Package of tar ====="
cd ${installPath}/${tdPath}
if [ ${testFile} = "server" ];then
echoColor BD "bash ${installCmd} -e no "
bash ${installCmd} -e no
else
echoColor BD "bash ${installCmd} "
bash ${installCmd}
fi
if [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "tar" ]] ;then
cd ${installPath}
sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.tar.gz .
# wget https://www.taosdata.com/assets-download/3.0/taosTools-2.1.2-Linux-x64.tar.gz
tar xvf taosTools-2.1.2-Linux-x64.tar.gz
cd taosTools-2.1.2 && bash install-taostools.sh
elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "deb" ]] ;then
cd ${installPath}
sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.deb .
dpkg -i taosTools-2.1.2-Linux-x64.deb
elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "rpm" ]] ;then
cd ${installPath}
sshpass -p ${password} scp 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.rpm .
rpm -ivh taosTools-2.1.2-Linux-x64.rpm --quiet
fi
fi
cd ${installPath}
if ([[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "tar" ]]) || [[ ${packgeName} =~ "client" ]] ;then
echoColor G "===== install taos-tools when package is lite or client ====="
cd ${installPath}
sshpass -p ${password} scp -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.tar.gz .
# wget https://www.taosdata.com/assets-download/3.0/taosTools-2.1.2-Linux-x64.tar.gz
tar xf taosTools-2.1.2-Linux-x64.tar.gz
cd taosTools-2.1.2 && bash install-taostools.sh
elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "deb" ]] ;then
echoColor G "===== install taos-tools when package is lite or client ====="
cd ${installPath}
sshpass -p ${password} scp -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.tar.gz .
tar xf taosTools-2.1.2-Linux-x64.tar.gz
cd taosTools-2.1.2 && bash install-taostools.sh
elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "rpm" ]] ;then
echoColor G "===== install taos-tools when package is lite or client ====="
cd ${installPath}
sshpass -p ${password} scp -oStrictHostKeyChecking=no -oStrictHostKeyChecking=no 192.168.1.131:/nas/TDengine3/v${version}/community/taosTools-2.1.2-Linux-x64.tar.gz .
tar xf taosTools-2.1.2-Linux-x64.tar.gz
cd taosTools-2.1.2 && bash install-taostools.sh
fi

View File

@ -356,6 +356,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray* pColumns,
goto end;
}
pRequest->syncQuery = true;
if (!pRequest->pDb) {
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
goto end;

View File

@ -811,8 +811,19 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
}
int32_t tmq_unsubscribe(tmq_t* tmq) {
int32_t rsp;
int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new();
int32_t rsp = tmq_subscribe(tmq, lst);
while (1) {
rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break;
} else {
retryCnt++;
taosMsleep(500);
}
}
tmq_list_destroy(lst);
return rsp;
}

View File

@ -5141,6 +5141,7 @@ static int32_t tEncodeSVDropTbReq(SEncoder *pCoder, const SVDropTbReq *pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
if (tEncodeU64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeI8(pCoder, pReq->igNotExists) < 0) return -1;
tEndEncode(pCoder);
@ -5151,6 +5152,7 @@ static int32_t tDecodeSVDropTbReq(SDecoder *pCoder, SVDropTbReq *pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
if (tDecodeU64(pCoder, &pReq->suid) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->igNotExists) < 0) return -1;
tEndDecode(pCoder);
@ -5889,6 +5891,13 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
return 0;
}
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
taosArrayDestroy(pRsp->blockDataLen);
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
}
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;

View File

@ -301,7 +301,7 @@ int32_t dmInitServer(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0};
strncpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn));
strncpy(rpcInit.localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
rpcInit.localPort = tsServerPort;
rpcInit.label = "DND-S";
rpcInit.numOfThreads = tsNumOfRpcThreads;

View File

@ -763,8 +763,9 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
int32_t cols = 0;
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&n, varDataVal(topicName));
strcpy(varDataVal(topicName), mndGetDbStr(pTopic->name));
/*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/
/*tNameGetDbName(&n, varDataVal(topicName));*/
varDataSetLen(topicName, strlen(varDataVal(topicName)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);

View File

@ -67,21 +67,21 @@ typedef struct {
// tqExec
typedef struct {
char* qmsg;
char* qmsg;
} STqExecCol;
typedef struct {
int64_t suid;
int64_t suid;
} STqExecTb;
typedef struct {
SHashObj* pFilterOutTbUid;
SHashObj* pFilterOutTbUid;
} STqExecDb;
typedef struct {
int8_t subType;
STqReader* pExecReader;
STqReader* pExecReader;
qTaskInfo_t task;
union {
STqExecCol execCol;
@ -144,7 +144,7 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
// tqExec
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp);
int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
// tqMeta
@ -175,22 +175,6 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
char* tqOffsetBuildFName(const char* path, int32_t ver);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
pOffsetVal->uid = uid;
pOffsetVal->ts = ts;
}
static FORCE_INLINE void tqOffsetResetToMeta(STqOffsetVal* pOffsetVal, int64_t uid) {
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_META;
pOffsetVal->uid = uid;
}
static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
pOffsetVal->type = TMQ_OFFSET__LOG;
pOffsetVal->version = ver;
}
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask);

View File

@ -16,17 +16,17 @@
#include "sma.h"
#include "tsdb.h"
static int32_t smaEvalDays(SRetention *r, int8_t precision);
static int32_t smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);
static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration);
static int32_t smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type);
static int32_t rsmaRestore(SSma *pSma);
#define SMA_SET_KEEP_CFG(l) \
#define SMA_SET_KEEP_CFG(v, l) \
do { \
SRetention *r = &pCfg->retentions[l]; \
pKeepCfg->keep2 = convertTimeFromPrecisionToUnit(r->keep, pCfg->precision, TIME_UNIT_MINUTE); \
pKeepCfg->keep0 = pKeepCfg->keep2; \
pKeepCfg->keep1 = pKeepCfg->keep2; \
pKeepCfg->days = smaEvalDays(r, pCfg->precision); \
pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \
} while (0)
#define SMA_OPEN_RSMA_IMPL(v, l) \
@ -38,51 +38,78 @@ static int32_t rsmaRestore(SSma *pSma);
} \
break; \
} \
smaSetKeepCfg(&keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg) < 0) { \
goto _err; \
} \
} while (0)
#define RETENTION_DAYS_SPLIT_RATIO 10
#define RETENTION_DAYS_SPLIT_MIN 1
#define RETENTION_DAYS_SPLIT_MAX 30
/**
* @brief Evaluate days(duration) for rsma level 1/2/3.
* 1) level 1: duration from "create database"
* 2) level 2/3: duration * (freq/freqL1)
* @param pVnode
* @param r
* @param level
* @param precision
* @param duration
* @return int32_t
*/
static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t precision, int32_t duration) {
int32_t freqDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->freq, precision, TIME_UNIT_MINUTE);
int32_t keepDuration = convertTimeFromPrecisionToUnit((r + TSDB_RETENTION_L0)->keep, precision, TIME_UNIT_MINUTE);
int32_t days = duration; // min
static int32_t smaEvalDays(SRetention *r, int8_t precision) {
int32_t keepDays = convertTimeFromPrecisionToUnit(r->keep, precision, TIME_UNIT_DAY);
int32_t freqDays = convertTimeFromPrecisionToUnit(r->freq, precision, TIME_UNIT_DAY);
int32_t days = keepDays / RETENTION_DAYS_SPLIT_RATIO;
if (days <= RETENTION_DAYS_SPLIT_MIN) {
days = RETENTION_DAYS_SPLIT_MIN;
if (days < freqDays) {
days = freqDays + 1;
}
} else {
if (days > RETENTION_DAYS_SPLIT_MAX) {
days = RETENTION_DAYS_SPLIT_MAX;
}
if (days < freqDays) {
days = freqDays + 1;
}
if (days < freqDuration) {
days = freqDuration;
}
return days * 1440;
if (days > keepDuration) {
days = keepDuration;
}
if (level == TSDB_RETENTION_L0) {
goto end;
}
ASSERT(level >= TSDB_RETENTION_L1 && level <= TSDB_RETENTION_L2);
freqDuration = convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE);
keepDuration = convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE);
int32_t nFreqTimes = (r + level)->freq / (r + TSDB_RETENTION_L0)->freq;
days *= (nFreqTimes > 1 ? nFreqTimes : 1);
if (days > keepDuration) {
days = keepDuration;
}
if (days > TSDB_MAX_DURATION_PER_FILE) {
days = TSDB_MAX_DURATION_PER_FILE;
}
if (days < freqDuration) {
days = freqDuration;
}
end:
smaInfo("vgId:%d, evaluated duration for level %" PRIi8 " is %d, raw val:%d", TD_VID(pVnode), level + 1, days, duration);
return days;
}
int smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) {
int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) {
pKeepCfg->precision = pCfg->precision;
switch (type) {
case TSDB_TYPE_TSMA:
ASSERT(0);
break;
case TSDB_TYPE_RSMA_L0:
SMA_SET_KEEP_CFG(0);
SMA_SET_KEEP_CFG(pVnode, 0);
break;
case TSDB_TYPE_RSMA_L1:
SMA_SET_KEEP_CFG(1);
SMA_SET_KEEP_CFG(pVnode, 1);
break;
case TSDB_TYPE_RSMA_L2:
SMA_SET_KEEP_CFG(2);
SMA_SET_KEEP_CFG(pVnode, 2);
break;
default:
ASSERT(0);
@ -148,11 +175,11 @@ int32_t smaClose(SSma *pSma) {
/**
* @brief rsma env restore
*
* @param pSma
* @param type
* @param committedVer
* @return int32_t
*
* @param pSma
* @param type
* @param committedVer
* @return int32_t
*/
int32_t tdRsmaRestore(SSma *pSma, int8_t type, int64_t committedVer) {
ASSERT(VND_IS_RSMA(pSma->pVnode));

View File

@ -1078,9 +1078,6 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTa
goto _err;
}
SSmaEnv *pRSmaEnv = pSma->pRSmaEnv;
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pRSmaEnv);
SRSmaQTaskInfoIter fIter = {0};
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
tdRSmaQTaskInfoIterDestroy(&fIter);

View File

@ -357,8 +357,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
TD_VID(pTq->pVnode), formatBuf);
} else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pReq->useSnapshot){
if (pHandle->fetchMeta){
if (pReq->useSnapshot) {
if (pHandle->fetchMeta) {
tqOffsetResetToMeta(&fetchOffsetNew, 0);
} else {
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
@ -373,43 +373,47 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
goto OVER;
tDeleteSMqDataRsp(&dataRsp);
return code;
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64
" in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
code = -1;
goto OVER;
tDeleteSMqDataRsp(&dataRsp);
return code;
}
}
}
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG){
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN || fetchOffsetNew.type != TMQ_OFFSET__LOG) {
SMqMetaRsp metaRsp = {0};
tqScan(pTq, pHandle, &dataRsp, &metaRsp, &fetchOffsetNew);
if(metaRsp.metaRspLen > 0){
if (metaRsp.metaRspLen > 0) {
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
code = -1;
}
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version);
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send meta offset type:%d,uid:%ld,version:%ld", consumerId,
pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.version);
taosMemoryFree(metaRsp.metaRsp);
goto OVER;
}
if (dataRsp.blockNum > 0){
if (dataRsp.blockNum > 0) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
goto OVER;
}else{
} else {
fetchOffsetNew = dataRsp.rspOffset;
}
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.version);
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%ld,version:%ld",
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
dataRsp.rspOffset.uid, dataRsp.rspOffset.version);
}
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN && fetchOffsetNew.type == TMQ_OFFSET__LOG) {
@ -426,7 +430,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
consumerEpoch = atomic_load_32(&pHandle->epoch);
if (consumerEpoch > reqEpoch) {
tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
", found new consumer epoch %d, discard req epoch %d",
consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
break;
}
@ -449,7 +453,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
if (tqLogScanExec(pTq, &pHandle->execHandle, pCont, &dataRsp) < 0) {
if (tqLogScanExec(pTq, pHandle, pCont, &dataRsp) < 0) {
/*ASSERT(0);*/
}
// TODO batch optimization:
@ -490,18 +494,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
OVER:
if (pCkHead) taosMemoryFree(pCkHead);
// TODO wrap in destroy func
taosArrayDestroy(dataRsp.blockDataLen);
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
if (dataRsp.withSchema) {
taosArrayDestroyP(dataRsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
}
if (dataRsp.withTbName) {
taosArrayDestroyP(dataRsp.blockTbName, (FDelete)taosMemoryFree);
}
tDeleteSMqDataRsp(&dataRsp);
return code;
}
@ -629,9 +622,9 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
taosArrayDestroy(tbUidList);
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext **)(&handle.sContext));
pHandle->execHandle.task =
qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
}
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId);

View File

@ -60,6 +60,46 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
return 0;
}
int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle;
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->task;
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return");
if (pOffset->type == TMQ_OFFSET__LOG) {
pRsp->rspOffset = *pOffset;
return 0;
} else {
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) {
tqDebug("prepare scan failed, return");
pRsp->rspOffset = *pOffset;
return 0;
}
}
}
int32_t rowCnt = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
tqDebug("tmq task start to execute");
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
tqDebug("tmq task execute end, get %p", pDataBlock);
if (pDataBlock) {
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
pRsp->blockNum++;
}
}
return 0;
}
int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* pOffset) {
const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;
@ -102,18 +142,18 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
taosArrayPush(pRsp->blockTbName, &tbName);
}
}
if(pRsp->withSchema){
if (pRsp->withSchema) {
if (pOffset->type == TMQ_OFFSET__LOG) {
tqAddBlockSchemaToRsp(pExec, pRsp);
}else{
} else {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
taosArrayPush(pRsp->blockSchema, &pSW);
}
}
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
}else{
} else {
tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
}
pRsp->blockNum++;
@ -125,17 +165,9 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
}
}
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqDebug("vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
pHandle->snapshotVer + 1);
tqOffsetResetToLog(pOffset, pHandle->snapshotVer);
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
continue;
}
}else{
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
if(qStreamExtractPrepareUid(task) != 0){
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
if (pDataBlock == NULL && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
if (qStreamExtractPrepareUid(task) != 0) {
continue;
}
tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode),
@ -143,13 +175,13 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
break;
}
if (pRsp->blockNum > 0){
if (pRsp->blockNum > 0) {
tqDebug("tmqsnap task exec exited, get data");
break;
}
SMqMetaRsp* tmp = qStreamExtractMetaMsg(task);
if(tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA){
if (tmp->rspOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
tqOffsetResetToData(pOffset, tmp->rspOffset.uid, tmp->rspOffset.ts);
qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
tmp->rspOffset.type = TMQ_OFFSET__SNAPSHOT_META;
@ -173,57 +205,8 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
return 0;
}
#if 0
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId];
if (qStreamPrepareTsdbScan(task, offset.uid, offset.ts) < 0) {
ASSERT(0);
}
int32_t rowCnt = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) break;
ASSERT(pDataBlock->info.rows != 0);
ASSERT(taosArrayGetSize(pDataBlock->pDataBlock) != 0);
tqAddBlockDataToRsp(pDataBlock, pRsp);
if (pRsp->withTbName) {
pRsp->withTbName = 0;
#if 0
int64_t uid;
int64_t ts;
if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
ASSERT(0);
}
tqAddTbNameToRsp(pTq, uid, pRsp);
#endif
}
pRsp->blockNum++;
rowCnt += pDataBlock->info.rows;
if (rowCnt >= 4096) break;
}
int64_t uid;
int64_t ts;
if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
ASSERT(0);
}
tqOffsetResetToData(&pRsp->rspOffset, uid, ts);
return 0;
}
#endif
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp) {
int32_t tqLogScanExec(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, SMqDataRsp* pRsp) {
STqExecHandle* pExec = &pHandle->execHandle;
ASSERT(pExec->subType != TOPIC_SUB_TYPE__COLUMN);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
@ -268,6 +251,28 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
tqAddBlockSchemaToRsp(pExec, pRsp);
pRsp->blockNum++;
}
#if 0
if (pHandle->fetchMeta && pRsp->blockNum) {
SSubmitMsgIter iter = {0};
tInitSubmitMsgIter(pReq, &iter);
STaosxRsp* pXrsp = (STaosxRsp*)pRsp;
while (1) {
SSubmitBlk* pBlk = NULL;
if (tGetSubmitMsgNext(&iter, &pBlk) < 0) return -1;
if (pBlk->schemaLen > 0) {
if (pXrsp->createTableNum == 0) {
pXrsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pXrsp->createTableReq = taosArrayInit(0, sizeof(void*));
}
void* createReq = taosMemoryCalloc(1, pBlk->schemaLen);
memcpy(createReq, pBlk->data, pBlk->schemaLen);
taosArrayPush(pXrsp->createTableLen, &pBlk->schemaLen);
taosArrayPush(pXrsp->createTableReq, &createReq);
pXrsp->createTableNum++;
}
}
}
#endif
}
if (pRsp->blockNum == 0) {

View File

@ -18,12 +18,25 @@
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1;
if (tEncodeI8(pEncoder, pHandle->fetchMeta) < 0) return -1;
if (tEncodeI64(pEncoder, pHandle->consumerId) < 0) return -1;
if (tEncodeI64(pEncoder, pHandle->snapshotVer) < 0) return -1;
if (tEncodeI32(pEncoder, pHandle->epoch) < 0) return -1;
if (tEncodeI8(pEncoder, pHandle->execHandle.subType) < 0) return -1;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pEncoder, pHandle->execHandle.execCol.qmsg) < 0) return -1;
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){
int32_t size = taosHashGetSize(pHandle->execHandle.execDb.pFilterOutTbUid);
if (tEncodeI32(pEncoder, size) < 0) return -1;
void *pIter = NULL;
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
while(pIter){
int64_t *tbUid = (int64_t *)taosHashGetKey(pIter, NULL);
if (tEncodeI64(pEncoder, *tbUid) < 0) return -1;
pIter = taosHashIterate(pHandle->execHandle.execDb.pFilterOutTbUid, pIter);
}
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
if (tEncodeI64(pEncoder, pHandle->execHandle.execTb.suid) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
@ -32,12 +45,25 @@ int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) {
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pHandle->subKey) < 0) return -1;
if (tDecodeI8(pDecoder, &pHandle->fetchMeta) < 0) return -1;
if (tDecodeI64(pDecoder, &pHandle->consumerId) < 0) return -1;
if (tDecodeI64(pDecoder, &pHandle->snapshotVer) < 0) return -1;
if (tDecodeI32(pDecoder, &pHandle->epoch) < 0) return -1;
if (tDecodeI8(pDecoder, &pHandle->execHandle.subType) < 0) return -1;
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1;
}else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB){
pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1;
for(int32_t i = 0; i < size; i++){
int64_t tbUid = 0;
if (tDecodeI64(pDecoder, &tbUid) < 0) return -1;
taosHashPut(pHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0);
}
} else if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE){
if (tDecodeI64(pDecoder, &pHandle->execHandle.execTb.suid) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
@ -267,14 +293,28 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
ASSERT(scanner);
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader);
} else {
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
// handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
handle.execHandle.task =
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, handle.execHandle.execTb.suid, tbUidList);
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
}
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
taosArrayDestroy(tbUidList);
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext **)(&reader.sContext));
handle.execHandle.task =
qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL);
}

View File

@ -15,6 +15,162 @@
#include "tq.h"
bool isValValidForTable(STqHandle* pHandle, SWalCont *pHead){
if(pHandle->execHandle.subType != TOPIC_SUB_TYPE__TABLE){
return true;
}
int16_t msgType = pHead->msgType;
char* body = pHead->body;
int32_t bodyLen = pHead->bodyLen;
int64_t tbSuid = pHandle->execHandle.execTb.suid;
int64_t realTbSuid = 0;
SDecoder coder;
void* data = POINTER_SHIFT(body, sizeof(SMsgHead));
int32_t len = bodyLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (msgType == TDMT_VND_CREATE_STB || msgType == TDMT_VND_ALTER_STB) {
SVCreateStbReq req = {0};
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
goto end;
}
realTbSuid = req.suid;
} else if (msgType == TDMT_VND_DROP_STB) {
SVDropStbReq req = {0};
if (tDecodeSVDropStbReq(&coder, &req) < 0) {
goto end;
}
realTbSuid = req.suid;
} else if (msgType == TDMT_VND_CREATE_TABLE) {
SVCreateTbBatchReq req = {0};
if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
goto end;
}
int32_t needRebuild = 0;
SVCreateTbReq* pCreateReq = NULL;
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){
needRebuild++;
}
}
if(needRebuild == 0){
// do nothing
}else if(needRebuild == req.nReqs){
realTbSuid = tbSuid;
}else{
realTbSuid = tbSuid;
SVCreateTbBatchReq reqNew = {0};
reqNew.pArray = taosArrayInit(req.nReqs, sizeof(struct SVCreateTbReq));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
if(pCreateReq->type == TSDB_CHILD_TABLE && pCreateReq->ctb.suid == tbSuid){
reqNew.nReqs++;
taosArrayPush(reqNew.pArray, pCreateReq);
}
}
int tlen;
int32_t ret = 0;
tEncodeSize(tEncodeSVCreateTbBatchReq, &reqNew, tlen, ret);
void* buf = taosMemoryMalloc(tlen);
if (NULL == buf) {
taosArrayDestroy(reqNew.pArray);
goto end;
}
SEncoder coderNew = {0};
tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
tEncodeSVCreateTbBatchReq(&coderNew, &reqNew);
tEncoderClear(&coderNew);
memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
pHead->bodyLen = tlen + sizeof(SMsgHead);
taosMemoryFree(buf);
taosArrayDestroy(reqNew.pArray);
}
} else if (msgType == TDMT_VND_ALTER_TABLE) {
SVAlterTbReq req = {0};
if (tDecodeSVAlterTbReq(&coder, &req) < 0) {
goto end;
}
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->execHandle.pExecReader->pVnodeMeta, 0);
if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
metaReaderClear(&mr);
goto end;
}
realTbSuid = mr.me.ctbEntry.suid;
metaReaderClear(&mr);
} else if (msgType == TDMT_VND_DROP_TABLE) {
SVDropTbBatchReq req = {0};
if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
goto end;
}
int32_t needRebuild = 0;
SVDropTbReq* pDropReq = NULL;
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq;
if(pDropReq->suid == tbSuid){
needRebuild++;
}
}
if(needRebuild == 0){
// do nothing
}else if(needRebuild == req.nReqs){
realTbSuid = tbSuid;
}else{
realTbSuid = tbSuid;
SVDropTbBatchReq reqNew = {0};
reqNew.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbReq));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pDropReq = req.pReqs + iReq;
if(pDropReq->suid == tbSuid){
reqNew.nReqs++;
taosArrayPush(reqNew.pArray, pDropReq);
}
}
int tlen;
int32_t ret = 0;
tEncodeSize(tEncodeSVDropTbBatchReq, &reqNew, tlen, ret);
void* buf = taosMemoryMalloc(tlen);
if (NULL == buf) {
taosArrayDestroy(reqNew.pArray);
goto end;
}
SEncoder coderNew = {0};
tEncoderInit(&coderNew, buf, tlen - sizeof(SMsgHead));
tEncodeSVDropTbBatchReq(&coderNew, &reqNew);
tEncoderClear(&coderNew);
memcpy(pHead->body + sizeof(SMsgHead), buf, tlen);
pHead->bodyLen = tlen + sizeof(SMsgHead);
taosMemoryFree(buf);
taosArrayDestroy(reqNew.pArray);
}
} else if (msgType == TDMT_VND_DELETE) {
SDeleteRes req = {0};
if (tDecodeDeleteRes(&coder, &req) < 0) {
goto end;
}
realTbSuid = req.suid;
} else{
ASSERT(0);
}
end:
tDecoderClear(&coder);
return tbSuid == realTbSuid;
}
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
int32_t code = 0;
taosThreadMutexLock(&pHandle->pWalReader->mutex);
@ -53,9 +209,11 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
code = -1;
goto END;
}
*fetchOffset = offset;
code = 0;
goto END;
if(isValValidForTable(pHandle, pHead)){
*fetchOffset = offset;
code = 0;
goto END;
}
}
}
code = walSkipFetchBody(pHandle->pWalReader, *ppCkHead);

View File

@ -143,6 +143,8 @@ typedef struct {
STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int64_t snapshotVer;
SSchemaWrapper *schema;
char tbName[TSDB_TABLE_NAME_LEN];
SSDataBlock* pullOverBlk; // for streaming
@ -486,24 +488,23 @@ typedef struct SStreamScanInfo {
STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes;
// status for tmq
// SSchemaWrapper schema;
SNodeList* pGroupTags;
SNode* pTagCond;
SNode* pTagIndexCond;
SNodeList* pGroupTags;
SNode* pTagCond;
SNode* pTagIndexCond;
} SStreamScanInfo;
typedef struct SStreamRawScanInfo{
// int8_t subType;
// bool withMeta;
// int64_t suid;
// int64_t snapVersion;
// void *metaInfo;
// void *dataInfo;
SVnode* vnode;
SSDataBlock pRes; // result SSDataBlock
STsdbReader* dataReader;
SSnapContext* sContext;
}SStreamRawScanInfo;
typedef struct {
// int8_t subType;
// bool withMeta;
// int64_t suid;
// int64_t snapVersion;
// void *metaInfo;
// void *dataInfo;
SVnode* vnode;
SSDataBlock pRes; // result SSDataBlock
STsdbReader* dataReader;
SSnapContext* sContext;
} SStreamRawScanInfo;
typedef struct SSysTableScanInfo {
SRetrieveMetaTableRsp* pRsp;
@ -528,14 +529,14 @@ typedef struct SBlockDistInfo {
SSDataBlock* pResBlock;
void* pHandle;
SReadHandle readHandle;
uint64_t uid; // table uid
uint64_t uid; // table uid
} SBlockDistInfo;
// todo remove this
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
SSDataBlock* pRes;
bool mergeResultBlock;
SResultRowInfo resultRowInfo;
SSDataBlock* pRes;
bool mergeResultBlock;
} SOptrBasicInfo;
typedef struct SIntervalAggOperatorInfo {

View File

@ -1190,7 +1190,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
} else {
char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
strncpy(pCtx->udfName, udfName, strlen(udfName));
strncpy(pCtx->udfName, udfName, TSDB_FUNC_NAME_LEN);
fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
}
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);

View File

@ -139,7 +139,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
if (msg == NULL) {
// TODO create raw scan
// create raw scan
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (NULL == pTaskInfo) {
@ -151,7 +151,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo);
if(NULL == pTaskInfo->pRoot){
if (NULL == pTaskInfo->pRoot) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pTaskInfo);
return NULL;
@ -834,11 +834,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} else {
ASSERT(0);
}
}else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA){
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext;
if(setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%"PRIi64, pOffset->uid);
SSnapContext* sContext = pInfo->sContext;
if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid);
return -1;
}
@ -847,27 +847,29 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pInfo->dataReader = NULL;
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList);
if(mtInfo.uid == 0) return 0; // no data
if (mtInfo.uid == 0) return 0; // no data
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList,
&pInfo->dataReader, NULL);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema;
qDebug("tmqsnap qStreamPrepareScan snapshot data uid %ld ts %ld", mtInfo.uid, pOffset->ts);
}else if(pOffset->type == TMQ_OFFSET__SNAPSHOT_META){
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext;
if(setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setForSnapShot error. uid:%"PRIi64" ,version:%"PRIi64, pOffset->uid);
SSnapContext* sContext = pInfo->sContext;
if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setForSnapShot error. uid:%" PRIi64 " ,version:%" PRIi64, pOffset->uid);
return -1;
}
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid %ld ts %ld", pOffset->uid);
}else if (pOffset->type == TMQ_OFFSET__LOG) {
} else if (pOffset->type == TMQ_OFFSET__LOG) {
SStreamRawScanInfo* pInfo = pOperator->info;
tsdbReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL;

View File

@ -268,7 +268,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
sizeof(SResultRowPosition));
sizeof(SResultRowPosition));
}
// 2. set the new time window to be the new active time window
@ -2815,92 +2815,6 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
}
}
}
#if 0
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
uint8_t type = pOperator->operatorType;
pOperator->status = OP_OPENED;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pScanInfo = pOperator->info;
pScanInfo->blockType = STREAM_INPUT__TABLE_SCAN;
pScanInfo->pTableScanOp->status = OP_OPENED;
STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);
if (uid == 0) {
pInfo->noTable = 1;
return TSDB_CODE_SUCCESS;
}
/*if (pSnapShotScanInfo->dataReader == NULL) {*/
/*pSnapShotScanInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);*/
/*pSnapShotScanInfo->scanMode = TABLE_SCAN__TABLE_ORDER;*/
/*}*/
pInfo->noTable = 0;
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
bool found = false;
for (int32_t i = 0; i < tableSz; i++) {
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
if (pTableInfo->uid == uid) {
found = true;
pInfo->currentTable = i;
}
}
// TODO after processing drop, found can be false
ASSERT(found);
tsdbSetTableId(pInfo->dataReader, uid);
int64_t oldSkey = pInfo->cond.twindows.skey;
pInfo->cond.twindows.skey = ts + 1;
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
pInfo->cond.twindows.skey = oldSkey;
pInfo->scanTimes = 0;
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts,
pInfo->currentTable, tableSz);
}
return TSDB_CODE_SUCCESS;
} else {
if (pOperator->numOfDownstream == 1) {
return doPrepareScan(pOperator->pDownstream[0], uid, ts);
} else if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block");
return TSDB_CODE_QRY_APP_ERROR;
} else {
qError("join not supported for stream block scan");
return TSDB_CODE_QRY_APP_ERROR;
}
}
}
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
*uid = pSnapShotScanInfo->lastStatus.uid;
*ts = pSnapShotScanInfo->lastStatus.ts;
} else {
if (pOperator->pDownstream[0] == NULL) {
return TSDB_CODE_INVALID_PARA;
} else {
doGetScanStatus(pOperator->pDownstream[0], uid, ts);
}
}
return TSDB_CODE_SUCCESS;
}
#endif
// this is a blocking operator
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
@ -3024,7 +2938,7 @@ int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* len
SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset);
setBufPageDirty(pPage, true);
releaseBufPage(pSup->pResultBuf, pPage);
int32_t iter = 0;
void* pIter = NULL;
while ((pIter = tSimpleHashIterate(pSup->pResultRowHashTable, pIter, &iter))) {
@ -3434,7 +3348,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey) {
int32_t code = 0;
int32_t code = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pAggSup->currentPageId = -1;
@ -4068,8 +3982,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i);
qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid);
}
}
#endif
}
pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
@ -4294,42 +4208,6 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
return pList;
}
#if 0
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
STableListInfo* pTableListInfo, const char* idstr) {
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
code = 0;
qDebug("no table qualified for query, %s", idstr);
goto _error;
}
SQueryTableDataCond cond = {0};
code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
STsdbReader* pReader;
code = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, idstr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
cleanupQueryTableDataCond(&cond);
return pReader;
_error:
terrno = code;
return NULL;
}
#endif
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) {
@ -4571,7 +4449,7 @@ _complete:
return code;
}
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
void doDestroyTableList(STableListInfo* pTableqinfoList) {
taosArrayDestroy(pTableqinfoList->pTableList);
taosHashCleanup(pTableqinfoList->map);
if (pTableqinfoList->needSortTableByGroupId) {

View File

@ -1219,7 +1219,7 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
@ -1228,7 +1228,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
pInfo->pRes->info.type = STREAM_NORMAL;
pInfo->pRes->info.version = pBlock->info.version;
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
if (groupIdPre) {
pInfo->pRes->info.groupId = *groupIdPre;
} else {
@ -1276,11 +1276,80 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
return 0;
}
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info;
qDebug("queue scan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %d rows", pResult->info.rows);
return pResult;
} else {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->dataReader);
pTSInfo->dataReader = NULL;
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
qDebug("queue scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1);
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
return NULL;
}
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
}
}
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) {
SFetchRet ret = {0};
tqNextBlock(pInfo->tqReader, &ret);
if (ret.fetchType == FETCH_TYPE__DATA) {
blockDataCleanup(pInfo->pRes);
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
ASSERT(0);
}
// TODO clean data block
if (pInfo->pRes->info.rows > 0) {
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
return pInfo->pRes;
}
} else if (ret.fetchType == FETCH_TYPE__META) {
ASSERT(0);
// pTaskInfo->streamInfo.lastStatus = ret.offset;
// pTaskInfo->streamInfo.metaBlk = ret.meta;
// return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) {
pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
char formatBuf[80];
tFormatOffset(formatBuf, 80, &ret.offset);
qDebug("queue scan log return null, offset %s", formatBuf);
return NULL;
} else {
ASSERT(0);
}
}
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
qDebug("stream scan tsdb return %d rows", pResult->info.rows);
return pResult;
}
qDebug("stream scan tsdb return null");
return NULL;
} else {
ASSERT(0);
return NULL;
}
}
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info;
qDebug("stream scan called");
#if 0
SStreamState* pState = pTaskInfo->streamInfo.pState;
if (pState) {
@ -1317,48 +1386,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
}
#endif
qDebug("stream scan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) {
SFetchRet ret = {0};
tqNextBlock(pInfo->tqReader, &ret);
if (ret.fetchType == FETCH_TYPE__DATA) {
blockDataCleanup(pInfo->pRes);
if (setBlockIntoRes(pInfo, &ret.data) < 0) {
ASSERT(0);
}
// TODO clean data block
if (pInfo->pRes->info.rows > 0) {
qDebug("stream scan log return %d rows", pInfo->pRes->info.rows);
return pInfo->pRes;
}
} else if (ret.fetchType == FETCH_TYPE__META) {
ASSERT(0);
// pTaskInfo->streamInfo.lastStatus = ret.offset;
// pTaskInfo->streamInfo.metaBlk = ret.meta;
// return NULL;
} else if (ret.fetchType == FETCH_TYPE__NONE) {
pTaskInfo->streamInfo.lastStatus = ret.offset;
ASSERT(pTaskInfo->streamInfo.lastStatus.version >= pTaskInfo->streamInfo.prepareStatus.version);
ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion);
char formatBuf[80];
tFormatOffset(formatBuf, 80, &ret.offset);
qDebug("stream scan log return null, offset %s", formatBuf);
return NULL;
} else {
ASSERT(0);
}
}
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
qDebug("stream scan tsdb return %d rows", pResult->info.rows);
return pResult;
}
qDebug("stream scan tsdb return null");
return NULL;
}
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond));
@ -1554,14 +1581,14 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
}
static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamRawScanInfo* pInfo = pOperator->info;
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
qDebug("tmqsnap doRawScan called");
if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA){
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pBlock = &pInfo->pRes;
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
@ -1585,42 +1612,38 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
}
SMetaTableInfo mtInfo = getUidfromSnapShot(pInfo->sContext);
if (mtInfo.uid == 0){ //read snapshot done, change to get data from wal
if (mtInfo.uid == 0) { // read snapshot done, change to get data from wal
qDebug("tmqsnap read snapshot done, change to get data from wal");
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
pTaskInfo->streamInfo.lastStatus.version = pInfo->sContext->snapVersion;
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
}else{
} else {
pTaskInfo->streamInfo.prepareStatus.uid = mtInfo.uid;
pTaskInfo->streamInfo.prepareStatus.ts = INT64_MIN;
qDebug("tmqsnap change get data uid:%ld", mtInfo.uid);
qStreamPrepareScan(pTaskInfo, &pTaskInfo->streamInfo.prepareStatus, pInfo->sContext->subType);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema;
}
qDebug("tmqsnap stream scan tsdb return null");
return NULL;
}else if(pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META){
SSnapContext *sContext = pInfo->sContext;
void* data = NULL;
int32_t dataLen = 0;
int16_t type = 0;
int64_t uid = 0;
if(getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0){
} else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_META) {
SSnapContext* sContext = pInfo->sContext;
void* data = NULL;
int32_t dataLen = 0;
int16_t type = 0;
int64_t uid = 0;
if (getMetafromSnapShot(sContext, &data, &dataLen, &type, &uid) < 0) {
qError("tmqsnap getMetafromSnapShot error");
taosMemoryFreeClear(data);
return NULL;
}
if(!sContext->queryMetaOrData){ // change to get data next poll request
if (!sContext->queryMetaOrData) { // change to get data next poll request
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
pTaskInfo->streamInfo.lastStatus.uid = uid;
pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__SNAPSHOT_DATA;
pTaskInfo->streamInfo.metaRsp.rspOffset.uid = 0;
pTaskInfo->streamInfo.metaRsp.rspOffset.ts = INT64_MIN;
}else{
} else {
pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__SNAPSHOT_META;
pTaskInfo->streamInfo.lastStatus.uid = uid;
pTaskInfo->streamInfo.metaRsp.rspOffset = pTaskInfo->streamInfo.lastStatus;
@ -1631,44 +1654,44 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
return NULL;
}
// else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
// int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
//
// while(1){
// if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
// qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
// return NULL;
// }
// SWalCont* pHead = &pInfo->pCkHead->head;
// qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
//
// if (pHead->msgType == TDMT_VND_SUBMIT) {
// SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
// tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
// SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid, &pInfo->pRes);
// if(block){
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
// qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
// return block;
// }else{
// fetchVer++;
// }
// } else{
// ASSERT(pInfo->sContext->withMeta);
// ASSERT(IS_META_MSG(pHead->msgType));
// qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
// pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
// pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
// pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
// pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
// pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
// memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
// return NULL;
// }
// }
// else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
// int64_t fetchVer = pTaskInfo->streamInfo.prepareStatus.version + 1;
//
// while(1){
// if (tqFetchLog(pInfo->tqReader->pWalReader, pInfo->sContext->withMeta, &fetchVer, &pInfo->pCkHead) < 0) {
// qDebug("tmqsnap tmq poll: consumer log end. offset %" PRId64, fetchVer);
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
// return NULL;
// }
// SWalCont* pHead = &pInfo->pCkHead->head;
// qDebug("tmqsnap tmq poll: consumer log offset %" PRId64 " msgType %d", fetchVer, pHead->msgType);
//
// if (pHead->msgType == TDMT_VND_SUBMIT) {
// SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
// tqReaderSetDataMsg(pInfo->tqReader, pCont, 0);
// SSDataBlock* block = tqLogScanExec(pInfo->sContext->subType, pInfo->tqReader, pInfo->pFilterOutTbUid,
// &pInfo->pRes); if(block){
// pTaskInfo->streamInfo.lastStatus.type = TMQ_OFFSET__LOG;
// pTaskInfo->streamInfo.lastStatus.version = fetchVer;
// qDebug("tmqsnap fetch data msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
// return block;
// }else{
// fetchVer++;
// }
// } else{
// ASSERT(pInfo->sContext->withMeta);
// ASSERT(IS_META_MSG(pHead->msgType));
// qDebug("tmqsnap fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType);
// pTaskInfo->streamInfo.metaRsp.rspOffset.version = fetchVer;
// pTaskInfo->streamInfo.metaRsp.rspOffset.type = TMQ_OFFSET__LOG;
// pTaskInfo->streamInfo.metaRsp.resMsgType = pHead->msgType;
// pTaskInfo->streamInfo.metaRsp.metaRspLen = pHead->bodyLen;
// pTaskInfo->streamInfo.metaRsp.metaRsp = taosMemoryMalloc(pHead->bodyLen);
// memcpy(pTaskInfo->streamInfo.metaRsp.metaRsp, pHead->body, pHead->bodyLen);
// return NULL;
// }
// }
return NULL;
}
@ -1689,7 +1712,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
// create tq reader
SStreamRawScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamRawScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
@ -1699,13 +1722,12 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pInfo->sContext = pHandle->sContext;
pOperator->name = "RawStreamScanOperator";
// pOperator->blocking = false;
// pOperator->status = OP_NOT_OPENED;
// pOperator->blocking = false;
// pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo,
NULL, NULL, NULL);
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL);
return pOperator;
}
@ -1724,7 +1746,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
}
if (pStreamScan->pPseudoExpr) {
destroyExprInfo(pStreamScan->pPseudoExpr, pStreamScan->numOfPseudoExpr);
taosMemoryFreeClear(pStreamScan->pPseudoExpr);
taosMemoryFree(pStreamScan->pPseudoExpr);
}
updateInfoDestroy(pStreamScan->pUpdateInfo);
@ -1815,6 +1837,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid;
pTaskInfo->streamInfo.snapshotVer = pHandle->version;
// set the extract column id to streamHandle
tqReaderSetColIdList(pInfo->tqReader, pColIds);
@ -1858,8 +1881,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, destroyStreamScanOperatorInfo,
NULL, NULL, NULL);
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL, NULL, NULL);
return pOperator;

View File

@ -1160,6 +1160,7 @@ static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, c
}
ASSERT(0);
return 0;
}
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {

View File

@ -81,7 +81,7 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) {
taosDirName(path);
#endif
} else {
strncpy(path, tsProcPath, strlen(tsProcPath));
strncpy(path, tsProcPath, PATH_MAX);
taosDirName(path);
}
#ifdef WINDOWS

View File

@ -453,7 +453,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
goto _return;
}
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
strncpy(udf->path, path, PATH_MAX);
tFreeSFuncInfo(pFuncInfo);
taosArrayDestroy(retrieveRsp.pFuncInfos);
msgInfo->code = 0;
@ -566,17 +566,17 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc));
char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *startSuffix = "_start";
strncpy(startFuncName, processFuncName, strlen(processFuncName));
strncpy(startFuncName, processFuncName, sizeof(startFuncName));
strncat(startFuncName, startSuffix, strlen(startSuffix));
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc));
char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
char *finishSuffix = "_finish";
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncpy(finishFuncName, processFuncName, sizeof(finishFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(finishFuncName, processFuncName, strlen(processFuncName));
strncpy(finishFuncName, processFuncName, sizeof(finishFuncName));
strncat(finishFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggMergeFunc));
}

View File

@ -171,6 +171,7 @@ TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t d
return tDoCompare(func, cmptype, &va, &vb);
}
assert(0);
return BREAK;
#endif
}
TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {

View File

@ -6408,8 +6408,8 @@ typedef struct SVgroupDropTableBatch {
char dbName[TSDB_DB_NAME_LEN];
} SVgroupDropTableBatch;
static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo) {
SVDropTbReq req = {.name = pClause->tableName, .igNotExists = pClause->ignoreNotExists};
static void addDropTbReqIntoVgroup(SHashObj* pVgroupHashmap, SDropTableClause* pClause, SVgroupInfo* pVgInfo, uint64_t suid) {
SVDropTbReq req = {.name = pClause->tableName, .suid = suid, .igNotExists = pClause->ignoreNotExists};
SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
if (NULL == pTableBatch) {
SVgroupDropTableBatch tBatch = {0};
@ -6450,7 +6450,7 @@ static int32_t buildDropTableVgroupHashmap(STranslateContext* pCxt, SDropTableCl
code = getTableHashVgroup(pCxt, pClause->dbName, pClause->tableName, &info);
}
if (TSDB_CODE_SUCCESS == code) {
addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info);
addDropTbReqIntoVgroup(pVgroupHashmap, pClause, &info, pTableMeta->suid);
}
over:

View File

@ -163,6 +163,7 @@ int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pR
}
}
ASSERT(0);
return -1;
}
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime) {
@ -190,6 +191,7 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRa
}
}
ASSERT(0);
return -1;
}
// for debug -------------------
@ -245,4 +247,5 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI
}
}
ASSERT(0);
return -1;
}

View File

@ -43,7 +43,7 @@ void* rpcOpen(const SRpcInit* pInit) {
return NULL;
}
if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1);
tstrncpy(pRpc->label, pInit->label, TSDB_LABEL_LEN);
}
// register callback handle
pRpc->cfp = pInit->cfp;

View File

@ -372,7 +372,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
int64_t code;
ASSERT(pRead->curVersion == pHead->head.version);
// ASSERT(pRead->curVersion == pHead->head.version);
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
if (code < 0) {

View File

@ -201,6 +201,7 @@ void *taosbsearch(const void *key, const void *base, int32_t nmemb, int32_t size
return (c > 0) ? p : (midx > 0 ? p - size : NULL);
} else {
ASSERT(0);
return NULL;
}
}

View File

@ -244,6 +244,7 @@ int32_t compareJsonVal(const void *pLeft, const void *pRight) {
return 0;
}else{
assert(0);
return 0;
}
}

View File

@ -446,7 +446,10 @@ static inline void taosPrintLogImp(ELogLevel level, int32_t dflag, const char *b
}
if (dflag & DEBUG_SCREEN) {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-result"
write(1, buffer, (uint32_t)len);
#pragma GCC diagnostic pop
}
}

View File

@ -20,6 +20,9 @@
#include "tconfig.h"
#include "tjson.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-result"
#define TMP_DNODE_DIR TD_TMP_DIR_PATH "dumpsdb"
#define TMP_MNODE_DIR TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode"
#define TMP_SDB_DATA_DIR TD_TMP_DIR_PATH "dumpsdb" TD_DIRSEP "mnode" TD_DIRSEP "data"
@ -429,6 +432,7 @@ int32_t parseArgs(int32_t argc, char *argv[]) {
char cmd[PATH_MAX * 2] = {0};
snprintf(cmd, sizeof(cmd), "rm -rf %s", TMP_DNODE_DIR);
system(cmd);
#ifdef WINDOWS
taosMulMkDir(TMP_SDB_DATA_DIR);
@ -467,3 +471,5 @@ int32_t main(int32_t argc, char *argv[]) {
return dumpSdb();
}
#pragma GCC diagnostic pop

View File

@ -26,6 +26,7 @@ TdFilePtr g_fp = NULL;
typedef struct{
bool snapShot;
bool dropTable;
bool subTable;
int srcVgroups;
int dstVgroups;
char dir[64];
@ -74,57 +75,7 @@ static void msg_process(TAOS_RES* msg) {
taos_close(pConn);
}
int32_t init_env(Config *conf) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
char sql[128] = {0};
snprintf(sql, 128, "create database if not exists db_taosx vgroups %d", conf->dstVgroups);
pRes = taos_query(pConn, sql);
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists topic_db");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists abc1");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
snprintf(sql, 128, "create database if not exists abc1 vgroups %d", conf->srcVgroups);
pRes = taos_query(pConn, sql);
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
int buildDatabase(TAOS* pConn, TAOS_RES* pRes){
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
@ -232,7 +183,7 @@ int32_t init_env(Config *conf) {
}
taos_free_result(pRes);
if(conf->dropTable){
if(g_conf.dropTable){
pRes = taos_query(pConn, "drop table ct3 ct1");
if (taos_errno(pRes) != 0) {
printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
@ -297,7 +248,7 @@ int32_t init_env(Config *conf) {
}
taos_free_result(pRes);
if(conf->dropTable){
if(g_conf.dropTable){
pRes = taos_query(pConn, "drop table n1");
if (taos_errno(pRes) != 0) {
printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
@ -341,7 +292,7 @@ int32_t init_env(Config *conf) {
}
taos_free_result(pRes);
if(conf->dropTable){
if(g_conf.dropTable){
pRes = taos_query(pConn,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
"nchar(8), t4 bool)");
@ -358,6 +309,112 @@ int32_t init_env(Config *conf) {
}
taos_free_result(pRes);
}
return 0;
}
int buildStable(TAOS* pConn, TAOS_RES* pRes){
pRes = taos_query(pConn, "CREATE STABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` VARCHAR(16))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d0 using meters tags(1, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d1 using meters tags(2, 'Beijing')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stream meters_summary_s into meters_summary as select _wstart, max(current) as current, groupid, location from meters partition by groupid, location interval(10m)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table meters_summary, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into d0 (ts, current) values (now, 120)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
char sql[128] = {0};
snprintf(sql, 128, "create database if not exists db_taosx vgroups %d", g_conf.dstVgroups);
pRes = taos_query(pConn, sql);
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists topic_db");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop topic if exists meters_summary_t1");
if (taos_errno(pRes) != 0) {
printf("error in drop topic, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists abc1");
if (taos_errno(pRes) != 0) {
printf("error in drop db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
snprintf(sql, 128, "create database if not exists abc1 vgroups %d", g_conf.srcVgroups);
pRes = taos_query(pConn, sql);
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
if(g_conf.subTable){
buildStable(pConn, pRes);
}else{
buildDatabase(pConn, pRes);
}
taos_close(pConn);
return 0;
}
@ -377,12 +434,21 @@ int32_t create_topic() {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_db with meta as database abc1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes));
return -1;
if(g_conf.subTable){
pRes = taos_query(pConn, "create topic meters_summary_t1 with meta as stable meters_summary");
if (taos_errno(pRes) != 0) {
printf("failed to create topic meters_summary_t1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
}else{
pRes = taos_query(pConn, "create topic topic_db with meta as database abc1");
if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_db, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
}
taos_free_result(pRes);
taos_close(pConn);
return 0;
@ -392,7 +458,7 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p param %p\n", code, tmq, param);
}
tmq_t* build_consumer(Config *config) {
tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
@ -402,7 +468,7 @@ tmq_t* build_consumer(Config *config) {
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "enable.heartbeat.background", "true");
if(config->snapShot){
if(g_conf.snapShot){
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
}
@ -415,7 +481,11 @@ tmq_t* build_consumer(Config *config) {
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_db");
if(g_conf.subTable){
tmq_list_append(topic_list, "meters_summary_t1");
}else{
tmq_list_append(topic_list, "topic_db");
}
return topic_list;
}
@ -446,16 +516,16 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
fprintf(stderr, "%% Consumer closed\n");
}
void initLogFile(Config *conf) {
void initLogFile() {
char f1[256] = {0};
char f2[256] = {0};
if(conf->snapShot){
sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", conf->dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", conf->dir);
if(g_conf.snapShot){
sprintf(f1, "%s/../log/tmq_taosx_tmp_snapshot.source", g_conf.dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp_snapshot.result", g_conf.dir);
}else{
sprintf(f1, "%s/../log/tmq_taosx_tmp.source", conf->dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp.result", conf->dir);
sprintf(f1, "%s/../log/tmq_taosx_tmp.source", g_conf.dir);
sprintf(f2, "%s/../log/tmq_taosx_tmp.result", g_conf.dir);
}
TdFilePtr pFile = taosOpenFile(f1, TD_FILE_TEXT | TD_FILE_TRUNC | TD_FILE_STREAM);
@ -471,7 +541,7 @@ void initLogFile(Config *conf) {
exit(-1);
}
if(conf->snapShot){
if(g_conf.snapShot){
char *result[] = {
"{\"type\":\"create\",\"tableName\":\"st1\",\"tableType\":\"super\",\"columns\":[{\"name\":\"ts\",\"type\":9},{\"name\":\"c1\",\"type\":4},{\"name\":\"c2\",\"type\":6},{\"name\":\"c3\",\"type\":8,\"length\":64},{\"name\":\"c4\",\"type\":5}],\"tags\":[{\"name\":\"t1\",\"type\":4},{\"name\":\"t3\",\"type\":10,\"length\":8},{\"name\":\"t4\",\"type\":1},{\"name\":\"t2\",\"type\":8,\"length\":64}]}",
"{\"type\":\"create\",\"tableName\":\"ct0\",\"tableType\":\"child\",\"using\":\"st1\",\"tagNum\":4,\"tags\":[{\"name\":\"t1\",\"type\":4,\"value\":1000},{\"name\":\"t3\",\"type\":10,\"value\":\"\\\"ttt\\\"\"},{\"name\":\"t4\",\"type\":1,\"value\":1}]}",
@ -531,20 +601,22 @@ int main(int argc, char* argv[]) {
g_conf.srcVgroups = atol(argv[++i]);
}else if(strcmp(argv[i], "-dv") == 0){
g_conf.dstVgroups = atol(argv[++i]);
}else if(strcmp(argv[i], "-t") == 0){
g_conf.subTable = true;
}
}
printf("env init\n");
if(strlen(g_conf.dir) != 0){
initLogFile(&g_conf);
initLogFile();
}
if (init_env(&g_conf) < 0) {
if (init_env() < 0) {
return -1;
}
create_topic();
tmq_t* tmq = build_consumer(&g_conf);
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list);
taosCloseFile(&g_fp);

View File

@ -464,7 +464,10 @@ void simStoreSystemContentResult(SScript *script, char *filename) {
taosCloseFile(&pFile);
char rmCmd[MAX_FILE_NAME_LEN] = {0};
sprintf(rmCmd, "rm -f %s", filename);
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-result"
system(rmCmd);
#pragma GCC diagnostic pop
}
}

View File

@ -510,7 +510,10 @@ int32_t shellReadCommand(char *command) {
shellClearLineAfter(&cmd);
break;
case 12: // Ctrl + L;
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-result"
system("clear");
#pragma GCC diagnostic pop
shellShowOnScreen(&cmd);
break;
case 21: // Ctrl + U;

View File

@ -62,7 +62,10 @@ int32_t shellRunSingleCommand(char *command) {
}
if (shellRegexMatch(command, "^[\t ]*clear[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
system("clear");
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-result"
system("clear");
#pragma GCC diagnostic pop
return 0;
}