Merge branch 'main' into docs/readme
This commit is contained in:
commit
02abe7e36a
|
@ -0,0 +1,26 @@
|
|||
# reference
|
||||
# https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners
|
||||
|
||||
# merge team
|
||||
# @guanshengliang Shengliang Guan
|
||||
# @zitsen Linhe Huo
|
||||
# @wingwing2005 Ya Qiang Li
|
||||
# @feici02 WANG Xu
|
||||
# @hzcheng Hongze Cheng
|
||||
# @dapan1121 Pan Wei
|
||||
# @sheyanjie-qq She Yanjie
|
||||
# @pigzhou ZacharyZhou
|
||||
|
||||
* @taosdata/merge
|
||||
/.github/ @feici02
|
||||
/cmake/ @guanshengliang
|
||||
/contrib/ @guanshengliang
|
||||
/deps/ @guanshengliang
|
||||
/docs/ @guanshengliang @zitsen
|
||||
/examples/ @guanshengliang @zitsen
|
||||
/include/ @guanshengliang @hzcheng @dapan1121
|
||||
/packaging/ @feici02
|
||||
/source/ @guanshengliang @hzcheng @dapan1121
|
||||
/tests/ @guanshengliang @zitsen
|
||||
/tools/ @guanshengliang @zitsen
|
||||
/utils/ @guanshengliang
|
|
@ -77,12 +77,7 @@ After modifying configuration file parameters, you need to restart the *taosd* s
|
|||
|minReservedMemorySize | |Not supported |The minimum reserved system available memory size, all memory except reserved can be used for queries, unit: MB, default reserved size is 20% of system physical memory, value range 1024-1000000000|
|
||||
|singleQueryMaxMemorySize| |Not supported |The memory limit that a single query can use on a single node (dnode), exceeding this limit will return an error, unit: MB, default value: 0 (no limit), value range 0-1000000000|
|
||||
|filterScalarMode | |Not supported |Force scalar filter mode, 0: off; 1: on, default value 0|
|
||||
|queryPlannerTrace | |Supported, effective immediately |Internal parameter, whether the query plan outputs detailed logs|
|
||||
|queryNodeChunkSize | |Supported, effective immediately |Internal parameter, chunk size of the query plan|
|
||||
|queryUseNodeAllocator | |Supported, effective immediately |Internal parameter, allocation method of the query plan|
|
||||
|queryMaxConcurrentTables| |Not supported |Internal parameter, concurrency number of the query plan|
|
||||
|queryRsmaTolerance | |Not supported |Internal parameter, tolerance time for determining which level of rsma data to query, in milliseconds|
|
||||
|enableQueryHb | |Supported, effective immediately |Internal parameter, whether to send query heartbeat messages|
|
||||
|pqSortMemThreshold | |Not supported |Internal parameter, memory threshold for sorting|
|
||||
|
||||
### Region Related
|
||||
|
|
|
@ -2171,7 +2171,7 @@ ignore_negative: {
|
|||
|
||||
**Usage Instructions**:
|
||||
|
||||
- Can be used with the columns associated with the selection. For example: select _rowts, DERIVATIVE() from.
|
||||
- Can be used with the columns associated with the selection. For example: select _rowts, DERIVATIVE(col1, 1s, 1) from tb1.
|
||||
|
||||
### DIFF
|
||||
|
||||
|
|
|
@ -73,12 +73,7 @@ taosd 命令行参数如下
|
|||
|minReservedMemorySize | |不支持动态修改 |最小预留的系统可用内存数量,除预留外的内存都可以被用于查询,单位:MB,默认预留大小为系统物理内存的 20%,取值范围 1024 - 1000000000|
|
||||
|singleQueryMaxMemorySize| |不支持动态修改 |单个查询在单个节点(dnode)上可以使用的内存上限,超过该上限将返回错误,单位:MB,默认值:0(无上限),取值范围 0 - 1000000000|
|
||||
|filterScalarMode | |不支持动态修改 |强制使用标量过滤模式,0:关闭;1:开启,默认值 0|
|
||||
|queryPlannerTrace | |支持动态修改 立即生效 |内部参数,查询计划是否输出详细日志|
|
||||
|queryNodeChunkSize | |支持动态修改 立即生效 |内部参数,查询计划的块大小|
|
||||
|queryUseNodeAllocator | |支持动态修改 立即生效 |内部参数,查询计划的分配方法|
|
||||
|queryMaxConcurrentTables| |不支持动态修改 |内部参数,查询计划的并发数目|
|
||||
|queryRsmaTolerance | |不支持动态修改 |内部参数,用于判定查询哪一级 rsma 数据时的容忍时间,单位为毫秒|
|
||||
|enableQueryHb | |支持动态修改 立即生效 |内部参数,是否发送查询心跳消息|
|
||||
|pqSortMemThreshold | |不支持动态修改 |内部参数,排序使用的内存阈值|
|
||||
|
||||
### 区域相关
|
||||
|
|
|
@ -2099,7 +2099,7 @@ ignore_negative: {
|
|||
|
||||
**使用说明**:
|
||||
|
||||
- 可以与选择相关联的列一起使用。 例如: select \_rowts, DERIVATIVE() from。
|
||||
- 可以与选择相关联的列一起使用。 例如: select \_rowts, DERIVATIVE(col1, 1s, 1) from tb1。
|
||||
|
||||
### DIFF
|
||||
|
||||
|
|
|
@ -174,6 +174,7 @@ help() {
|
|||
echo " config_qemu_guest_agent - Configure QEMU guest agent"
|
||||
echo " deploy_docker - Deploy Docker"
|
||||
echo " deploy_docker_compose - Deploy Docker Compose"
|
||||
echo " install_trivy - Install Trivy"
|
||||
echo " clone_enterprise - Clone the enterprise repository"
|
||||
echo " clone_community - Clone the community repository"
|
||||
echo " clone_taosx - Clone TaosX repository"
|
||||
|
@ -316,6 +317,17 @@ add_config_if_not_exist() {
|
|||
grep -qF -- "$config" "$file" || echo "$config" >> "$file"
|
||||
}
|
||||
|
||||
# Function to check if a tool is installed
|
||||
check_installed() {
|
||||
local command_name="$1"
|
||||
if command -v "$command_name" >/dev/null 2>&1; then
|
||||
echo "$command_name is already installed. Skipping installation."
|
||||
return 0
|
||||
else
|
||||
echo "$command_name is not installed."
|
||||
return 1
|
||||
fi
|
||||
}
|
||||
# General error handling function
|
||||
check_status() {
|
||||
local message_on_failure="$1"
|
||||
|
@ -584,9 +596,12 @@ centos_skip_check() {
|
|||
# Deploy cmake
|
||||
deploy_cmake() {
|
||||
# Check if cmake is installed
|
||||
if command -v cmake >/dev/null 2>&1; then
|
||||
echo "Cmake is already installed. Skipping installation."
|
||||
cmake --version
|
||||
# if command -v cmake >/dev/null 2>&1; then
|
||||
# echo "Cmake is already installed. Skipping installation."
|
||||
# cmake --version
|
||||
# return
|
||||
# fi
|
||||
if check_installed "cmake"; then
|
||||
return
|
||||
fi
|
||||
install_package "cmake3"
|
||||
|
@ -1058,11 +1073,13 @@ deploy_go() {
|
|||
GOPATH_DIR="/root/go"
|
||||
|
||||
# Check if Go is installed
|
||||
if command -v go >/dev/null 2>&1; then
|
||||
echo "Go is already installed. Skipping installation."
|
||||
# if command -v go >/dev/null 2>&1; then
|
||||
# echo "Go is already installed. Skipping installation."
|
||||
# return
|
||||
# fi
|
||||
if check_installed "go"; then
|
||||
return
|
||||
fi
|
||||
|
||||
# Fetch the latest version number of Go
|
||||
GO_LATEST_DATA=$(curl --retry 10 --retry-delay 5 --retry-max-time 120 -s https://golang.google.cn/VERSION?m=text)
|
||||
GO_LATEST_VERSION=$(echo "$GO_LATEST_DATA" | grep -oP 'go[0-9]+\.[0-9]+\.[0-9]+')
|
||||
|
@ -1731,6 +1748,42 @@ deploy_docker_compose() {
|
|||
fi
|
||||
}
|
||||
|
||||
# Instal trivy
|
||||
install_trivy() {
|
||||
echo -e "${YELLOW}Installing Trivy...${NO_COLOR}"
|
||||
# Check if Trivy is already installed
|
||||
# if command -v trivy >/dev/null 2>&1; then
|
||||
# echo "Trivy is already installed. Skipping installation."
|
||||
# trivy --version
|
||||
# return
|
||||
# fi
|
||||
if check_installed "trivy"; then
|
||||
return
|
||||
fi
|
||||
# Install jq
|
||||
install_package jq
|
||||
# Get latest version
|
||||
LATEST_VERSION=$(curl -s https://api.github.com/repos/aquasecurity/trivy/releases/latest | jq -r .tag_name)
|
||||
# Download
|
||||
if [ -f /etc/debian_version ]; then
|
||||
wget https://github.com/aquasecurity/trivy/releases/download/"${LATEST_VERSION}"/trivy_"${LATEST_VERSION#v}"_Linux-64bit.deb
|
||||
# Install
|
||||
dpkg -i trivy_"${LATEST_VERSION#v}"_Linux-64bit.deb
|
||||
|
||||
elif [ -f /etc/redhat-release ]; then
|
||||
wget https://github.com/aquasecurity/trivy/releases/download/"${LATEST_VERSION}"/trivy_"${LATEST_VERSION#v}"_Linux-64bit.rpm
|
||||
# Install
|
||||
rpm -ivh trivy_"${LATEST_VERSION#v}"_Linux-64bit.rpm
|
||||
else
|
||||
echo "Unsupported Linux distribution."
|
||||
exit 1
|
||||
fi
|
||||
# Check
|
||||
trivy --version
|
||||
check_status "Failed to install Trivy" "Trivy installed successfully." $?
|
||||
rm -rf trivy_"${LATEST_VERSION#v}"_Linux-64bit.deb trivy_"${LATEST_VERSION#v}"_Linux-64bit.rpm
|
||||
}
|
||||
|
||||
# Reconfigure cloud-init
|
||||
reconfig_cloud_init() {
|
||||
echo "Reconfiguring cloud-init..."
|
||||
|
@ -2004,6 +2057,7 @@ deploy_dev() {
|
|||
install_nginx
|
||||
deploy_docker
|
||||
deploy_docker_compose
|
||||
install_trivy
|
||||
check_status "Failed to deploy some tools" "Deploy all tools successfully" $?
|
||||
}
|
||||
|
||||
|
@ -2159,6 +2213,9 @@ main() {
|
|||
deploy_docker_compose)
|
||||
deploy_docker_compose
|
||||
;;
|
||||
install_trivy)
|
||||
install_trivy
|
||||
;;
|
||||
clone_enterprise)
|
||||
clone_enterprise
|
||||
;;
|
||||
|
|
|
@ -90,7 +90,7 @@ fi
|
|||
|
||||
kill_service_of() {
|
||||
_service=$1
|
||||
pid=$(ps -C $_service | grep -v $uninstallScript | awk '{print $2}')
|
||||
pid=$(ps -C $_service | grep -w $_service | grep -v $uninstallScript | awk '{print $1}')
|
||||
if [ -n "$pid" ]; then
|
||||
${csudo}kill -9 $pid || :
|
||||
fi
|
||||
|
@ -140,9 +140,8 @@ clean_service_of() {
|
|||
clean_service_on_systemd_of $_service
|
||||
elif ((${service_mod} == 1)); then
|
||||
clean_service_on_sysvinit_of $_service
|
||||
else
|
||||
kill_service_of $_service
|
||||
fi
|
||||
kill_service_of $_service
|
||||
}
|
||||
|
||||
remove_service_of() {
|
||||
|
|
|
@ -40,7 +40,7 @@ if command -v sudo > /dev/null; then
|
|||
fi
|
||||
|
||||
function kill_client() {
|
||||
pid=$(ps -C ${clientName2} | grep -v $uninstallScript2 | awk '{print $2}')
|
||||
pid=$(ps -C ${clientName2} | grep -w ${clientName2} | grep -v $uninstallScript2 | awk '{print $1}')
|
||||
if [ -n "$pid" ]; then
|
||||
${csudo}kill -9 $pid || :
|
||||
fi
|
||||
|
|
|
@ -197,7 +197,7 @@ void do_stmt(TAOS* taos, TAOS_STMT2_OPTION* option, const char* sql, int CTB_NUM
|
|||
checkError(stmt, code);
|
||||
|
||||
// exec
|
||||
int affected;
|
||||
int affected = 0;
|
||||
code = taos_stmt2_exec(stmt, &affected);
|
||||
total_affected += affected;
|
||||
checkError(stmt, code);
|
||||
|
@ -219,8 +219,9 @@ void do_stmt(TAOS* taos, TAOS_STMT2_OPTION* option, const char* sql, int CTB_NUM
|
|||
taosMemoryFree(tags);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS);
|
||||
if (option->asyncExecFn == NULL) {
|
||||
ASSERT_EQ(total_affected, CYC_NUMS * ROW_NUMS * CTB_NUMS);
|
||||
}
|
||||
for (int i = 0; i < CTB_NUMS; i++) {
|
||||
taosMemoryFree(tbs[i]);
|
||||
}
|
||||
|
|
|
@ -469,6 +469,13 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
|
|||
}
|
||||
|
||||
SStreamScanInfo* pScanInfo = pInfo->info;
|
||||
if (pInfo->pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { // clear meta cache for subscription if tag is changed
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
|
||||
int64_t* uid = (int64_t*)taosArrayGet(tableIdList, i);
|
||||
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
|
||||
taosLRUCacheErase(pTableScanInfo->base.metaCache.pTableMetaEntryCache, uid, LONG_BYTES);
|
||||
}
|
||||
}
|
||||
|
||||
if (isAdd) { // add new table id
|
||||
SArray* qa = NULL;
|
||||
|
|
|
@ -771,7 +771,34 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool funcNotSupportStringSma(SFunctionNode* pFunc) {
|
||||
SNode* pParam;
|
||||
switch (pFunc->funcType) {
|
||||
case FUNCTION_TYPE_MAX:
|
||||
case FUNCTION_TYPE_MIN:
|
||||
case FUNCTION_TYPE_SUM:
|
||||
case FUNCTION_TYPE_AVG:
|
||||
case FUNCTION_TYPE_AVG_PARTIAL:
|
||||
case FUNCTION_TYPE_PERCENTILE:
|
||||
case FUNCTION_TYPE_SPREAD:
|
||||
case FUNCTION_TYPE_SPREAD_PARTIAL:
|
||||
case FUNCTION_TYPE_SPREAD_MERGE:
|
||||
case FUNCTION_TYPE_TWA:
|
||||
pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (pParam && nodesIsExprNode(pParam) && (IS_VAR_DATA_TYPE(((SExprNode*)pParam)->resType.type))) {
|
||||
return true;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||
if(funcNotSupportStringSma(pFunc)) {
|
||||
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
}
|
||||
return FUNC_DATA_REQUIRED_SMA_LOAD;
|
||||
}
|
||||
|
||||
|
|
|
@ -328,7 +328,9 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td33504.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts-5473.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5906.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-32187.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py
|
||||
|
|
|
@ -75,6 +75,7 @@ class TDTestCase:
|
|||
tdLog.debug(" LIMIT test_case2 ............ [OK]")
|
||||
|
||||
self.test_TD_33336()
|
||||
self.ts5900()
|
||||
|
||||
# stop
|
||||
def stop(self):
|
||||
|
@ -137,6 +138,47 @@ class TDTestCase:
|
|||
|
||||
tdLog.debug("INSERT TABLE DATA ............ [OK]")
|
||||
return
|
||||
|
||||
def ts5900query(self):
|
||||
sql = "select max(c0) from ts5900.tt1"
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '99.0')
|
||||
sql = "select min(c0) from ts5900.tt1"
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '1.0')
|
||||
|
||||
def ts5900(self):
|
||||
tdSql.execute("drop database if exists ts5900;")
|
||||
tdSql.execute("create database ts5900;")
|
||||
|
||||
tdSql.execute("create table ts5900.meters (ts timestamp, c0 varchar(64)) tags(t0 varchar(64));")
|
||||
|
||||
sql = "CREATE TABLE ts5900.`tt1` USING ts5900.`meters` TAGS ('t11')"
|
||||
tdSql.execute(sql)
|
||||
for i in range(155):
|
||||
tdSql.query(f"insert into ts5900.tt1 values(now+{i*10}s, '{i+1}.0')")
|
||||
tdSql.query("insert into ts5900.tt1 values(now, '1.2')")
|
||||
tdSql.query("insert into ts5900.tt1 values(now+1s, '2.0')")
|
||||
tdSql.query("insert into ts5900.tt1 values(now+2s, '3.0')")
|
||||
tdSql.query("insert into ts5900.tt1 values(now+3s, '105.0')")
|
||||
tdSql.query("insert into ts5900.tt1 values(now+4s, '4.0')")
|
||||
|
||||
sql = "select count(*) from ts5900.tt1"
|
||||
tdSql.query(sql)
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0, 0, '160')
|
||||
|
||||
for i in range(10):
|
||||
tdSql.execute("flush database ts5900")
|
||||
time.sleep(1)
|
||||
self.ts5900query()
|
||||
tdSql.query(f"insert into ts5900.tt1 values(now, '23.0')")
|
||||
self.ts5900query()
|
||||
tdLog.info(f"ts5900 test {i} ............ [OK]")
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
# test case1 base
|
||||
# def test_case1(self):
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
from taos.tmq import *
|
||||
from taos import *
|
||||
|
||||
sys.path.append("./7-tmq")
|
||||
from tmqCommon import *
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def test(self):
|
||||
tdSql.execute(f'create database if not exists db')
|
||||
tdSql.execute(f'use db')
|
||||
tdSql.execute(f'CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)')
|
||||
tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)")
|
||||
tdSql.execute("INSERT INTO d1002 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)")
|
||||
tdSql.execute("INSERT INTO d1003 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)")
|
||||
tdSql.execute("INSERT INTO d1004 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)")
|
||||
|
||||
tdSql.execute(f'create topic t0 as select * from meters')
|
||||
tdSql.execute(f'create topic t1 as select * from meters')
|
||||
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"auto.offset.reset": "earliest",
|
||||
}
|
||||
consumer = Consumer(consumer_dict)
|
||||
|
||||
try:
|
||||
consumer.subscribe(["t0"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
try:
|
||||
res = consumer.poll(1)
|
||||
print(res)
|
||||
|
||||
consumer.unsubscribe()
|
||||
|
||||
try:
|
||||
consumer.subscribe(["t1"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
|
||||
res = consumer.poll(1)
|
||||
print(res)
|
||||
if res == None and taos_errno(None) != 0:
|
||||
tdLog.exit(f"poll error %d" % taos_errno(None))
|
||||
|
||||
except TmqError:
|
||||
tdLog.exit(f"poll error")
|
||||
finally:
|
||||
consumer.close()
|
||||
|
||||
|
||||
def run(self):
|
||||
self.test()
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -0,0 +1,90 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
from taos.tmq import *
|
||||
from taos import *
|
||||
|
||||
sys.path.append("./7-tmq")
|
||||
from tmqCommon import *
|
||||
|
||||
class TDTestCase:
|
||||
updatecfgDict = {'debugFlag': 143, 'asynclog': 0}
|
||||
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def test(self):
|
||||
tdSql.execute(f'create database if not exists db vgroups 1')
|
||||
tdSql.execute(f'use db')
|
||||
tdSql.execute(f'CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)')
|
||||
tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco1', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)")
|
||||
|
||||
|
||||
tdSql.execute(f'create topic t0 as select * from meters')
|
||||
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"auto.offset.reset": "earliest",
|
||||
}
|
||||
consumer = Consumer(consumer_dict)
|
||||
|
||||
try:
|
||||
consumer.subscribe(["t0"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
index = 0;
|
||||
try:
|
||||
while True:
|
||||
if index == 2:
|
||||
break
|
||||
res = consumer.poll(5)
|
||||
print(res)
|
||||
if not res:
|
||||
print("res null")
|
||||
break
|
||||
val = res.value()
|
||||
if val is None:
|
||||
continue
|
||||
for block in val:
|
||||
data = block.fetchall()
|
||||
for element in data:
|
||||
print(f"data len: {len(data)}")
|
||||
print(element)
|
||||
if index == 0 and data[0][-1] != 2:
|
||||
tdLog.exit(f"error: {data[0][-1]}")
|
||||
if index == 1 and data[0][-1] != 100:
|
||||
tdLog.exit(f"error: {data[0][-1]}")
|
||||
|
||||
tdSql.execute("alter table d1001 set tag groupId = 100")
|
||||
tdSql.execute("INSERT INTO d1001 VALUES('2018-10-05 14:38:06.000',10.30000,219,0.31000)")
|
||||
index += 1
|
||||
finally:
|
||||
consumer.close()
|
||||
|
||||
|
||||
def run(self):
|
||||
self.test()
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue