Merge branch 'main' of github.com:taosdata/TDengine into main
This commit is contained in:
commit
1db4619fdc
|
@ -2,7 +2,7 @@
|
|||
# taos-tools
|
||||
ExternalProject_Add(taos-tools
|
||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||
GIT_TAG 41affde
|
||||
GIT_TAG ad1a32b
|
||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||
BINARY_DIR ""
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -25,13 +25,6 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
// TODO remove it
|
||||
enum {
|
||||
TMQ_CONF__RESET_OFFSET__NONE = -3,
|
||||
TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,
|
||||
TMQ_CONF__RESET_OFFSET__LATEST = -1,
|
||||
};
|
||||
|
||||
// clang-format off
|
||||
#define IS_META_MSG(x) ( \
|
||||
x == TDMT_VND_CREATE_STB \
|
||||
|
|
|
@ -3186,6 +3186,7 @@ typedef struct {
|
|||
SArray* blockData;
|
||||
SArray* blockTbName;
|
||||
SArray* blockSchema;
|
||||
// the following attributes are extended from SMqDataRsp
|
||||
int32_t createTableNum;
|
||||
SArray* createTableLen;
|
||||
SArray* createTableReq;
|
||||
|
|
|
@ -1,19 +1,19 @@
|
|||
########################################################
|
||||
# #
|
||||
# TDengine Configuration #
|
||||
# Configuration #
|
||||
# Any questions, please email support@taosdata.com #
|
||||
# #
|
||||
########################################################
|
||||
|
||||
######### 0. Client only configurations #############
|
||||
|
||||
# The interval for TDengine CLI to send heartbeat to mnode
|
||||
# The interval for CLI to send heartbeat to mnode
|
||||
# shellActivityTimer 3
|
||||
|
||||
|
||||
############### 1. Cluster End point ############################
|
||||
|
||||
# The end point of the first dnode in the cluster to be connected to when this dnode or a TDengine CLI `taos` is started
|
||||
# The end point of the first dnode in the cluster to be connected to when this dnode or a CLI `taos` is started
|
||||
# firstEp hostname:6030
|
||||
|
||||
# The end point of the second dnode to be connected to if the firstEp is not available
|
||||
|
@ -40,10 +40,10 @@
|
|||
# temporary file's directory, if you are using Windows platform please change to Windows path
|
||||
# tempDir /tmp/
|
||||
|
||||
# Switch for allowing TDengine to collect and report service usage information
|
||||
# Switch for allowing to collect and report service usage information
|
||||
# telemetryReporting 1
|
||||
|
||||
# Switch for allowing TDengine to collect and report crash information
|
||||
# Switch for allowing to collect and report crash information
|
||||
# crashReporting 1
|
||||
|
||||
# The maximum number of vnodes supported by this dnode
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
#!/usr/bin/expect
|
||||
set packgeName [lindex $argv 0]
|
||||
set packageName [lindex $argv 0]
|
||||
set packageSuffix [lindex $argv 1]
|
||||
set timeout 3
|
||||
if { ${packageSuffix} == "deb" } {
|
||||
spawn dpkg -i ${packgeName}
|
||||
spawn dpkg -i ${packageName}
|
||||
} elseif { ${packageSuffix} == "rpm"} {
|
||||
spawn rpm -ivh ${packgeName}
|
||||
spawn rpm -ivh ${packageName}
|
||||
}
|
||||
expect "*one:"
|
||||
send "\r"
|
||||
expect "*skip:"
|
||||
send "\r"
|
||||
|
||||
expect eof
|
||||
expect eof
|
||||
|
|
|
@ -25,7 +25,7 @@ sourcePath="nas"
|
|||
cpuType="x64"
|
||||
lite="true"
|
||||
packageType="tar"
|
||||
subFile="taos.tar.gz"
|
||||
subFile="package.tar.gz"
|
||||
while getopts "m:c:f:l:s:o:t:v:h" opt; do
|
||||
case $opt in
|
||||
m)
|
||||
|
@ -79,9 +79,9 @@ GREEN_UNDERLINE='\033[4;32m'
|
|||
NC='\033[0m'
|
||||
|
||||
if [[ ${verMode} = "enterprise" ]];then
|
||||
prePackag="TDengine-enterprise-${testFile}"
|
||||
prePackage="TDengine-enterprise-${testFile}"
|
||||
elif [ ${verMode} = "community" ];then
|
||||
prePackag="TDengine-${testFile}"
|
||||
prePackage="TDengine-${testFile}"
|
||||
fi
|
||||
if [ ${lite} = "true" ];then
|
||||
packageLite="-Lite"
|
||||
|
@ -92,10 +92,10 @@ if [[ "$packageType" = "tar" ]] ;then
|
|||
packageType="tar.gz"
|
||||
fi
|
||||
|
||||
tdPath="${prePackag}-${version}"
|
||||
originTdpPath="${prePackag}-${originversion}"
|
||||
tdPath="${prePackage}-${version}"
|
||||
originTdpPath="${prePackage}-${originversion}"
|
||||
|
||||
packgeName="${tdPath}-Linux-${cpuType}${packageLite}.${packageType}"
|
||||
packageName="${tdPath}-Linux-${cpuType}${packageLite}.${packageType}"
|
||||
originPackageName="${originTdpPath}-Linux-${cpuType}${packageLite}.${packageType}"
|
||||
|
||||
if [ "$testFile" == "server" ] ;then
|
||||
|
@ -105,13 +105,13 @@ elif [ ${testFile} = "client" ];then
|
|||
elif [ ${testFile} = "tools" ];then
|
||||
tdPath="taosTools-${version}"
|
||||
originTdpPath="taosTools-${originversion}"
|
||||
packgeName="${tdPath}-Linux-${cpuType}${packageLite}.${packageType}"
|
||||
packageName="${tdPath}-Linux-${cpuType}${packageLite}.${packageType}"
|
||||
originPackageName="${originTdpPath}-Linux-${cpuType}${packageLite}.${packageType}"
|
||||
installCmd="install-taostools.sh"
|
||||
installCmd="install-tools.sh"
|
||||
fi
|
||||
|
||||
|
||||
echo "tdPath:${tdPath},originTdpPath:${originTdpPath},packgeName:${packgeName},originPackageName:${originPackageName}"
|
||||
echo "tdPath:${tdPath},originTdpPath:${originTdpPath},packageName:${packageName},originPackageName:${originPackageName}"
|
||||
function cmdInstall {
|
||||
command=$1
|
||||
if command -v ${command} ;then
|
||||
|
@ -206,7 +206,7 @@ else
|
|||
fi
|
||||
|
||||
|
||||
if [[ ${packgeName} =~ "server" ]] ;then
|
||||
if [[ ${packageName} =~ "server" ]] ;then
|
||||
echoColor BD " pkill -9 taosd "
|
||||
pkill -9 taosd
|
||||
fi
|
||||
|
@ -232,25 +232,25 @@ if [ -d ${installPath}/${tdPath} ] ;then
|
|||
fi
|
||||
|
||||
echoColor G "===== download installPackage ====="
|
||||
cd ${installPath} && wgetFile ${packgeName} ${version} ${sourcePath}
|
||||
cd ${installPath} && wgetFile ${packageName} ${version} ${sourcePath}
|
||||
cd ${oriInstallPath} && wgetFile ${originPackageName} ${originversion} ${sourcePath}
|
||||
|
||||
|
||||
cd ${installPath}
|
||||
cp -r ${scriptDir}/debRpmAutoInstall.sh .
|
||||
|
||||
packageSuffix=$(echo ${packgeName} | awk -F '.' '{print $NF}')
|
||||
packageSuffix=$(echo ${packageName} | awk -F '.' '{print $NF}')
|
||||
|
||||
|
||||
if [ ! -f debRpmAutoInstall.sh ];then
|
||||
echo '#!/usr/bin/expect ' > debRpmAutoInstall.sh
|
||||
echo 'set packgeName [lindex $argv 0]' >> debRpmAutoInstall.sh
|
||||
echo 'set packageName [lindex $argv 0]' >> debRpmAutoInstall.sh
|
||||
echo 'set packageSuffix [lindex $argv 1]' >> debRpmAutoInstall.sh
|
||||
echo 'set timeout 3 ' >> debRpmAutoInstall.sh
|
||||
echo 'if { ${packageSuffix} == "deb" } {' >> debRpmAutoInstall.sh
|
||||
echo ' spawn dpkg -i ${packgeName} ' >> debRpmAutoInstall.sh
|
||||
echo ' spawn dpkg -i ${packageName} ' >> debRpmAutoInstall.sh
|
||||
echo '} elseif { ${packageSuffix} == "rpm"} {' >> debRpmAutoInstall.sh
|
||||
echo ' spawn rpm -ivh ${packgeName}' >> debRpmAutoInstall.sh
|
||||
echo ' spawn rpm -ivh ${packageName}' >> debRpmAutoInstall.sh
|
||||
echo '}' >> debRpmAutoInstall.sh
|
||||
echo 'expect "*one:"' >> debRpmAutoInstall.sh
|
||||
echo 'send "\r"' >> debRpmAutoInstall.sh
|
||||
|
@ -261,25 +261,25 @@ fi
|
|||
|
||||
echoColor G "===== instal Package ====="
|
||||
|
||||
if [[ ${packgeName} =~ "deb" ]];then
|
||||
if [[ ${packageName} =~ "deb" ]];then
|
||||
cd ${installPath}
|
||||
dpkg -r taostools
|
||||
dpkg -r tdengine
|
||||
if [[ ${packgeName} =~ "TDengine" ]];then
|
||||
echoColor BD "./debRpmAutoInstall.sh ${packgeName} ${packageSuffix}" && chmod 755 debRpmAutoInstall.sh && ./debRpmAutoInstall.sh ${packgeName} ${packageSuffix}
|
||||
if [[ ${packageName} =~ "TDengine" ]];then
|
||||
echoColor BD "./debRpmAutoInstall.sh ${packageName} ${packageSuffix}" && chmod 755 debRpmAutoInstall.sh && ./debRpmAutoInstall.sh ${packageName} ${packageSuffix}
|
||||
else
|
||||
echoColor BD "dpkg -i ${packgeName}" && dpkg -i ${packgeName}
|
||||
echoColor BD "dpkg -i ${packageName}" && dpkg -i ${packageName}
|
||||
fi
|
||||
elif [[ ${packgeName} =~ "rpm" ]];then
|
||||
elif [[ ${packageName} =~ "rpm" ]];then
|
||||
cd ${installPath}
|
||||
sudo rpm -e tdengine
|
||||
sudo rpm -e taostools
|
||||
if [[ ${packgeName} =~ "TDengine" ]];then
|
||||
echoColor BD "./debRpmAutoInstall.sh ${packgeName} ${packageSuffix}" && chmod 755 debRpmAutoInstall.sh && ./debRpmAutoInstall.sh ${packgeName} ${packageSuffix}
|
||||
if [[ ${packageName} =~ "TDengine" ]];then
|
||||
echoColor BD "./debRpmAutoInstall.sh ${packageName} ${packageSuffix}" && chmod 755 debRpmAutoInstall.sh && ./debRpmAutoInstall.sh ${packageName} ${packageSuffix}
|
||||
else
|
||||
echoColor BD "rpm -ivh ${packgeName}" && rpm -ivh ${packgeName}
|
||||
echoColor BD "rpm -ivh ${packageName}" && rpm -ivh ${packageName}
|
||||
fi
|
||||
elif [[ ${packgeName} =~ "tar" ]];then
|
||||
elif [[ ${packageName} =~ "tar" ]];then
|
||||
echoColor G "===== check installPackage File of tar ====="
|
||||
cd ${oriInstallPath}
|
||||
if [ ! -f {originPackageName} ];then
|
||||
|
@ -290,7 +290,7 @@ elif [[ ${packgeName} =~ "tar" ]];then
|
|||
echoColor BD "tar -xf ${originPackageName}" && tar -xf ${originPackageName}
|
||||
cd ${installPath}
|
||||
echoColor YD "unzip the new installation package"
|
||||
echoColor BD "tar -xf ${packgeName}" && tar -xf ${packgeName}
|
||||
echoColor BD "tar -xf ${packageName}" && tar -xf ${packageName}
|
||||
|
||||
if [ ${testFile} != "tools" ] ;then
|
||||
cd ${installPath}/${tdPath} && tar xf ${subFile}
|
||||
|
@ -326,15 +326,15 @@ fi
|
|||
|
||||
cd ${installPath}
|
||||
|
||||
if [[ ${packgeName} =~ "Lite" ]] || ([[ ${packgeName} =~ "x64" ]] && [[ ${packgeName} =~ "client" ]]) || ([[ ${packgeName} =~ "deb" ]] && [[ ${packgeName} =~ "server" ]]) || ([[ ${packgeName} =~ "rpm" ]] && [[ ${packgeName} =~ "server" ]]) ;then
|
||||
if [[ ${packageName} =~ "Lite" ]] || ([[ ${packageName} =~ "x64" ]] && [[ ${packageName} =~ "client" ]]) || ([[ ${packageName} =~ "deb" ]] && [[ ${packageName} =~ "server" ]]) || ([[ ${packageName} =~ "rpm" ]] && [[ ${packageName} =~ "server" ]]) ;then
|
||||
echoColor G "===== install taos-tools when package is lite or client ====="
|
||||
cd ${installPath}
|
||||
if [ ! -f "taosTools-2.1.3-Linux-x64.tar.gz " ];then
|
||||
wgetFile taosTools-2.1.3-Linux-x64.tar.gz v2.1.3 web
|
||||
tar xf taosTools-2.1.3-Linux-x64.tar.gz
|
||||
fi
|
||||
cd taosTools-2.1.3 && bash install-taostools.sh
|
||||
elif ([[ ${packgeName} =~ "arm64" ]] && [[ ${packgeName} =~ "client" ]]);then
|
||||
cd taosTools-2.1.3 && bash install-tools.sh
|
||||
elif ([[ ${packageName} =~ "arm64" ]] && [[ ${packageName} =~ "client" ]]);then
|
||||
echoColor G "===== install taos-tools arm when package is arm64-client ====="
|
||||
cd ${installPath}
|
||||
if [ ! -f "taosTools-2.1.3-Linux-x64.tar.gz " ];then
|
||||
|
@ -342,37 +342,37 @@ elif ([[ ${packgeName} =~ "arm64" ]] && [[ ${packgeName} =~ "client" ]]);then
|
|||
tar xf taosTools-2.1.3-Linux-arm64.tar.gz
|
||||
fi
|
||||
|
||||
cd taosTools-2.1.3 && bash install-taostools.sh
|
||||
cd taosTools-2.1.3 && bash install-tools.sh
|
||||
fi
|
||||
|
||||
echoColor G "===== start TDengine ====="
|
||||
|
||||
if [[ ${packgeName} =~ "server" ]] ;then
|
||||
if [[ ${packageName} =~ "server" ]] ;then
|
||||
echoColor BD " rm -rf /var/lib/taos/* && systemctl restart taosd "
|
||||
rm -rf /var/lib/taos/*
|
||||
systemctl restart taosd
|
||||
fi
|
||||
|
||||
rm -rf ${installPath}/${packgeName}
|
||||
rm -rf ${installPath}/${packageName}
|
||||
rm -rf ${installPath}/${tdPath}/
|
||||
|
||||
# if ([[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "tar" ]]) || [[ ${packgeName} =~ "client" ]] ;then
|
||||
# if ([[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "tar" ]]) || [[ ${packageName} =~ "client" ]] ;then
|
||||
# echoColor G "===== install taos-tools when package is lite or client ====="
|
||||
# cd ${installPath}
|
||||
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
|
||||
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
|
||||
# cd taosTools-2.1.2 && bash install-taostools.sh
|
||||
# elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "deb" ]] ;then
|
||||
# cd taosTools-2.1.2 && bash install-tools.sh
|
||||
# elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "deb" ]] ;then
|
||||
# echoColor G "===== install taos-tools when package is lite or client ====="
|
||||
# cd ${installPath}
|
||||
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
|
||||
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
|
||||
# cd taosTools-2.1.2 && bash install-taostools.sh
|
||||
# elif [[ ${packgeName} =~ "Lite" ]] && [[ ${packgeName} =~ "rpm" ]] ;then
|
||||
# cd taosTools-2.1.2 && bash install-tools.sh
|
||||
# elif [[ ${packageName} =~ "Lite" ]] && [[ ${packageName} =~ "rpm" ]] ;then
|
||||
# echoColor G "===== install taos-tools when package is lite or client ====="
|
||||
# cd ${installPath}
|
||||
# wgetFile taosTools-2.1.2-Linux-x64.tar.gz .
|
||||
# tar xf taosTools-2.1.2-Linux-x64.tar.gz
|
||||
# cd taosTools-2.1.2 && bash install-taostools.sh
|
||||
# cd taosTools-2.1.2 && bash install-tools.sh
|
||||
# fi
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ productName="TDengine"
|
|||
emailName="taosdata.com"
|
||||
uninstallScript="rmtaos"
|
||||
historyFile="taos_history"
|
||||
tarName="taos.tar.gz"
|
||||
tarName="package.tar.gz"
|
||||
dataDir="/var/lib/taos"
|
||||
logDir="/var/log/taos"
|
||||
configDir="/etc/taos"
|
||||
|
@ -222,24 +222,24 @@ function install_bin() {
|
|||
${csudo}cp -r ${script_dir}/bin/* ${install_main_dir}/bin && ${csudo}chmod 0555 ${install_main_dir}/bin/*
|
||||
|
||||
#Make link
|
||||
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} || :
|
||||
[ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} || :
|
||||
[ -x ${install_main_dir}/bin/${udfdName} ] && ${csudo}ln -s ${install_main_dir}/bin/${udfdName} ${bin_link_dir}/${udfdName} || :
|
||||
[ -x ${install_main_dir}/bin/${adapterName} ] && ${csudo}ln -s ${install_main_dir}/bin/${adapterName} ${bin_link_dir}/${adapterName} || :
|
||||
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} || :
|
||||
[ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} || :
|
||||
[ -x ${install_main_dir}/bin/${udfdName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${udfdName} ${bin_link_dir}/${udfdName} || :
|
||||
[ -x ${install_main_dir}/bin/${adapterName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${adapterName} ${bin_link_dir}/${adapterName} || :
|
||||
[ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${demoName} || :
|
||||
[ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName} || :
|
||||
[ -x ${install_main_dir}/bin/${dumpName} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName} ${bin_link_dir}/${dumpName} || :
|
||||
[ -x ${install_main_dir}/bin/${xname} ] && ${csudo}ln -s ${install_main_dir}/bin/${xname} ${bin_link_dir}/${xname} || :
|
||||
[ -x ${install_main_dir}/bin/${explorerName} ] && ${csudo}ln -s ${install_main_dir}/bin/${explorerName} ${bin_link_dir}/${explorerName} || :
|
||||
[ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -s ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || :
|
||||
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || :
|
||||
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
|
||||
[ -x ${install_main_dir}/bin/${dumpName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${dumpName} ${bin_link_dir}/${dumpName} || :
|
||||
[ -x ${install_main_dir}/bin/${xname} ] && ${csudo}ln -sf ${install_main_dir}/bin/${xname} ${bin_link_dir}/${xname} || :
|
||||
[ -x ${install_main_dir}/bin/${explorerName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${explorerName} ${bin_link_dir}/${explorerName} || :
|
||||
[ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -sf ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || :
|
||||
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -sf ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || :
|
||||
[ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo}ln -sf ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
|
||||
|
||||
if [ "$verMode" == "cluster" ] && [ "$clientName" != "$clientName2" ]; then
|
||||
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName2} || :
|
||||
[ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -s ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName2} || :
|
||||
[ -x ${install_main_dir}/bin/${dumpName} ] && ${csudo}ln -s ${install_main_dir}/bin/${dumpName} ${bin_link_dir}/${dumpName2} || :
|
||||
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript2} || :
|
||||
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName2} || :
|
||||
[ -x ${install_main_dir}/bin/${benchmarkName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName} ${bin_link_dir}/${benchmarkName2} || :
|
||||
[ -x ${install_main_dir}/bin/${dumpName} ] && ${csudo}ln -sf ${install_main_dir}/bin/${dumpName} ${bin_link_dir}/${dumpName2} || :
|
||||
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -sf ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript2} || :
|
||||
fi
|
||||
}
|
||||
|
||||
|
@ -250,14 +250,14 @@ function install_lib() {
|
|||
#${csudo}rm -rf ${v15_java_app_dir} || :
|
||||
${csudo}cp -rf ${script_dir}/driver/* ${install_main_dir}/driver && ${csudo}chmod 777 ${install_main_dir}/driver/*
|
||||
|
||||
${csudo}ln -s ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1
|
||||
${csudo}ln -s ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1
|
||||
${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
|
||||
|
||||
[ -f ${install_main_dir}/driver/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || :
|
||||
|
||||
if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.so ]]; then
|
||||
${csudo}ln -s ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 || :
|
||||
${csudo}ln -s ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || :
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 || :
|
||||
${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || :
|
||||
|
||||
[ -f ${install_main_dir}/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/libtaosws.so ${lib64_link_dir}/libtaosws.so || :
|
||||
fi
|
||||
|
@ -347,10 +347,10 @@ function install_header() {
|
|||
[ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h || :
|
||||
|
||||
${csudo}cp -f ${script_dir}/inc/* ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/*
|
||||
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
|
||||
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
|
||||
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
|
||||
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
|
||||
${csudo}ln -sf ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
|
||||
${csudo}ln -sf ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
|
||||
${csudo}ln -sf ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
|
||||
${csudo}ln -sf ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
|
||||
|
||||
[ -f ${install_main_dir}/include/taosws.h ] && ${csudo}ln -sf ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h || :
|
||||
}
|
||||
|
@ -511,7 +511,7 @@ function install_adapter_config() {
|
|||
fi
|
||||
|
||||
[ -f ${cfg_install_dir}/${adapterName}.toml ] &&
|
||||
${csudo}ln -s ${cfg_install_dir}/${adapterName}.toml ${install_main_dir}/cfg/${adapterName}.toml
|
||||
${csudo}ln -sf ${cfg_install_dir}/${adapterName}.toml ${install_main_dir}/cfg/${adapterName}.toml
|
||||
|
||||
[ ! -z $1 ] && return 0 || : # only install client
|
||||
|
||||
|
@ -527,7 +527,7 @@ function install_config() {
|
|||
${csudo}cp -f ${script_dir}/cfg/${configFile} ${cfg_install_dir}/${configFile}.new
|
||||
fi
|
||||
|
||||
${csudo}ln -s ${cfg_install_dir}/${configFile} ${install_main_dir}/cfg
|
||||
${csudo}ln -sf ${cfg_install_dir}/${configFile} ${install_main_dir}/cfg
|
||||
|
||||
[ ! -z $1 ] && return 0 || : # only install client
|
||||
|
||||
|
@ -573,13 +573,13 @@ function install_log() {
|
|||
${csudo}rm -rf ${log_dir} || :
|
||||
${csudo}mkdir -p ${log_dir} && ${csudo}chmod 777 ${log_dir}
|
||||
|
||||
${csudo}ln -s ${log_dir} ${install_main_dir}/log
|
||||
${csudo}ln -sf ${log_dir} ${install_main_dir}/log
|
||||
}
|
||||
|
||||
function install_data() {
|
||||
${csudo}mkdir -p ${data_dir}
|
||||
|
||||
${csudo}ln -s ${data_dir} ${install_main_dir}/data
|
||||
${csudo}ln -sf ${data_dir} ${install_main_dir}/data
|
||||
}
|
||||
|
||||
function install_connector() {
|
||||
|
@ -862,21 +862,21 @@ function updateProduct() {
|
|||
openresty_work=false
|
||||
|
||||
echo
|
||||
echo -e "${GREEN_DARK}To configure ${productName} ${NC}: edit ${cfg_install_dir}/${configFile}"
|
||||
echo -e "${GREEN_DARK}To configure ${productName2} ${NC}: edit ${cfg_install_dir}/${configFile}"
|
||||
[ -f ${configDir}/taosadapter.toml ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To configure Adapter ${NC}: edit ${configDir}/taosadapter.toml"
|
||||
echo -e "${GREEN_DARK}To configure ${clientName2} Adapter ${NC}: edit ${configDir}/taosadapter.toml"
|
||||
if ((${service_mod} == 0)); then
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${csudo}systemctl start ${serverName}${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ${csudo}systemctl start ${serverName}${NC}"
|
||||
[ -f ${service_config_dir}/taosadapter.service ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Adatper ${NC}: ${csudo}systemctl start taosadapter ${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: ${csudo}systemctl start taosadapter ${NC}"
|
||||
elif ((${service_mod} == 1)); then
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${csudo}service ${serverName} start${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ${csudo}service ${serverName} start${NC}"
|
||||
[ -f ${service_config_dir}/taosadapter.service ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: ${csudo}service taosadapter start${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: ${csudo}service taosadapter start${NC}"
|
||||
else
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ./${serverName}${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ./${serverName}${NC}"
|
||||
[ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start ${clientName} Adapter ${NC}: taosadapter &${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: taosadapter &${NC}"
|
||||
fi
|
||||
|
||||
if [ ${openresty_work} = 'true' ]; then
|
||||
|
@ -887,7 +887,7 @@ function updateProduct() {
|
|||
|
||||
if ((${prompt_force} == 1)); then
|
||||
echo ""
|
||||
echo -e "${RED}Please run '${serverName} --force-keep-file' at first time for the exist ${productName} $exist_version!${NC}"
|
||||
echo -e "${RED}Please run '${serverName} --force-keep-file' at first time for the exist ${productName2} $exist_version!${NC}"
|
||||
fi
|
||||
echo
|
||||
echo -e "\033[44;32;1m${productName2} is updated successfully!${NC}"
|
||||
|
@ -944,21 +944,21 @@ function installProduct() {
|
|||
|
||||
# Ask if to start the service
|
||||
echo
|
||||
echo -e "${GREEN_DARK}To configure ${productName} ${NC}: edit ${cfg_install_dir}/${configFile}"
|
||||
echo -e "${GREEN_DARK}To configure ${productName2} ${NC}: edit ${cfg_install_dir}/${configFile}"
|
||||
[ -f ${configDir}/taosadapter.toml ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To configure Taos Adapter ${NC}: edit ${configDir}/taosadapter.toml"
|
||||
echo -e "${GREEN_DARK}To configure ${clientName2} Adapter ${NC}: edit ${configDir}/taosadapter.toml"
|
||||
if ((${service_mod} == 0)); then
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${csudo}systemctl start ${serverName}${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ${csudo}systemctl start ${serverName}${NC}"
|
||||
[ -f ${service_config_dir}/taosadapter.service ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Taos Adatper ${NC}: ${csudo}systemctl start taosadapter ${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: ${csudo}systemctl start taosadapter ${NC}"
|
||||
elif ((${service_mod} == 1)); then
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${csudo}service ${serverName} start${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ${csudo}service ${serverName} start${NC}"
|
||||
[ -f ${service_config_dir}/taosadapter.service ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Taos Adapter ${NC}: ${csudo}service taosadapter start${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: ${csudo}service taosadapter start${NC}"
|
||||
else
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${serverName}${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${productName2} ${NC}: ${serverName}${NC}"
|
||||
[ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Taos Adapter ${NC}: taosadapter &${NC}"
|
||||
echo -e "${GREEN_DARK}To start ${clientName2} Adapter ${NC}: taosadapter &${NC}"
|
||||
fi
|
||||
|
||||
if [ ! -z "$firstEp" ]; then
|
||||
|
@ -970,24 +970,24 @@ function installProduct() {
|
|||
tmpPort=""
|
||||
fi
|
||||
if [[ "$tmpPort" != "" ]]; then
|
||||
echo -e "${GREEN_DARK}To access ${productName} ${NC}: ${clientName} -h $tmpFqdn -P $tmpPort${GREEN_DARK} to login into cluster, then${NC}"
|
||||
echo -e "${GREEN_DARK}To access ${productName2} ${NC}: ${clientName2} -h $tmpFqdn -P $tmpPort${GREEN_DARK} to login into cluster, then${NC}"
|
||||
else
|
||||
echo -e "${GREEN_DARK}To access ${productName} ${NC}: ${clientName} -h $tmpFqdn${GREEN_DARK} to login into cluster, then${NC}"
|
||||
echo -e "${GREEN_DARK}To access ${productName2} ${NC}: ${clientName2} -h $tmpFqdn${GREEN_DARK} to login into cluster, then${NC}"
|
||||
fi
|
||||
echo -e "${GREEN_DARK}execute ${NC}: create dnode 'newDnodeFQDN:port'; ${GREEN_DARK}to add this new node${NC}"
|
||||
echo
|
||||
elif [ ! -z "$serverFqdn" ]; then
|
||||
echo -e "${GREEN_DARK}To access ${productName} ${NC}: ${clientName} -h $serverFqdn${GREEN_DARK} to login into ${productName} server${NC}"
|
||||
echo -e "${GREEN_DARK}To access ${productName2} ${NC}: ${clientName2} -h $serverFqdn${GREEN_DARK} to login into ${productName2} server${NC}"
|
||||
echo
|
||||
fi
|
||||
|
||||
echo -e "\033[44;32;1m${productName} is installed successfully!${NC}"
|
||||
echo -e "\033[44;32;1m${productName2} is installed successfully!${NC}"
|
||||
echo
|
||||
else # Only install client
|
||||
install_bin
|
||||
install_config
|
||||
echo
|
||||
echo -e "\033[44;32;1m${productName} client is installed successfully!${NC}"
|
||||
echo -e "\033[44;32;1m${productName2} client is installed successfully!${NC}"
|
||||
fi
|
||||
|
||||
touch ~/.${historyFile}
|
||||
|
|
|
@ -17,7 +17,7 @@ serverName="taosd"
|
|||
clientName="taos"
|
||||
uninstallScript="rmtaos"
|
||||
configFile="taos.cfg"
|
||||
tarName="taos.tar.gz"
|
||||
tarName="package.tar.gz"
|
||||
|
||||
osType=Linux
|
||||
pagMode=full
|
||||
|
|
|
@ -606,23 +606,23 @@ function update_TDengine() {
|
|||
|
||||
echo -e "${GREEN_DARK}To configure ${productName} ${NC}: edit ${configDir}/${configFile}"
|
||||
[ -f ${configDir}/taosadapter.toml ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To configure Taos Adapter ${NC}: edit ${configDir}/taosadapter.toml"
|
||||
echo -e "${GREEN_DARK}To configure Adapter ${NC}: edit ${configDir}/taosadapter.toml"
|
||||
if ((${service_mod} == 0)); then
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${csudo}systemctl start ${serverName}${NC}"
|
||||
[ -f ${service_config_dir}/taosadapter.service ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Taos Adatper ${NC}: ${csudo}systemctl start taosadapter ${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: ${csudo}systemctl start taosadapter ${NC}"
|
||||
elif ((${service_mod} == 1)); then
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${csudo}service ${serverName} start${NC}"
|
||||
[ -f ${service_config_dir}/taosadapter.service ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Taos Adapter ${NC}: ${csudo}service taosadapter start${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: ${csudo}service taosadapter start${NC}"
|
||||
else
|
||||
if [ "$osType" != "Darwin" ]; then
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${serverName}${NC}"
|
||||
[ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Taos Adapter ${NC}: taosadapter &${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: taosadapter &${NC}"
|
||||
else
|
||||
echo -e "${GREEN_DARK}To start service ${NC}: launchctl start com.tdengine.taosd${NC}"
|
||||
echo -e "${GREEN_DARK}To start Taos Adapter ${NC}: launchctl start com.tdengine.taosadapter${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: launchctl start com.tdengine.taosadapter${NC}"
|
||||
fi
|
||||
fi
|
||||
|
||||
|
@ -658,23 +658,23 @@ function install_TDengine() {
|
|||
echo
|
||||
echo -e "${GREEN_DARK}To configure ${productName} ${NC}: edit ${configDir}/${configFile}"
|
||||
[ -f ${configDir}/taosadapter.toml ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To configure Taos Adapter ${NC}: edit ${configDir}/taosadapter.toml"
|
||||
echo -e "${GREEN_DARK}To configure Adapter ${NC}: edit ${configDir}/taosadapter.toml"
|
||||
if ((${service_mod} == 0)); then
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${csudo}systemctl start ${serverName}${NC}"
|
||||
[ -f ${service_config_dir}/taosadapter.service ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Taos Adapter ${NC}: ${csudo}systemctl start taosadapter ${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: ${csudo}systemctl start taosadapter ${NC}"
|
||||
elif ((${service_mod} == 1)); then
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${csudo}service ${serverName} start${NC}"
|
||||
[ -f ${service_config_dir}/taosadapter.service ] && [ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Taos Adapter ${NC}: ${csudo}service taosadapter start${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: ${csudo}service taosadapter start${NC}"
|
||||
else
|
||||
if [ "$osType" != "Darwin" ]; then
|
||||
echo -e "${GREEN_DARK}To start ${productName} ${NC}: ${serverName}${NC}"
|
||||
[ -f ${installDir}/bin/taosadapter ] && \
|
||||
echo -e "${GREEN_DARK}To start Taos Adapter ${NC}: taosadapter &${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: taosadapter &${NC}"
|
||||
else
|
||||
echo -e "${GREEN_DARK}To start service ${NC}: launchctl start com.tdengine.taosd${NC}"
|
||||
echo -e "${GREEN_DARK}To start Taos Adapter ${NC}: launchctl start com.tdengine.taosadapter${NC}"
|
||||
echo -e "${GREEN_DARK}To start Adapter ${NC}: launchctl start com.tdengine.taosadapter${NC}"
|
||||
fi
|
||||
fi
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ clientName2="${12}"
|
|||
productName="TDengine"
|
||||
clientName="taos"
|
||||
configFile="taos.cfg"
|
||||
tarName="taos.tar.gz"
|
||||
tarName="package.tar.gz"
|
||||
|
||||
if [ "$osType" != "Darwin" ]; then
|
||||
script_dir="$(dirname $(readlink -f $0))"
|
||||
|
|
|
@ -28,7 +28,7 @@ productName="TDengine"
|
|||
serverName="taosd"
|
||||
clientName="taos"
|
||||
configFile="taos.cfg"
|
||||
tarName="taos.tar.gz"
|
||||
tarName="package.tar.gz"
|
||||
dumpName="taosdump"
|
||||
benchmarkName="taosBenchmark"
|
||||
toolsName="taostools"
|
||||
|
@ -171,22 +171,22 @@ if [ -n "${taostools_bin_files}" ]; then
|
|||
&& cp ${taostools_bin_files} ${taostools_install_dir}/bin \
|
||||
&& chmod a+x ${taostools_install_dir}/bin/* || :
|
||||
|
||||
if [ -f ${top_dir}/tools/taos-tools/packaging/tools/install-taostools.sh ]; then
|
||||
cp ${top_dir}/tools/taos-tools/packaging/tools/install-taostools.sh \
|
||||
if [ -f ${top_dir}/tools/taos-tools/packaging/tools/install-tools.sh ]; then
|
||||
cp ${top_dir}/tools/taos-tools/packaging/tools/install-tools.sh \
|
||||
${taostools_install_dir}/ > /dev/null \
|
||||
&& chmod a+x ${taostools_install_dir}/install-taostools.sh \
|
||||
|| echo -e "failed to copy install-taostools.sh"
|
||||
&& chmod a+x ${taostools_install_dir}/install-tools.sh \
|
||||
|| echo -e "failed to copy install-tools.sh"
|
||||
else
|
||||
echo -e "install-taostools.sh not found"
|
||||
echo -e "install-tools.sh not found"
|
||||
fi
|
||||
|
||||
if [ -f ${top_dir}/tools/taos-tools/packaging/tools/uninstall-taostools.sh ]; then
|
||||
cp ${top_dir}/tools/taos-tools/packaging/tools/uninstall-taostools.sh \
|
||||
if [ -f ${top_dir}/tools/taos-tools/packaging/tools/uninstall-tools.sh ]; then
|
||||
cp ${top_dir}/tools/taos-tools/packaging/tools/uninstall-tools.sh \
|
||||
${taostools_install_dir}/ > /dev/null \
|
||||
&& chmod a+x ${taostools_install_dir}/uninstall-taostools.sh \
|
||||
|| echo -e "failed to copy uninstall-taostools.sh"
|
||||
&& chmod a+x ${taostools_install_dir}/uninstall-tools.sh \
|
||||
|| echo -e "failed to copy uninstall-tools.sh"
|
||||
else
|
||||
echo -e "uninstall-taostools.sh not found"
|
||||
echo -e "uninstall-tools.sh not found"
|
||||
fi
|
||||
|
||||
if [ -f ${build_dir}/lib/libavro.so.23.0.0 ]; then
|
||||
|
|
|
@ -530,7 +530,7 @@ function install_service_on_sysvinit() {
|
|||
function clean_service_on_systemd() {
|
||||
taosd_service_config="${service_config_dir}/taosd.service"
|
||||
|
||||
# taosd service already is stoped before install in preinst script
|
||||
# taosd service already is stopped before install in preinst script
|
||||
#if systemctl is-active --quiet taosd; then
|
||||
# echo "TDengine is running, stopping it..."
|
||||
# ${csudo}systemctl stop taosd &> /dev/null || echo &> /dev/null
|
||||
|
|
|
@ -72,7 +72,7 @@ New Features:
|
|||
|
||||
taos-1.4.13 (Released on 2018-12-14)
|
||||
Bugs Fixed:
|
||||
- Clients failed to connect to server due to unexpected and invalid packets recieved by the server.
|
||||
- Clients failed to connect to server due to unexpected and invalid packets received by the server.
|
||||
Features Added:
|
||||
- Add support to HikariCP in TSDB JDBC driver.
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ read -p "Please enter link directory such as /var/lib/taos/tsdb: " linkDir
|
|||
|
||||
while true; do
|
||||
if [ ! -d $linkDir ]; then
|
||||
read -p "Paht not exists, please enter the correct link path:" linkDir
|
||||
read -p "Path not exists, please enter the correct link path:" linkDir
|
||||
continue
|
||||
fi
|
||||
break
|
||||
|
@ -28,12 +28,12 @@ for linkFile in $(find -L $linkDir -xtype l); do
|
|||
if [ -z "${dirHash["$dirName"]}" ]; then
|
||||
read -p "Please enter the directory to replace ${dirName}:" newDir
|
||||
|
||||
read -p "Do you want to replcace all[y/N]?" replcaceAll
|
||||
if [[ ( "${replcaceAll}" == "y") || ( "${replcaceAll}" == "Y") ]]; then
|
||||
read -p "Do you want to replace all[y/N]?" replaceAll
|
||||
if [[ ( "${replaceAll}" == "y") || ( "${replaceAll}" == "Y") ]]; then
|
||||
dirHash["$dirName"]="$newDir"
|
||||
fi
|
||||
fi
|
||||
|
||||
# Replcace the file
|
||||
# Replace the file
|
||||
ln -sf "${newDir}/${baseName}" "${linkFile}"
|
||||
done
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
[Setup]
|
||||
VersionInfoVersion={#MyAppVersion}
|
||||
AppId={{A0F7A93C-79C4-485D-B2B8-F0D03DF42FAB}
|
||||
AppName={#MyAppName}
|
||||
AppName={#CusName}
|
||||
AppVersion={#MyAppVersion}
|
||||
;AppVerName={#MyAppName} {#MyAppVersion}
|
||||
AppPublisher={#MyAppPublisher}
|
||||
|
@ -64,8 +64,8 @@ Source: {#MyAppSourceDir}\taosdump.exe; DestDir: "{app}"; DestName: "{#CusPrompt
|
|||
|
||||
|
||||
[run]
|
||||
Filename: {sys}\sc.exe; Parameters: "create taosd start= DEMAND binPath= ""C:\\{#CusName}\\taosd.exe --win_service""" ; Flags: runhidden
|
||||
Filename: {sys}\sc.exe; Parameters: "create taosadapter start= DEMAND binPath= ""C:\\{#CusName}\\taosadapter.exe""" ; Flags: runhidden
|
||||
Filename: {sys}\sc.exe; Parameters: "create taosd start= DEMAND binPath= ""C:\\TDengine\\taosd.exe --win_service""" ; Flags: runhidden
|
||||
Filename: {sys}\sc.exe; Parameters: "create taosadapter start= DEMAND binPath= ""C:\\TDengine\\taosadapter.exe""" ; Flags: runhidden
|
||||
|
||||
[UninstallRun]
|
||||
RunOnceId: "stoptaosd"; Filename: {sys}\sc.exe; Parameters: "stop taosd" ; Flags: runhidden
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
TDengine is a high-efficient, scalable, high-available distributed time-series database, which makes a lot of optimizations on inserting and querying data, which is far more efficient than normal regular databases. So TDengine can meet the high requirements of IOT and other areas on storing and querying a large amount of data.
|
||||
TDengine is an open-source, cloud-native time-series database optimized for Internet of Things (IoT), Connected Cars, and Industrial IoT. With its built-in caching, stream processing, and data subscription capabilities, TDengine offers a simplified solution for time-series data processing.
|
||||
|
||||
TDengine will be installed under C:\TDengine, users can modify configuration file C:\TDengine\cfg\taos.cfg, set the log file path or other parameters.
|
||||
To start/stop TDengine with administrator privileges: sc start/stop taosd
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -898,7 +898,7 @@ TEST(clientCase, update_test) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST(clientCase, subscription_test) {
|
||||
TEST(clientCase, sub_db_test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(pConn, nullptr);
|
||||
|
||||
|
@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) {
|
|||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
tmq_conf_set(conf, "group.id", "cgrpName");
|
||||
tmq_conf_set(conf, "group.id", "cgrpNamedb");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
|
@ -925,7 +925,7 @@ TEST(clientCase, subscription_test) {
|
|||
|
||||
// 创建订阅 topics 列表
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
// tmq_list_append(topicList, "topic_t1");
|
||||
tmq_list_append(topicList, "topic_db1");
|
||||
|
||||
// 启动订阅
|
||||
tmq_subscribe(tmq, topicList);
|
||||
|
@ -954,7 +954,86 @@ TEST(clientCase, subscription_test) {
|
|||
printf("db: %s\n", dbName);
|
||||
printf("vgroup id: %d\n", vgroupId);
|
||||
|
||||
if (count ++ > 20) {
|
||||
if (count ++ > 200) {
|
||||
tmq_unsubscribe(tmq);
|
||||
break;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(pRes);
|
||||
if (row == NULL) break;
|
||||
|
||||
fields = taos_fetch_fields(pRes);
|
||||
numOfFields = taos_field_count(pRes);
|
||||
precision = taos_result_precision(pRes);
|
||||
rows++;
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
printf("precision: %d, row content: %s\n", precision, buf);
|
||||
}
|
||||
}
|
||||
// return rows;
|
||||
}
|
||||
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
|
||||
TEST(clientCase, sub_tb_test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(pConn, nullptr);
|
||||
|
||||
// TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create topic, code:%s", taos_errstr(pRes));
|
||||
// taos_free_result(pRes);
|
||||
// return;
|
||||
// }
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
tmq_conf_set(conf, "group.id", "cgrpName");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
// 创建订阅 topics 列表
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, "topic_t1");
|
||||
|
||||
// 启动订阅
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
|
||||
TAOS_FIELD* fields = NULL;
|
||||
int32_t numOfFields = 0;
|
||||
int32_t precision = 0;
|
||||
int32_t totalRows = 0;
|
||||
int32_t msgCnt = 0;
|
||||
int32_t timeout = 5000;
|
||||
|
||||
int32_t count = 0;
|
||||
|
||||
while (1) {
|
||||
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
||||
if (pRes) {
|
||||
char buf[1024];
|
||||
int32_t rows = 0;
|
||||
|
||||
const char* topicName = tmq_get_topic_name(pRes);
|
||||
const char* dbName = tmq_get_db_name(pRes);
|
||||
int32_t vgroupId = tmq_get_vgroup_id(pRes);
|
||||
|
||||
printf("topic: %s\n", topicName);
|
||||
printf("db: %s\n", dbName);
|
||||
printf("vgroup id: %d\n", vgroupId);
|
||||
|
||||
if (count ++ > 200) {
|
||||
tmq_unsubscribe(tmq);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -6639,8 +6639,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
|||
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
snprintf(buf, maxLen, "offset(snapshot) uid:%" PRId64 " ts:%" PRId64, pVal->uid, pVal->ts);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -6823,8 +6824,7 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
|||
}
|
||||
|
||||
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
|
||||
taosArrayDestroy(pRsp->blockDataLen);
|
||||
pRsp->blockDataLen = NULL;
|
||||
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);;
|
||||
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
|
||||
pRsp->blockData = NULL;
|
||||
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||
|
|
|
@ -24,10 +24,10 @@ extern "C" {
|
|||
|
||||
enum {
|
||||
MQ_CONSUMER_STATUS__MODIFY = 1,
|
||||
MQ_CONSUMER_STATUS__MODIFY_IN_REB,
|
||||
MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
|
||||
MQ_CONSUMER_STATUS__READY,
|
||||
MQ_CONSUMER_STATUS__LOST,
|
||||
MQ_CONSUMER_STATUS__LOST_IN_REB,
|
||||
MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
|
||||
MQ_CONSUMER_STATUS__LOST_REBD,
|
||||
MQ_CONSUMER_STATUS__REMOVED,
|
||||
};
|
||||
|
|
|
@ -317,7 +317,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
|||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||
}
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
|
||||
} else if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
|
||||
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||
|
|
|
@ -224,7 +224,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
.pVgEp = pVgEp,
|
||||
};
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, sub, pVgEp->vgId, consumerId);
|
||||
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, sub, pVgEp->vgId, consumerId);
|
||||
}
|
||||
taosArrayDestroy(pConsumerEp->vgs);
|
||||
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||
|
@ -329,7 +329,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
||||
taosArrayPush(pOutput->newConsumers, &consumerId);
|
||||
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, sub, consumerId);
|
||||
mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, sub, consumerId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -357,7 +357,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
|
@ -387,12 +387,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
|
||||
mInfo("mq rebalance: skip vg %d for same consumer:%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId,
|
||||
mInfo("mq rebalance: skip vg %d for same consumer:0x%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
continue;
|
||||
}
|
||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
} else {
|
||||
|
@ -1019,7 +1019,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
|
||||
|
||||
mDebug("mnd show subscriptions: topic %s, consumer:%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
|
||||
mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
|
||||
pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);
|
||||
|
||||
// offset
|
||||
|
|
|
@ -79,43 +79,33 @@ typedef struct {
|
|||
} STqExecDb;
|
||||
|
||||
typedef struct {
|
||||
int8_t subType;
|
||||
|
||||
STqReader* pExecReader;
|
||||
qTaskInfo_t task;
|
||||
int8_t subType;
|
||||
STqReader* pExecReader;
|
||||
qTaskInfo_t task;
|
||||
union {
|
||||
STqExecCol execCol;
|
||||
STqExecTb execTb;
|
||||
STqExecDb execDb;
|
||||
};
|
||||
int32_t numOfCols; // number of out pout column, temporarily used
|
||||
int32_t numOfCols; // number of out pout column, temporarily used
|
||||
} STqExecHandle;
|
||||
|
||||
typedef struct {
|
||||
// info
|
||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
int64_t consumerId;
|
||||
int32_t epoch;
|
||||
int8_t fetchMeta;
|
||||
|
||||
int64_t snapshotVer;
|
||||
|
||||
SWalReader* pWalReader;
|
||||
|
||||
SWalRef* pRef;
|
||||
|
||||
// push
|
||||
STqPushHandle pushHandle;
|
||||
|
||||
// exec
|
||||
STqExecHandle execHandle;
|
||||
|
||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
int64_t consumerId;
|
||||
int32_t epoch;
|
||||
int8_t fetchMeta;
|
||||
int64_t snapshotVer;
|
||||
SWalReader* pWalReader;
|
||||
SWalRef* pRef;
|
||||
STqPushHandle pushHandle; // push
|
||||
STqExecHandle execHandle; // exec
|
||||
} STqHandle;
|
||||
|
||||
typedef struct {
|
||||
SMqDataRsp dataRsp;
|
||||
SMqDataRsp* pDataRsp;
|
||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
SRpcHandleInfo pInfo;
|
||||
SRpcHandleInfo info;
|
||||
} STqPushEntry;
|
||||
|
||||
struct STQ {
|
||||
|
@ -151,13 +141,13 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
|||
// tqRead
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
||||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
||||
|
||||
// tqExec
|
||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
|
||||
// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
|
||||
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
|
||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
|
||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type);
|
||||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
|
||||
|
||||
// tqMeta
|
||||
|
|
|
@ -192,6 +192,9 @@ void tqCleanUp();
|
|||
STQ* tqOpen(const char* path, SVnode* pVnode);
|
||||
void tqClose(STQ*);
|
||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||
int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type);
|
||||
int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
|
||||
|
||||
int tqCommit(STQ*);
|
||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
|
||||
|
|
|
@ -68,7 +68,13 @@ static void destroySTqHandle(void* data) {
|
|||
|
||||
static void tqPushEntryFree(void* data) {
|
||||
STqPushEntry* p = *(void**)data;
|
||||
tDeleteSMqDataRsp(&p->dataRsp);
|
||||
if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||
tDeleteSMqDataRsp(p->pDataRsp);
|
||||
} else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||
tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp);
|
||||
}
|
||||
|
||||
taosMemoryFree(p->pDataRsp);
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
|
||||
|
@ -166,12 +172,16 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
|
||||
|
||||
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
|
||||
int64_t consumerId, int32_t type) {
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||
|
||||
if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
|
||||
}
|
||||
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
|
@ -183,23 +193,83 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead));
|
||||
((SMqRspHead*)buf)->mqMsgType = type;
|
||||
((SMqRspHead*)buf)->epoch = epoch;
|
||||
((SMqRspHead*)buf)->consumerId = consumerId;
|
||||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeSMqDataRsp(&encoder, pRsp);
|
||||
|
||||
if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||
tEncodeSMqDataRsp(&encoder, pRsp);
|
||||
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||
tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp);
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg rsp = {
|
||||
.info = pPushEntry->pInfo,
|
||||
.info = *pRpcHandleInfo,
|
||||
.pCont = buf,
|
||||
.contLen = tlen,
|
||||
.code = 0,
|
||||
};
|
||||
|
||||
tmsgSendRsp(&rsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
|
||||
|
||||
#if 0
|
||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
|
||||
A(!pRsp->withSchema);
|
||||
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
|
||||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
}
|
||||
#endif
|
||||
|
||||
// int32_t len = 0;
|
||||
// int32_t code = 0;
|
||||
// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||
// if (code < 0) {
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// int32_t tlen = sizeof(SMqRspHead) + len;
|
||||
// void* buf = rpcMallocCont(tlen);
|
||||
// if (buf == NULL) {
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead));
|
||||
//
|
||||
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
//
|
||||
// SEncoder encoder = {0};
|
||||
// tEncoderInit(&encoder, abuf, len);
|
||||
// tEncodeSMqDataRsp(&encoder, pRsp);
|
||||
// tEncoderClear(&encoder);
|
||||
//
|
||||
// SRpcMsg rsp = {
|
||||
// .info = pPushEntry->pInfo,
|
||||
// .pCont = buf,
|
||||
// .contLen = tlen,
|
||||
// .code = 0,
|
||||
// };
|
||||
//
|
||||
// tmsgSendRsp(&rsp);
|
||||
//
|
||||
|
||||
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
|
||||
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
|
||||
|
||||
char buf1[80] = {0};
|
||||
char buf2[80] = {0};
|
||||
|
@ -207,93 +277,146 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
|||
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
|
||||
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
int32_t tlen = sizeof(SMqRspHead) + len;
|
||||
void* buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
|
||||
#if 0
|
||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
|
||||
A(!pRsp->withSchema);
|
||||
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
|
||||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
if (pRsp->blockNum > 0) {
|
||||
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
} else {
|
||||
A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
||||
((SMqRspHead*)buf)->epoch = pReq->epoch;
|
||||
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
||||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeSMqDataRsp(&encoder, pRsp);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.pCont = buf,
|
||||
.contLen = tlen,
|
||||
.code = 0,
|
||||
};
|
||||
tmsgSendRsp(&rsp);
|
||||
// int32_t len = 0;
|
||||
// int32_t code = 0;
|
||||
//
|
||||
// if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||
// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||
// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||
// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
|
||||
// }
|
||||
//
|
||||
// if (code < 0) {
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// int32_t tlen = sizeof(SMqRspHead) + len;
|
||||
// void* buf = rpcMallocCont(tlen);
|
||||
// if (buf == NULL) {
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// ((SMqRspHead*)buf)->mqMsgType = type;
|
||||
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
|
||||
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
||||
//
|
||||
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
//
|
||||
// SEncoder encoder = {0};
|
||||
// tEncoderInit(&encoder, abuf, len);
|
||||
//
|
||||
// if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||
// tEncodeSMqDataRsp(&encoder, pRsp);
|
||||
// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||
// tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp);
|
||||
// }
|
||||
//
|
||||
// tEncoderClear(&encoder);
|
||||
//
|
||||
// SRpcMsg rsp = {
|
||||
// .info = pMsg->info,
|
||||
// .pCont = buf,
|
||||
// .contLen = tlen,
|
||||
// .code = 0,
|
||||
// };
|
||||
//
|
||||
// tmsgSendRsp(&rsp);
|
||||
doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);
|
||||
|
||||
char buf1[80] = {0};
|
||||
char buf2[80] = {0};
|
||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s",
|
||||
|
||||
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s",
|
||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
|
||||
int32_t len = 0;
|
||||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
int32_t tlen = sizeof(SMqRspHead) + len;
|
||||
void* buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
|
||||
((SMqRspHead*)buf)->epoch = pReq->epoch;
|
||||
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
||||
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeSTaosxRsp(&encoder, pRsp);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.pCont = buf,
|
||||
.contLen = tlen,
|
||||
.code = 0,
|
||||
};
|
||||
tmsgSendRsp(&rsp);
|
||||
|
||||
char buf1[80] = {0};
|
||||
char buf2[80] = {0};
|
||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||
tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||
|
||||
return 0;
|
||||
}
|
||||
//int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
|
||||
//#if 0
|
||||
// A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||
// A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||
//
|
||||
// if (pRsp->withSchema) {
|
||||
// A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
|
||||
// } else {
|
||||
// A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||
// }
|
||||
//
|
||||
// if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||
// if (pRsp->blockNum > 0) {
|
||||
// A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||
// } else {
|
||||
// A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||
// }
|
||||
// }
|
||||
//#endif
|
||||
//
|
||||
// int32_t len = 0;
|
||||
// int32_t code = 0;
|
||||
// tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
|
||||
// if (code < 0) {
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// int32_t tlen = sizeof(SMqRspHead) + len;
|
||||
// void* buf = rpcMallocCont(tlen);
|
||||
// if (buf == NULL) {
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
|
||||
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
|
||||
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
||||
//
|
||||
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||
//
|
||||
// SEncoder encoder = {0};
|
||||
// tEncoderInit(&encoder, abuf, len);
|
||||
// tEncodeSTaosxRsp(&encoder, pRsp);
|
||||
// tEncoderClear(&encoder);
|
||||
//
|
||||
// SRpcMsg rsp = {
|
||||
// .info = pMsg->info,
|
||||
// .pCont = buf,
|
||||
// .contLen = tlen,
|
||||
// .code = 0,
|
||||
// };
|
||||
//
|
||||
// tmsgSendRsp(&rsp);
|
||||
//
|
||||
// char buf1[80] = {0};
|
||||
// char buf2[80] = {0};
|
||||
// tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||
// tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||
//
|
||||
// tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
||||
// TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||
// return 0;
|
||||
//}
|
||||
|
||||
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
||||
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
||||
|
@ -382,8 +505,18 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
|
|||
}
|
||||
|
||||
pRsp->withTbName = 0;
|
||||
pRsp->withSchema = false;
|
||||
#if 0
|
||||
pRsp->withTbName = pReq->withTbName;
|
||||
if (pRsp->withTbName) {
|
||||
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
|
||||
if (pRsp->blockTbName == NULL) {
|
||||
// TODO free
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
pRsp->withSchema = false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -404,170 +537,147 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SMqPollReq req = {0};
|
||||
int32_t code = 0;
|
||||
STqOffsetVal fetchOffsetNew;
|
||||
SWalCkHead* pCkHead = NULL;
|
||||
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||
SRpcMsg* pMsg, bool* pBlockReturned) {
|
||||
uint64_t consumerId = pRequest->consumerId;
|
||||
STqOffsetVal reqOffset = pRequest->reqOffset;
|
||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
|
||||
*pBlockReturned = false;
|
||||
|
||||
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
|
||||
return -1;
|
||||
}
|
||||
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
|
||||
if (pOffset != NULL) {
|
||||
*pOffsetVal = pOffset->val;
|
||||
|
||||
int64_t consumerId = req.consumerId;
|
||||
int32_t reqEpoch = req.epoch;
|
||||
STqOffsetVal reqOffset = req.reqOffset;
|
||||
|
||||
// 1. find handle
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
if (pHandle == NULL) {
|
||||
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s not found", consumerId, TD_VID(pTq->pVnode),
|
||||
req.subKey);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// 2. check rebalance
|
||||
if (pHandle->consumerId != consumerId) {
|
||||
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
||||
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// update epoch if need
|
||||
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||
while (savedEpoch < reqEpoch) {
|
||||
tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
|
||||
savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
|
||||
}
|
||||
|
||||
char buf[80];
|
||||
tFormatOffset(buf, 80, &reqOffset);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId,
|
||||
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
||||
|
||||
// 2.reset offset if needed
|
||||
if (reqOffset.type > 0) {
|
||||
fetchOffsetNew = reqOffset;
|
||||
char formatBuf[80];
|
||||
tFormatOffset(formatBuf, 80, pOffsetVal);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.",
|
||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), formatBuf);
|
||||
return 0;
|
||||
} else {
|
||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
|
||||
if (pOffset != NULL) {
|
||||
fetchOffsetNew = pOffset->val;
|
||||
char formatBuf[80];
|
||||
tFormatOffset(formatBuf, 80, &fetchOffsetNew);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
|
||||
TD_VID(pTq->pVnode), formatBuf);
|
||||
} else {
|
||||
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
||||
if (req.useSnapshot) {
|
||||
if (pHandle->fetchMeta) {
|
||||
tqOffsetResetToMeta(&fetchOffsetNew, 0);
|
||||
} else {
|
||||
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
|
||||
}
|
||||
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
|
||||
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
||||
if (pRequest->useSnapshot) {
|
||||
if (pHandle->fetchMeta) {
|
||||
tqOffsetResetToMeta(pOffsetVal, 0);
|
||||
} else {
|
||||
pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
|
||||
if (pHandle->pRef == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
tqOffsetResetToLog(&fetchOffsetNew, pHandle->pRef->refVer - 1);
|
||||
tqOffsetResetToData(pOffsetVal, 0, 0);
|
||||
}
|
||||
} else {
|
||||
pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
|
||||
if (pHandle->pRef == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
|
||||
|
||||
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
|
||||
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
||||
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
return code;
|
||||
} else {
|
||||
STaosxRsp taosxRsp = {0};
|
||||
tqInitTaosxRsp(&taosxRsp, &req);
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
}
|
||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||
tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64
|
||||
" in vg %d, subkey %s, reset none failed",
|
||||
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), req.subKey);
|
||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||
return -1;
|
||||
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
|
||||
// lock
|
||||
taosWLockLatch(&pTq->pushLock);
|
||||
if (tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
|
||||
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
|
||||
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
||||
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
|
||||
*pBlockReturned = true;
|
||||
return code;
|
||||
} else {
|
||||
STaosxRsp taosxRsp = {0};
|
||||
tqInitTaosxRsp(&taosxRsp, pRequest);
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
// int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
|
||||
*pBlockReturned = true;
|
||||
return code;
|
||||
}
|
||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||
tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64
|
||||
" in vg %d, subkey %s, reset none failed",
|
||||
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pRequest->subKey);
|
||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// till now, all data has been rsp to consumer, new data needs to push client once arrived.
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
|
||||
int32_t code = -1;
|
||||
STqOffsetVal offset = {0};
|
||||
SWalCkHead* pCkHead = NULL;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
STqOffsetVal reqOffset = pRequest->reqOffset;
|
||||
uint64_t consumerId = pRequest->consumerId;
|
||||
|
||||
// 1. reset the offset if needed
|
||||
if (reqOffset.type > 0) {
|
||||
offset = reqOffset;
|
||||
} else { // handle the reset offset cases, according to the consumer's choice.
|
||||
bool blockReturned = false;
|
||||
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// empty block returned, quit
|
||||
if (blockReturned) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// this is a normal subscription requirement
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
SMqDataRsp dataRsp = {0};
|
||||
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||
|
||||
// lock
|
||||
taosWLockLatch(&pTq->pushLock);
|
||||
code = tqScanData(pTq, pHandle, &dataRsp, &offset);
|
||||
|
||||
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
|
||||
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
|
||||
if (pPushEntry != NULL) {
|
||||
pPushEntry->pInfo = pMsg->info;
|
||||
memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
dataRsp.withTbName = 0;
|
||||
memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp));
|
||||
pPushEntry->dataRsp.head.consumerId = consumerId;
|
||||
pPushEntry->dataRsp.head.epoch = reqEpoch;
|
||||
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
||||
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
|
||||
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr",
|
||||
consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode));
|
||||
// unlock
|
||||
taosWUnLockLatch(&pTq->pushLock);
|
||||
return 0;
|
||||
}
|
||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
||||
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||
taosWUnLockLatch(&pTq->pushLock);
|
||||
return code;
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pTq->pushLock);
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||
|
||||
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
||||
// NOTE: this pHandle->consumerId may have been changed already.
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
||||
", ts:%" PRId64,
|
||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
|
||||
dataRsp.rspOffset.ts);
|
||||
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
return code;
|
||||
}
|
||||
|
||||
// todo handle the case where re-balance occurs.
|
||||
// for taosx
|
||||
SMqMetaRsp metaRsp = {0};
|
||||
STaosxRsp taosxRsp = {0};
|
||||
tqInitTaosxRsp(&taosxRsp, &req);
|
||||
tqInitTaosxRsp(&taosxRsp, pRequest);
|
||||
|
||||
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
|
||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew) < 0) {
|
||||
if (offset.type != TMQ_OFFSET__LOG) {
|
||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (metaRsp.metaRspLen > 0) {
|
||||
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
|
||||
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
|
||||
",version:%" PRId64,
|
||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
||||
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
||||
metaRsp.rspOffset.version);
|
||||
taosMemoryFree(metaRsp.metaRsp);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
|
@ -575,54 +685,56 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
if (taosxRsp.blockNum > 0) {
|
||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
return code;
|
||||
} else {
|
||||
fetchOffsetNew = taosxRsp.rspOffset;
|
||||
offset = taosxRsp.rspOffset;
|
||||
}
|
||||
|
||||
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
||||
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
||||
",version:%" PRId64,
|
||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
|
||||
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
|
||||
}
|
||||
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
|
||||
taosxRsp.rspOffset.version);
|
||||
} else {
|
||||
|
||||
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
|
||||
int64_t fetchVer = fetchOffsetNew.version + 1;
|
||||
// if (offset.type == TMQ_OFFSET__LOG) {
|
||||
int64_t fetchVer = offset.version + 1;
|
||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||
if (pCkHead == NULL) {
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||
|
||||
while (1) {
|
||||
savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||
if (savedEpoch > reqEpoch) {
|
||||
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
|
||||
// todo refactor: this is not correct.
|
||||
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||
if (savedEpoch > pRequest->epoch) {
|
||||
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
|
||||
", found new consumer epoch %d, discard req epoch %d",
|
||||
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch);
|
||||
consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
|
||||
break;
|
||||
}
|
||||
|
||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
// if (terrno == 0) { // failed to seek to given ver, but no errors happen.
|
||||
// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, (SMqDataRsp*) &taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
// return code;
|
||||
// } else { // error happens, return to consumers
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
// }
|
||||
}
|
||||
|
||||
SWalCont* pHead = &pCkHead->head;
|
||||
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
|
||||
req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
|
||||
pRequest->epoch, vgId, fetchVer, pHead->msgType);
|
||||
|
||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||
SPackedData submit = {
|
||||
|
@ -630,16 +742,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
|
||||
.ver = pHead->version,
|
||||
};
|
||||
|
||||
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
|
||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode),
|
||||
req.subKey);
|
||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
|
||||
pRequest->subKey);
|
||||
return -1;
|
||||
}
|
||||
if (taosxRsp.blockNum > 0 /* threshold */) {
|
||||
|
||||
if (taosxRsp.blockNum > 0) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return code;
|
||||
|
@ -655,7 +767,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
metaRsp.resMsgType = pHead->msgType;
|
||||
metaRsp.metaRspLen = pHead->bodyLen;
|
||||
metaRsp.metaRsp = pHead->body;
|
||||
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
|
||||
if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) {
|
||||
code = -1;
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
|
@ -674,6 +786,55 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SMqPollReq req = {0};
|
||||
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t consumerId = req.consumerId;
|
||||
int32_t reqEpoch = req.epoch;
|
||||
STqOffsetVal reqOffset = req.reqOffset;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
// 1. find handle
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
if (pHandle == NULL) {
|
||||
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// 2. check re-balance status
|
||||
taosRLockLatch(&pTq->pushLock);
|
||||
if (pHandle->consumerId != consumerId) {
|
||||
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
||||
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||
taosRUnLockLatch(&pTq->pushLock);
|
||||
return -1;
|
||||
}
|
||||
taosRUnLockLatch(&pTq->pushLock);
|
||||
|
||||
taosWLockLatch(&pTq->pushLock);
|
||||
// 3. update the epoch value
|
||||
int32_t savedEpoch = pHandle->epoch;
|
||||
if (savedEpoch < reqEpoch) {
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
|
||||
pHandle->epoch = reqEpoch;
|
||||
}
|
||||
taosWUnLockLatch(&pTq->pushLock);
|
||||
|
||||
char buf[80];
|
||||
tFormatOffset(buf, 80, &reqOffset);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
||||
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
||||
|
||||
return extractDataForMq(pTq, pHandle, &req, pMsg);
|
||||
}
|
||||
|
||||
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||
|
||||
|
@ -744,9 +905,9 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
|||
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
SMqRebVgReq req = {0};
|
||||
tDecodeSMqRebVgReq(msg, &req);
|
||||
// todo lock
|
||||
|
||||
tqDebug("vgId:%d, tq process sub req %s", pTq->pVnode->config.vgId, req.subKey);
|
||||
tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
|
||||
req.oldConsumerId, req.newConsumerId);
|
||||
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
if (pHandle == NULL) {
|
||||
|
@ -754,11 +915,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "",
|
||||
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
|
||||
}
|
||||
|
||||
if (req.newConsumerId == -1) {
|
||||
tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
||||
taosMemoryFree(req.qmsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
STqHandle tqHandle = {0};
|
||||
pHandle = &tqHandle;
|
||||
/*taosInitRWLatch(&pExec->lock);*/
|
||||
|
@ -774,8 +937,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
// TODO version should be assigned and refed during preprocess
|
||||
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
|
||||
if (pRef == NULL) {
|
||||
taosMemoryFree(req.qmsg);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t ver = pRef->refVer;
|
||||
pHandle->pRef = pRef;
|
||||
|
||||
|
@ -786,6 +951,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
.initTqReader = true,
|
||||
.version = ver,
|
||||
};
|
||||
|
||||
pHandle->snapshotVer = ver;
|
||||
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
|
@ -800,6 +966,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||
|
||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||
|
@ -808,7 +975,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
|
||||
pHandle->execHandle.execTb.suid = req.suid;
|
||||
|
||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||
|
@ -828,28 +994,45 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
}
|
||||
|
||||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||
tqDebug("try to persist handle %s consumer:0x%" PRIx64" , old consumer:0x%"PRIx64, req.subKey, pHandle->consumerId,
|
||||
oldConsumerId);
|
||||
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
|
||||
pHandle->consumerId, oldConsumerId);
|
||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||
taosMemoryFree(req.qmsg);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
// TODO handle qmsg and exec modification
|
||||
tqInfo("update the consumer info, old consumer id:0x%"PRIx64", new Id:0x%"PRIx64, pHandle->consumerId, req.newConsumerId);
|
||||
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
||||
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
|
||||
atomic_store_32(&pHandle->epoch, -1);
|
||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||
taosMemoryFree(req.qmsg);
|
||||
return tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||
}
|
||||
|
||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||
req.newConsumerId);
|
||||
|
||||
taosWLockLatch(&pTq->pushLock);
|
||||
atomic_store_32(&pHandle->epoch, -1);
|
||||
|
||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
||||
tqRemovePushEntry(pTq, req.subKey, (int32_t) strlen(req.subKey), pHandle->consumerId, true);
|
||||
|
||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||
taosMemoryFree(req.qmsg);
|
||||
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
qStreamCloseTsdbReader(pHandle->execHandle.task);
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pTq->pushLock);
|
||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||
taosMemoryFree(req.qmsg);
|
||||
return -1;
|
||||
}
|
||||
// close handle
|
||||
}
|
||||
|
||||
taosMemoryFree(req.qmsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -924,15 +1107,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
pTask->tbSink.vnode = pTq->pVnode;
|
||||
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
|
||||
|
||||
int32_t version = 1;
|
||||
int32_t ver1 = 1;
|
||||
SMetaInfo info = {0};
|
||||
int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
version = info.skmVer;
|
||||
ver1 = info.skmVer;
|
||||
}
|
||||
|
||||
pTask->tbSink.pTSchema =
|
||||
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, version);
|
||||
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, ver1);
|
||||
if(pTask->tbSink.pTSchema == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -46,11 +46,13 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp
|
|||
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
||||
|
||||
// TODO add reference to gurantee success
|
||||
if (metaGetTableEntryByUidCache(&mr, uid) < 0) {
|
||||
metaReaderClear(&mr);
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < n; i++) {
|
||||
char* tbName = taosStrdup(mr.me.name);
|
||||
taosArrayPush(pRsp->blockTbName, &tbName);
|
||||
|
@ -83,13 +85,16 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
|
||||
tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId);
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
|
||||
|
||||
// current scan should be stopped asap, since the rebalance occurs.
|
||||
if (pDataBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -99,7 +104,9 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
|||
|
||||
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||
rowCnt += pDataBlock->info.rows;
|
||||
if (rowCnt >= 4096) break;
|
||||
if (rowCnt >= 4096) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -283,7 +283,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
|||
tdbTbcMoveToFirst(pCur);
|
||||
|
||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||
STqHandle handle;
|
||||
STqHandle handle = {0};
|
||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||
tDecodeSTqHandle(&decoder, &handle);
|
||||
tDecoderClear(&decoder);
|
||||
|
|
|
@ -209,6 +209,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
|||
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
|
||||
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
if (msgType == TDMT_VND_SUBMIT) {
|
||||
// lock push mgr to avoid potential msg lost
|
||||
|
@ -217,7 +218,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
|
||||
if (numOfRegisteredPush > 0) {
|
||||
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
|
||||
pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
|
||||
vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
|
||||
|
||||
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
||||
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
||||
|
@ -239,7 +240,10 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
|
||||
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
|
||||
|
@ -248,17 +252,16 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pPushEntry->dataRsp.reqOffset.version >= ver) {
|
||||
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip",
|
||||
pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver);
|
||||
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
|
||||
if (pRsp->reqOffset.version >= ver) {
|
||||
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId,
|
||||
pRsp->reqOffset.version, ver);
|
||||
continue;
|
||||
}
|
||||
|
||||
STqExecHandle* pExec = &pHandle->execHandle;
|
||||
qTaskInfo_t task = pExec->task;
|
||||
|
||||
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
|
||||
|
||||
// prepare scan mem data
|
||||
SPackedData submit = {
|
||||
.msgStr = data,
|
||||
|
@ -274,7 +277,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||
tqDebug("vgId:%d, tq exec error since %s", pTq->pVnode->config.vgId, terrstr());
|
||||
tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr());
|
||||
}
|
||||
|
||||
if (pDataBlock == NULL) {
|
||||
|
@ -285,11 +288,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
pRsp->blockNum++;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", pTq->pVnode->config.vgId, pPushEntry->subKey,
|
||||
pRsp->blockNum);
|
||||
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", vgId, pPushEntry->subKey, pRsp->blockNum);
|
||||
if (pRsp->blockNum > 0) {
|
||||
// set offset
|
||||
tqOffsetResetToLog(&pRsp->rspOffset, ver);
|
||||
|
||||
// remove from hash
|
||||
size_t kLen;
|
||||
void* key = taosHashGetKey(pIter, &kLen);
|
||||
|
@ -311,6 +314,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key);
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
|
||||
taosArrayDestroy(cachedKeyLens);
|
||||
taosMemoryFree(data);
|
||||
|
@ -336,9 +340,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
};
|
||||
|
||||
tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
|
||||
|
||||
tqProcessSubmitReq(pTq, submit);
|
||||
}
|
||||
|
||||
if (msgType == TDMT_VND_DELETE) {
|
||||
tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
|
||||
}
|
||||
|
@ -346,3 +350,61 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
|
||||
SMqDataRsp* pDataRsp, int32_t type) {
|
||||
uint64_t consumerId = pRequest->consumerId;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
STqHandle* pTqHandle = pHandle;
|
||||
|
||||
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
|
||||
if (pPushEntry == NULL) {
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", vgId:%d failed to malloc, size:%d", consumerId, vgId,
|
||||
(int32_t)sizeof(STqPushEntry));
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pPushEntry->info = pRpcMsg->info;
|
||||
memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
|
||||
if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(STaosxRsp));
|
||||
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(STaosxRsp));
|
||||
} else if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(SMqDataRsp));
|
||||
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(SMqDataRsp));
|
||||
}
|
||||
|
||||
SMqRspHead* pHead = &pPushEntry->pDataRsp->head;
|
||||
pHead->consumerId = consumerId;
|
||||
pHead->epoch = pRequest->epoch;
|
||||
pHead->mqMsgType = type;
|
||||
|
||||
taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*));
|
||||
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d", consumerId,
|
||||
pTqHandle->subKey, pDataRsp->reqOffset.version, vgId, taosHashGetSize(pTq->pPushMgr));
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
|
||||
|
||||
if (pEntry != NULL) {
|
||||
uint64_t cId = (*pEntry)->pDataRsp->head.consumerId;
|
||||
ASSERT(consumerId == cId);
|
||||
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId,
|
||||
(*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1);
|
||||
|
||||
if (rspConsumer) { // rsp the old consumer with empty block.
|
||||
tqPushDataRsp(pTq, *pEntry);
|
||||
}
|
||||
|
||||
taosHashRemove(pTq->pPushMgr, pKey, keyLen);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -183,14 +183,15 @@ end:
|
|||
return tbSuid == realTbSuid;
|
||||
}
|
||||
|
||||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
||||
int32_t code = 0;
|
||||
|
||||
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
||||
int64_t offset = *fetchOffset;
|
||||
|
||||
while (1) {
|
||||
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
||||
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
||||
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
||||
*fetchOffset = offset - 1;
|
||||
code = -1;
|
||||
|
@ -241,6 +242,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
|||
offset++;
|
||||
}
|
||||
}
|
||||
|
||||
END:
|
||||
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
|
||||
return code;
|
||||
|
|
|
@ -307,7 +307,7 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch
|
|||
bool existLeaderRole(TAOS_ROW row, TAOS_FIELD* fields, int nFields) {
|
||||
// vgroup_id | db_name | tables | v1_dnode | v1_status | v2_dnode | v2_status | v3_dnode | v3_status | v4_dnode |
|
||||
// v4_status | cacheload | tsma |
|
||||
if (nFields != 13) {
|
||||
if (nFields != 14) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -221,12 +221,12 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
|
|||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
|
||||
if (msg == NULL) {
|
||||
// create raw scan
|
||||
|
||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||
if (NULL == pTaskInfo) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||
|
||||
pTaskInfo->cost.created = taosGetTimestampUs();
|
||||
|
@ -715,7 +715,6 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
|
|||
|
||||
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||
|
||||
if (pTaskInfo == NULL) {
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
@ -723,7 +722,6 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
|
|||
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
|
||||
|
||||
setTaskKilled(pTaskInfo, rspCode);
|
||||
|
||||
qStopTaskOperators(pTaskInfo);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -2774,11 +2774,17 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
|
|||
}
|
||||
|
||||
void qStreamCloseTsdbReader(void* task) {
|
||||
if (task == NULL) return;
|
||||
if (task == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
|
||||
SOperatorInfo* pOp = pTaskInfo->pRoot;
|
||||
qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid,
|
||||
|
||||
qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.lastStatus.uid,
|
||||
pTaskInfo->streamInfo.lastStatus.ts);
|
||||
|
||||
// todo refactor, other thread may already use this read to extract data.
|
||||
pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0};
|
||||
while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
|
||||
SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
|
||||
|
@ -2786,8 +2792,19 @@ void qStreamCloseTsdbReader(void* task) {
|
|||
SStreamScanInfo* pInfo = pDownstreamOp->info;
|
||||
if (pInfo->pTableScanOp) {
|
||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||
|
||||
setOperatorCompleted(pInfo->pTableScanOp);
|
||||
while(pTaskInfo->owner != 0) {
|
||||
taosMsleep(100);
|
||||
qDebug("wait for the reader stopping");
|
||||
}
|
||||
|
||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||
pTSInfo->base.dataReader = NULL;
|
||||
|
||||
// restore the status, todo refactor.
|
||||
pInfo->pTableScanOp->status = OP_OPENED;
|
||||
pTaskInfo->status = TASK_NOT_COMPLETED;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -751,7 +751,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
while (1) {
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
if (result) {
|
||||
if (result || (pOperator->status == OP_EXEC_DONE)) {
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -985,6 +985,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
|
|||
pTableScanInfo->scanTimes = 0;
|
||||
pTableScanInfo->currentGroupId = -1;
|
||||
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
||||
qDebug("1");
|
||||
pTableScanInfo->base.dataReader = NULL;
|
||||
}
|
||||
|
||||
|
@ -1143,6 +1144,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
|
|||
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
||||
qDebug("2");
|
||||
pTableScanInfo->base.dataReader = NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1616,6 +1618,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
|||
if (!pTaskInfo->streamInfo.returned) {
|
||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||
qDebug("3");
|
||||
pTSInfo->base.dataReader = NULL;
|
||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
|
||||
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
|
||||
|
@ -1767,6 +1770,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
|
||||
/*resetTableScanInfo(pTSInfo, pWin);*/
|
||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||
qDebug("4");
|
||||
|
||||
pTSInfo->base.dataReader = NULL;
|
||||
pInfo->pTableScanOp->status = OP_OPENED;
|
||||
|
||||
|
@ -1837,6 +1842,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
|
||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||
qDebug("5");
|
||||
|
||||
pTSInfo->base.dataReader = NULL;
|
||||
|
||||
pTSInfo->base.cond.startVersion = -1;
|
||||
|
@ -2635,6 +2642,8 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
return pBlock;
|
||||
}
|
||||
|
||||
qDebug("8");
|
||||
|
||||
tsdbReaderClose(pInfo->base.dataReader);
|
||||
pInfo->base.dataReader = NULL;
|
||||
return NULL;
|
||||
|
|
|
@ -29,7 +29,7 @@ class TDTestCase:
|
|||
self.master_dnode = self.TDDnodes.dnodes[0]
|
||||
self.host=self.master_dnode.cfgDict["fqdn"]
|
||||
conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
|
||||
tdSql.init(conn1.cursor())
|
||||
tdSql.init(conn1.cursor(), True)
|
||||
|
||||
|
||||
def getBuildPath(self):
|
||||
|
|
|
@ -123,8 +123,9 @@ class TDTestCase:
|
|||
pre_insert = "insert into "
|
||||
sql = pre_insert
|
||||
|
||||
t = time.time()
|
||||
startTs = int(round(t * 1000))
|
||||
# t = 1678609778776 #time.time()
|
||||
t = 1600000000000
|
||||
startTs = t #int(round(t * 1000))
|
||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d values "%(stbName,i)
|
||||
|
|
|
@ -386,7 +386,7 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
|
|||
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
|
||||
pInfo->numOfVgroups++;
|
||||
|
||||
taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
|
||||
taosFprintfFile(g_fp, "consume id %d, add new vgroupId:%d\n", pInfo->consumerId, vgroupId);
|
||||
if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
|
||||
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
|
||||
pInfo->numOfVgroups, vgroupId);
|
||||
|
@ -578,18 +578,25 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
|||
char buf[1024];
|
||||
int32_t totalRows = 0;
|
||||
|
||||
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||
const char* dbName = tmq_get_db_name(msg);
|
||||
|
||||
taosFprintfFile(g_fp, "consumerId: %d, msg index:%d\n", pInfo->consumerId, msgIndex);
|
||||
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
|
||||
tmq_get_topic_name(msg), vgroupId);
|
||||
int32_t index = 0;
|
||||
for (index = 0; index < pInfo->numOfVgroups; index++) {
|
||||
if (vgroupId == pInfo->rowsOfPerVgroups[index][0]) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId:%d, currentRows:%d\n", dbName != NULL ? dbName : "invalid table",
|
||||
tmq_get_topic_name(msg), vgroupId, pInfo->rowsOfPerVgroups[index][1]);
|
||||
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
|
||||
if (row == NULL) break;
|
||||
if (row == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
|
@ -607,7 +614,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
|||
#endif
|
||||
|
||||
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
|
||||
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
|
||||
if (0 != g_stConfInfo.showRowFlag) {
|
||||
|
@ -621,7 +627,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
|||
}
|
||||
|
||||
addRowsToVgroupId(pInfo, vgroupId, totalRows);
|
||||
|
||||
return totalRows;
|
||||
}
|
||||
|
||||
|
@ -730,9 +735,7 @@ void build_consumer(SThreadInfo* pInfo) {
|
|||
}
|
||||
|
||||
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -817,7 +820,6 @@ void loop_consume(SThreadInfo* pInfo) {
|
|||
}
|
||||
|
||||
taos_free_result(tmqMsg);
|
||||
|
||||
totalMsgs++;
|
||||
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
|
|
Loading…
Reference in New Issue